API Monitoring in Python: Tools, Libraries, and Best Practices for 2026
Python is everywhere in API integration work. Data pipelines, automation scripts, backend services, ML model serving, ETL jobs — most of these depend on external APIs, and most of them don't have adequate monitoring.
This guide covers the full range of API monitoring approaches in Python: from quick health check scripts you can write in 15 minutes to production-grade monitoring with proper alerting, schema validation, and drift detection. We'll also cover where rolling your own monitoring makes sense versus using a dedicated tool like Rumbliq.
The Simplest Possible Python API Health Check
Before we get into libraries and frameworks, here's the minimum viable API health check:
```python
import httpx
import sys
from datetime import datetime

def check_api_health(url: str, timeout: float = 10.0) -> dict:
    start = datetime.now()
    try:
        response = httpx.get(url, timeout=timeout)
        duration_ms = (datetime.now() - start).total_seconds() * 1000
        return {
            "url": url,
            "status": "healthy" if response.status_code < 400 else "degraded",
            "status_code": response.status_code,
            "response_time_ms": round(duration_ms, 2),
            "timestamp": start.isoformat(),
        }
    except httpx.TimeoutException:
        return {"url": url, "status": "timeout", "response_time_ms": None}
    except httpx.RequestError as e:
        return {"url": url, "status": "error", "error": str(e)}

if __name__ == "__main__":
    result = check_api_health("https://api.example.com/health")
    print(result)
    sys.exit(0 if result["status"] == "healthy" else 1)
```
This is a good starting point. Run it with cron or a task scheduler and pipe failures to a notification channel. It takes 10 minutes to set up and will catch outright failures.
The limitations become apparent quickly: it only checks status codes, it doesn't validate response structure, it has no persistence, and it doesn't handle authentication.
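The first limitation is cheap to narrow before reaching for a framework. As one illustration (the function name and thresholds here are hypothetical, not part of the script above), a pure helper can grade a result dict against a latency budget, so a slow-but-200 response surfaces as something other than "healthy":

```python
def grade_result(result: dict, latency_budget_ms: float = 2000.0) -> str:
    """Grade a health-check result dict, factoring in latency, not just status.

    Returns one of: "healthy", "slow", "degraded", "down".
    """
    status = result.get("status")
    if status in ("timeout", "error"):
        return "down"
    if status == "degraded":  # HTTP 4xx/5xx
        return "degraded"
    latency = result.get("response_time_ms")
    if latency is not None and latency > latency_budget_ms:
        return "slow"  # responded 2xx/3xx, but over the latency budget
    return "healthy"

if __name__ == "__main__":
    print(grade_result({"status": "healthy", "response_time_ms": 120.0}))   # healthy
    print(grade_result({"status": "healthy", "response_time_ms": 3500.0}))  # slow
    print(grade_result({"status": "timeout", "response_time_ms": None}))    # down
```

Because it takes a plain dict, it composes with `check_api_health` without modifying it: `grade_result(check_api_health(url))`.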
Choosing Your HTTP Library
Python's HTTP landscape in 2026 has a clear modern choice:
httpx (httpx>=0.28) — The modern standard. Async-native, HTTP/2 support, clean API, synchronous interface when you need it, proper connection pooling. Use this for new projects.
```bash
pip install httpx
```
requests — Ubiquitous but synchronous-only and not actively gaining new features. Still fine for simple scripts. If you're maintaining existing code that uses requests, there's no urgent reason to migrate.
aiohttp — Full async HTTP client/server. More complex API than httpx. Prefer httpx unless you specifically need something aiohttp provides.
For API monitoring scripts, httpx is the right choice. It handles async naturally (important for checking multiple endpoints concurrently), has a clean error hierarchy, and supports HTTP/2 for modern APIs.
Async Health Checks: Monitoring Multiple Endpoints Concurrently
Checking endpoints sequentially gets slow when you're monitoring many APIs. Async lets you check all of them in parallel:
```python
import asyncio
import httpx
from dataclasses import dataclass
from datetime import datetime

@dataclass
class HealthResult:
    url: str
    status: str
    status_code: int | None
    response_time_ms: float | None
    error: str | None = None
    timestamp: str = ""

async def check_endpoint(
    client: httpx.AsyncClient,
    url: str,
    headers: dict | None = None,
    timeout: float = 10.0,
) -> HealthResult:
    start = datetime.now()
    try:
        response = await client.get(url, headers=headers or {}, timeout=timeout)
        duration_ms = (datetime.now() - start).total_seconds() * 1000
        return HealthResult(
            url=url,
            status="healthy" if response.status_code < 400 else "degraded",
            status_code=response.status_code,
            response_time_ms=round(duration_ms, 2),
            timestamp=start.isoformat(),
        )
    except httpx.TimeoutException:
        return HealthResult(url=url, status="timeout", status_code=None, response_time_ms=None)
    except httpx.RequestError as e:
        return HealthResult(url=url, status="error", status_code=None, response_time_ms=None, error=str(e))

async def check_all(endpoints: list[dict]) -> list[HealthResult]:
    async with httpx.AsyncClient() as client:
        tasks = [
            check_endpoint(client, ep["url"], ep.get("headers"))
            for ep in endpoints
        ]
        return await asyncio.gather(*tasks)

if __name__ == "__main__":
    endpoints = [
        {"url": "https://api.stripe.com/v1/charges", "headers": {"Authorization": "Bearer sk_test_..."}},
        {"url": "https://api.twilio.com/2010-04-01/Accounts.json"},
        {"url": "https://api.sendgrid.com/v3/user/profile"},
    ]
    results = asyncio.run(check_all(endpoints))
    for r in results:
        print(f"{r.url}: {r.status} ({r.response_time_ms}ms)")
```
This checks all three endpoints simultaneously. With 10 endpoints at 500ms average response time, sequential checking takes 5 seconds; async checking takes ~500ms.
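You can verify that speedup claim without touching any real API. The sketch below is purely illustrative: it uses `asyncio.sleep` as a stand-in for network latency and times sequential versus concurrent "checks":

```python
import asyncio
import time

async def fake_check(latency_s: float) -> str:
    """Stand-in for an HTTP call: just sleeps for the given latency."""
    await asyncio.sleep(latency_s)
    return "healthy"

async def sequential(n: int, latency_s: float) -> float:
    start = time.perf_counter()
    for _ in range(n):
        await fake_check(latency_s)
    return time.perf_counter() - start

async def concurrent(n: int, latency_s: float) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fake_check(latency_s) for _ in range(n)))
    return time.perf_counter() - start

if __name__ == "__main__":
    # 10 "endpoints" at 50ms each: sequential ~ n * latency, concurrent ~ latency
    seq = asyncio.run(sequential(10, 0.05))
    con = asyncio.run(concurrent(10, 0.05))
    print(f"sequential: {seq:.2f}s, concurrent: {con:.2f}s")
```

The same shape applies to real `httpx` calls, with the caveat that real concurrency is also bounded by connection pool limits and the target servers' rate limits.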
Schema Validation: Checking Response Structure
Status codes tell you if the API responded. Schema validation tells you if it responded correctly.
Python has excellent schema validation libraries. Pydantic v2 is the current standard for structured data validation:
```python
from pydantic import BaseModel, ValidationError
import httpx

class StripeCustomer(BaseModel):
    id: str
    email: str
    name: str | None
    created: int
    livemode: bool
    metadata: dict

def validate_stripe_customer(customer_id: str, api_key: str) -> tuple[bool, str]:
    response = httpx.get(
        f"https://api.stripe.com/v1/customers/{customer_id}",
        headers={"Authorization": f"Bearer {api_key}"},
    )
    if response.status_code != 200:
        return False, f"HTTP {response.status_code}"
    try:
        customer = StripeCustomer.model_validate(response.json())
        return True, f"Valid: {customer.email}"
    except ValidationError as e:
        return False, f"Schema validation failed: {e}"

ok, message = validate_stripe_customer("cus_test123", "sk_test_...")
print(f"Stripe API: {'OK' if ok else 'FAIL'} — {message}")
```
Pydantic catches:
- Missing required fields
- Type mismatches (API returns a string where you expect an int)
- Extra or unexpected fields (configurable)
The limitation: you have to write and maintain the Pydantic models. When the API changes, you update the model. This works well for your own APIs and for stable third-party APIs with good documentation.
For rapidly-changing or poorly-documented third-party APIs, maintaining Pydantic models becomes burdensome — and you still have to know the change happened before you can update the model.
Schema Drift Detection: Catching Changes Automatically
Rather than maintaining explicit schemas, schema drift detection infers the current structure and alerts when it changes. This is the approach Rumbliq takes, and you can approximate it in Python:
```python
import json
import hashlib
from pathlib import Path
from typing import Any

def extract_schema(data: Any, path: str = "root") -> dict:
    """Recursively extract the schema structure (field names and types) from JSON data."""
    if isinstance(data, dict):
        return {
            "type": "object",
            "fields": {
                key: extract_schema(value, f"{path}.{key}")
                for key, value in data.items()
            },
        }
    elif isinstance(data, list):
        if data:
            return {"type": "array", "items": extract_schema(data[0], f"{path}[0]")}
        return {"type": "array", "items": None}
    elif isinstance(data, bool):
        return {"type": "boolean"}
    elif isinstance(data, int):
        return {"type": "integer"}
    elif isinstance(data, float):
        return {"type": "number"}
    elif isinstance(data, str):
        return {"type": "string"}
    elif data is None:
        return {"type": "null"}
    return {"type": "unknown"}

def schema_fingerprint(schema: dict) -> str:
    return hashlib.sha256(json.dumps(schema, sort_keys=True).encode()).hexdigest()

class SchemaDriftDetector:
    def __init__(self, baseline_dir: str = ".schema_baselines"):
        self.baseline_dir = Path(baseline_dir)
        self.baseline_dir.mkdir(exist_ok=True)

    def check(self, endpoint_id: str, response_data: Any) -> dict:
        current_schema = extract_schema(response_data)
        current_fingerprint = schema_fingerprint(current_schema)
        baseline_path = self.baseline_dir / f"{endpoint_id}.json"
        if not baseline_path.exists():
            # First run — store as baseline
            baseline_path.write_text(json.dumps({
                "schema": current_schema,
                "fingerprint": current_fingerprint,
            }, indent=2))
            return {"status": "baseline_established", "drift": False}
        stored = json.loads(baseline_path.read_text())
        if stored["fingerprint"] == current_fingerprint:
            return {"status": "ok", "drift": False}
        # Schema changed — compute diff
        drift_details = self._diff_schemas(stored["schema"], current_schema)
        return {
            "status": "drift_detected",
            "drift": True,
            "changes": drift_details,
        }

    def _diff_schemas(self, baseline: dict, current: dict, path: str = "") -> list[str]:
        changes = []
        if baseline.get("type") != current.get("type"):
            changes.append(f"Type changed at '{path}': {baseline.get('type')} → {current.get('type')}")
            return changes
        if baseline.get("type") == "object":
            baseline_fields = set(baseline.get("fields", {}).keys())
            current_fields = set(current.get("fields", {}).keys())
            for removed in baseline_fields - current_fields:
                changes.append(f"Field removed: '{path}.{removed}'")
            for added in current_fields - baseline_fields:
                changes.append(f"Field added: '{path}.{added}'")
            for common in baseline_fields & current_fields:
                changes.extend(self._diff_schemas(
                    baseline["fields"][common],
                    current["fields"][common],
                    f"{path}.{common}",
                ))
        return changes
```
Use it like this:
```python
import httpx

detector = SchemaDriftDetector()

response = httpx.get("https://api.example.com/users/1")
data = response.json()

result = detector.check("example-users-detail", data)
if result["drift"]:
    print("SCHEMA DRIFT DETECTED:")
    for change in result["changes"]:
        print(f"  - {change}")
    # Send alert here
```
This approach:
- Requires no manual schema definitions
- Automatically tracks all fields in the response
- Detects removals, additions, and type changes
- Persists baselines between runs
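To see the core mechanism in isolation, here's a self-contained run of the extraction and fingerprinting logic (condensed from the detector above, without the file persistence) against two payloads that differ only in one field's type:

```python
import hashlib
import json
from typing import Any

def extract_schema(data: Any) -> dict:
    """Reduce a JSON value to its structural skeleton: field names and types."""
    if isinstance(data, dict):
        return {"type": "object",
                "fields": {k: extract_schema(v) for k, v in data.items()}}
    if isinstance(data, list):
        return {"type": "array", "items": extract_schema(data[0]) if data else None}
    if isinstance(data, bool):  # must precede the int check: bool is a subclass of int
        return {"type": "boolean"}
    if isinstance(data, int):
        return {"type": "integer"}
    if isinstance(data, float):
        return {"type": "number"}
    if isinstance(data, str):
        return {"type": "string"}
    return {"type": "null"}

def schema_fingerprint(schema: dict) -> str:
    return hashlib.sha256(json.dumps(schema, sort_keys=True).encode()).hexdigest()

baseline = {"id": 1, "email": "a@example.com", "active": True}
same_shape = {"id": 2, "email": "b@example.com", "active": False}
drifted = {"id": "usr_1", "email": "a@example.com", "active": True}  # id: int -> str

fp_base = schema_fingerprint(extract_schema(baseline))
fp_same = schema_fingerprint(extract_schema(same_shape))
fp_drift = schema_fingerprint(extract_schema(drifted))

print(fp_base == fp_same)   # True: values differ, structure identical
print(fp_base == fp_drift)  # False: the type of "id" changed
```

This is also why fingerprints alone aren't enough in practice: they tell you *that* something changed, while the `_diff_schemas` walk tells you *what* changed.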
The tradeoff versus a dedicated tool like Rumbliq: you have to manage the scheduler, storage, alerting, and runbook yourself. For a few internal scripts, this is fine. For production monitoring of 10+ third-party APIs, the maintenance overhead adds up.
Adding Alerting to Python Monitoring Scripts
A monitoring script without alerting is a report nobody reads. Here are the common alerting patterns:
Slack webhook:
```python
import httpx

def alert_slack(webhook_url: str, message: str, severity: str = "warning") -> None:
    color_map = {"info": "#36a64f", "warning": "#ff9900", "critical": "#ff0000"}
    payload = {
        "attachments": [{
            "color": color_map.get(severity, "#cccccc"),
            "text": message,
            "footer": "API Monitor",
        }]
    }
    httpx.post(webhook_url, json=payload, timeout=5.0)
```
PagerDuty Events API v2:
```python
import httpx

def trigger_pagerduty(routing_key: str, summary: str, source: str, severity: str = "error") -> None:
    httpx.post(
        "https://events.pagerduty.com/v2/enqueue",
        json={
            "routing_key": routing_key,
            "event_action": "trigger",
            "payload": {
                "summary": summary,
                "source": source,
                "severity": severity,  # critical, error, warning, info
            }
        },
        timeout=5.0,
    )
```
Email via Resend (the modern SMTP alternative):
```python
import httpx

def alert_email(api_key: str, to: str, subject: str, body: str) -> None:
    httpx.post(
        "https://api.resend.com/emails",
        headers={"Authorization": f"Bearer {api_key}"},
        json={
            "from": "monitor@yourdomain.com",  # replace with a verified sender on your domain
            "to": [to],
            "subject": subject,
            "text": body,
        },
        timeout=10.0,
    )
```
Scheduling Python Health Checks
Cron — simplest option. Works well if you have a server:
```
*/5 * * * * /usr/bin/python3 /opt/monitoring/check_apis.py >> /var/log/api-monitor.log 2>&1
```
APScheduler — if you want scheduling in-process:
```python
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.interval import IntervalTrigger

scheduler = BlockingScheduler()

@scheduler.scheduled_job(IntervalTrigger(minutes=5))
def run_checks():
    # your check logic here
    pass

scheduler.start()
```
Celery + Redis or RabbitMQ — for distributed monitoring with multiple workers, retry logic, and task persistence. More setup, more power:
```python
from celery import Celery

app = Celery("monitor", broker="redis://localhost:6379/0")

@app.task
def check_endpoint(url: str, headers: dict) -> dict:
    # check logic
    pass

# Schedule via Celery Beat
app.conf.beat_schedule = {
    "check-stripe-api": {
        "task": "monitor.check_endpoint",
        "schedule": 300.0,  # every 5 minutes
        "args": ["https://api.stripe.com/v1/charges", {"Authorization": "Bearer sk_..."}],
    },
}
```
GitHub Actions — increasingly popular for lightweight monitoring. Free for public repos, and the YAML-based scheduling (schedule: cron: '*/15 * * * *') is simple:
```yaml
name: API Health Check
on:
  schedule:
    - cron: '*/15 * * * *'

jobs:
  check:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.13'
      - run: pip install httpx pydantic
      - run: python check_apis.py
        env:
          STRIPE_API_KEY: ${{ secrets.STRIPE_API_KEY }}
```
This is lightweight and free, but GitHub Actions has a minimum cron interval of 5 minutes (and in practice, scheduled workflows often have delays of 5-10 minutes). It's not suitable for sub-minute monitoring.
When to Roll Your Own vs. Use a Dedicated Tool
Custom Python monitoring makes sense when:
- You have highly specific validation logic that no off-the-shelf tool supports
- You need to integrate monitoring into an existing Python codebase
- You're monitoring internal endpoints with custom authentication that's hard to configure externally
- You want monitoring logic that triggers actions (not just alerts), like automatic cache invalidation
A dedicated tool like Rumbliq makes more sense when:
- You're monitoring third-party APIs you don't control
- You want automatic schema drift detection without writing schemas
- You need a UI for browsing historical checks and viewing diffs
- You want multi-channel alerting (Slack, PagerDuty, email, webhook) without plumbing
- You need monitoring to run from external locations (not your own infrastructure)
- You have 10+ endpoints to monitor and don't want to maintain a custom scheduler
The honest answer for most teams: use both. Python scripts for integration-specific validation that requires custom logic. Rumbliq for the broader monitoring coverage — especially schema drift detection on third-party dependencies.
A Complete Python API Monitoring Script
Here's a production-ready starter that incorporates the patterns above:
```python
#!/usr/bin/env python3
"""
Simple API health monitor with Slack alerting and schema drift detection.

Usage: python monitor.py
Schedule with cron: */5 * * * * /usr/bin/python3 /opt/monitor/monitor.py
"""
import asyncio
import json
import os
import httpx
from datetime import datetime, timezone
from pathlib import Path

SLACK_WEBHOOK = os.environ["SLACK_WEBHOOK_URL"]
BASELINES_DIR = Path(".baselines")
BASELINES_DIR.mkdir(exist_ok=True)

ENDPOINTS = [
    {
        "id": "stripe-charges",
        "url": "https://api.stripe.com/v1/charges?limit=1",
        "headers": {"Authorization": f"Bearer {os.environ.get('STRIPE_API_KEY', '')}"},
        "timeout": 10.0,
    },
    {
        "id": "internal-api-health",
        "url": "https://api.yourapp.com/health",
        "headers": {},
        "timeout": 5.0,
    },
]

def extract_schema(data, depth=0, max_depth=5):
    if depth >= max_depth:
        return {"type": "truncated"}
    if isinstance(data, dict):
        return {"type": "object", "fields": {k: extract_schema(v, depth + 1) for k, v in data.items()}}
    elif isinstance(data, list):
        return {"type": "array", "items": extract_schema(data[0], depth + 1) if data else None}
    elif isinstance(data, bool):
        return {"type": "boolean"}
    elif isinstance(data, int):
        return {"type": "integer"}
    elif isinstance(data, float):
        return {"type": "number"}
    elif isinstance(data, str):
        return {"type": "string"}
    return {"type": "null"}

async def check_endpoint(client: httpx.AsyncClient, endpoint: dict) -> dict:
    start = datetime.now(tz=timezone.utc)
    try:
        r = await client.get(endpoint["url"], headers=endpoint["headers"], timeout=endpoint["timeout"])
        duration_ms = (datetime.now(tz=timezone.utc) - start).total_seconds() * 1000
        result = {
            "id": endpoint["id"],
            "status": "ok" if r.status_code < 400 else "error",
            "status_code": r.status_code,
            "response_time_ms": round(duration_ms, 1),
            "drift": False,
            "drift_changes": [],
        }
        if r.status_code == 200:
            try:
                data = r.json()
                schema = extract_schema(data)
                schema_str = json.dumps(schema, sort_keys=True)
                baseline_file = BASELINES_DIR / f"{endpoint['id']}.json"
                if baseline_file.exists():
                    stored = baseline_file.read_text()
                    if stored != schema_str:
                        result["drift"] = True
                        result["drift_changes"] = ["Schema structure changed"]
                else:
                    baseline_file.write_text(schema_str)
            except Exception:
                pass  # non-JSON body or storage error: skip drift check, keep health result
        return result
    except httpx.TimeoutException:
        return {"id": endpoint["id"], "status": "timeout", "status_code": None, "response_time_ms": None, "drift": False, "drift_changes": []}
    except Exception as e:
        return {"id": endpoint["id"], "status": "error", "error": str(e), "status_code": None, "response_time_ms": None, "drift": False, "drift_changes": []}

async def send_slack_alert(message: str, color: str = "#ff0000"):
    async with httpx.AsyncClient() as client:
        await client.post(SLACK_WEBHOOK, json={"attachments": [{"color": color, "text": message}]}, timeout=5.0)

async def main():
    async with httpx.AsyncClient() as client:
        results = await asyncio.gather(*[check_endpoint(client, ep) for ep in ENDPOINTS])
    for result in results:
        if result["status"] != "ok":
            await send_slack_alert(
                f":red_circle: *{result['id']}* is {result['status']} "
                f"(HTTP {result.get('status_code', 'N/A')}, {result.get('response_time_ms', 'N/A')}ms)",
                color="#ff0000",
            )
        elif result["drift"]:
            await send_slack_alert(
                f":warning: *{result['id']}* schema drift detected!\n"
                + "\n".join(f"  • {c}" for c in result["drift_changes"]),
                color="#ff9900",
            )
        else:
            print(f"OK: {result['id']} ({result['response_time_ms']}ms)")

if __name__ == "__main__":
    asyncio.run(main())
```
Summary
Python gives you excellent tools for API monitoring: httpx for async HTTP, pydantic for schema validation, APScheduler or Celery for scheduling, and straightforward integration with Slack, PagerDuty, and email services for alerting.
For lightweight monitoring of a small number of endpoints — especially internal APIs or integrations that need custom validation logic — a Python script is fast to write and easy to maintain.
For broader coverage of third-party API integrations, especially where schema drift detection matters, a dedicated tool like Rumbliq handles the plumbing (scheduling, baseline storage, diff computation, multi-channel alerting) so your Python code can focus on application logic rather than monitoring infrastructure.
The right answer is usually both: Python for custom integration-specific checks, and a dedicated monitoring service for the schema drift detection that's hard to build and maintain yourself.