Overview
This example demonstrates robust error handling patterns:
- Comprehensive error classification - Differentiate between retriable and non-retriable errors
- Intelligent retry strategies - Exponential backoff, circuit breakers, and custom retry logic
- Error monitoring & alerting - Track error rates, patterns, and system health
- Graceful degradation - Fallback mechanisms when services are unavailable
- Error aggregation - Collect and analyze error patterns for improvement
- Recovery workflows - Automatic and manual recovery processes
Task Definitions
from hyrex import HyrexRegistry, HyrexKV, get_hyrex_context
from pydantic import BaseModel
from typing import Dict, Any, Optional, List
import traceback
import logging
import time
import json
from datetime import datetime, timedelta
from enum import Enum
hy = HyrexRegistry()
logger = logging.getLogger(__name__)
class ErrorSeverity(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class ErrorCategory(Enum):
VALIDATION = "validation"
NETWORK = "network"
DATABASE = "database"
EXTERNAL_API = "external_api"
BUSINESS_LOGIC = "business_logic"
SYSTEM = "system"
class MonitoredTaskContext(BaseModel):
operation: str
data: Dict[str, Any]
retry_config: Optional[Dict[str, Any]] = None
circuit_breaker_key: Optional[str] = None
class TaskError(Exception):
def __init__(self, message: str, category: ErrorCategory, severity: ErrorSeverity,
retriable: bool = True, details: Optional[Dict] = None):
super().__init__(message)
self.category = category
self.severity = severity
self.retriable = retriable
self.details = details or {}
class ValidationError(TaskError):
def __init__(self, message: str, details: Optional[Dict] = None):
super().__init__(message, ErrorCategory.VALIDATION, ErrorSeverity.MEDIUM, False, details)
class ExternalAPIError(TaskError):
def __init__(self, message: str, status_code: int, details: Optional[Dict] = None):
severity = ErrorSeverity.HIGH if status_code >= 500 else ErrorSeverity.MEDIUM
retriable = status_code >= 500 or status_code in [429, 408] # Server errors and rate limits
super().__init__(message, ErrorCategory.EXTERNAL_API, severity, retriable,
{**(details or {}), "status_code": status_code})
@hy.task(max_retries=5, retry_backoff=True)
def monitored_task(context: MonitoredTaskContext):
"""Task with comprehensive error handling and monitoring"""
start_time = time.time()
task_context = get_hyrex_context()
operation = context.operation
# Check circuit breaker if configured
if context.circuit_breaker_key:
circuit_state = check_circuit_breaker(context.circuit_breaker_key)
if circuit_state == "OPEN":
raise TaskError(
f"Circuit breaker OPEN for {context.circuit_breaker_key}",
ErrorCategory.SYSTEM,
ErrorSeverity.HIGH,
retriable=False
)
try:
# Track task execution metrics
update_task_metrics(operation, "started")
# Perform the actual operation
result = perform_operation(context.operation, context.data)
# Record success metrics
duration = time.time() - start_time
record_task_success(operation, duration, task_context)
# Reset circuit breaker on success
if context.circuit_breaker_key:
reset_circuit_breaker(context.circuit_breaker_key)
return {
"success": True,
"result": result,
"duration": duration,
"operation": operation
}
except ValidationError as e:
# Don't retry validation errors - log and return error response
error_details = log_task_error(operation, e, task_context, retriable=False)
return {
"success": False,
"error_type": "validation_error",
"error_message": str(e),
"details": e.details,
"error_id": error_details["error_id"]
}
except ExternalAPIError as e:
# Handle external API errors with circuit breaker logic
if context.circuit_breaker_key:
record_circuit_breaker_failure(context.circuit_breaker_key)
error_details = log_task_error(operation, e, task_context, retriable=e.retriable)
if e.retriable:
# Let Hyrex handle the retry
raise e
else:
return {
"success": False,
"error_type": "external_api_error",
"error_message": str(e),
"status_code": e.details.get("status_code"),
"error_id": error_details["error_id"]
}
except Exception as e:
# Handle unexpected errors
error_details = log_task_error(operation, e, task_context, retriable=True)
# Record for circuit breaker if configured
if context.circuit_breaker_key:
record_circuit_breaker_failure(context.circuit_breaker_key)
# Send critical error alert
if isinstance(e, TaskError) and e.severity == ErrorSeverity.CRITICAL:
send_critical_error_alert.send(operation, str(e), error_details)
raise e # Let Hyrex handle retries
@hy.task(max_retries=3)
def resilient_external_api_call(url: str, method: str = "GET", data: Optional[Dict] = None,
timeout: int = 30, circuit_breaker_key: Optional[str] = None):
"""Make external API call with comprehensive error handling"""
import requests
from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry  # the requests.packages.urllib3 import path is deprecated
# Check circuit breaker
if circuit_breaker_key:
circuit_state = check_circuit_breaker(circuit_breaker_key)
if circuit_state == "OPEN":
raise ExternalAPIError("Service unavailable - circuit breaker OPEN", 503)
# Configure requests with retry strategy
session = requests.Session()
retry_strategy = Retry(
total=3,
status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["HEAD", "GET", "OPTIONS"],  # replaces the removed method_whitelist argument
backoff_factor=1
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
try:
response = session.request(
method=method,
url=url,
json=data,
timeout=timeout,
headers={"User-Agent": "HyrexApp/1.0"}
)
# Check for HTTP errors
if response.status_code >= 400:
raise ExternalAPIError(
f"API request failed with status {response.status_code}",
response.status_code,
{"url": url, "response_body": response.text[:1000]}
)
# Reset circuit breaker on success
if circuit_breaker_key:
reset_circuit_breaker(circuit_breaker_key)
return {
"status_code": response.status_code,
"data": response.json() if response.headers.get('content-type', '').startswith('application/json') else response.text,
"headers": dict(response.headers)
}
except requests.exceptions.Timeout:
if circuit_breaker_key:
record_circuit_breaker_failure(circuit_breaker_key)
raise ExternalAPIError("Request timeout", 408, {"url": url, "timeout": timeout})
except requests.exceptions.ConnectionError as e:
if circuit_breaker_key:
record_circuit_breaker_failure(circuit_breaker_key)
raise ExternalAPIError("Connection failed", 503, {"url": url, "error": str(e)})
except requests.exceptions.RequestException as e:
if circuit_breaker_key:
record_circuit_breaker_failure(circuit_breaker_key)
raise ExternalAPIError(f"Request failed: {str(e)}", 500, {"url": url})
@hy.task
def error_recovery_workflow(error_id: str, recovery_strategy: str):
"""Handle error recovery workflows"""
# Get error details
error_details = get_error_details(error_id)
if not error_details:
raise ValueError(f"Error {error_id} not found")
try:
if recovery_strategy == "retry_with_fallback":
# Retry original operation with fallback data
fallback_data = get_fallback_data(error_details["operation"])
result = perform_operation(error_details["operation"], fallback_data)
elif recovery_strategy == "manual_intervention":
# Create ticket for manual intervention
ticket_id = create_support_ticket(error_details)
result = {"ticket_id": ticket_id, "status": "manual_intervention_required"}
elif recovery_strategy == "alternative_workflow":
# Execute alternative workflow
result = execute_alternative_workflow(error_details)
else:
raise ValueError(f"Unknown recovery strategy: {recovery_strategy}")
# Mark error as recovered
mark_error_recovered(error_id, recovery_strategy, result)
return {
"error_id": error_id,
"recovery_strategy": recovery_strategy,
"recovered": True,
"result": result
}
except Exception as e:
# Recovery failed
mark_recovery_failed(error_id, recovery_strategy, str(e))
raise TaskError(
f"Recovery failed for error {error_id}: {str(e)}",
ErrorCategory.SYSTEM,
ErrorSeverity.HIGH
)
def log_task_error(operation: str, error: Exception, task_context: Any, retriable: bool = True) -> Dict:
"""Log detailed error information"""
error_id = f"err_{int(time.time())}_{hash(str(error)) % 10000}"
error_data = {
"error_id": error_id,
"operation": operation,
"error_type": type(error).__name__,
"error_message": str(error),
"retriable": retriable,
"timestamp": datetime.now().isoformat(),
"task_id": getattr(task_context, 'task_id', 'unknown'),
"attempt_number": getattr(task_context, 'attempt_number', 0),
"traceback": traceback.format_exc()
}
# Add error-specific details
if isinstance(error, TaskError):
error_data.update({
"category": error.category.value,
"severity": error.severity.value,
"details": error.details
})
# Store error for analysis
HyrexKV.set(
f"error:{error_id}",
json.dumps(error_data),
expiry_seconds=604800 # 7 days
)
# Update error metrics
update_error_metrics(operation, error_data)
# Log to application logs
logger.error(f"Task error in {operation}: {error_data}")
return error_data
def update_error_metrics(operation: str, error_data: Dict):
"""Update error tracking metrics"""
# Update error counts
error_key = f"error_metrics:{operation}"
try:
metrics_data = HyrexKV.get(error_key)
metrics = json.loads(metrics_data) if metrics_data else {
"total_errors": 0,
"error_types": {},
"error_categories": {},
"hourly_counts": {}
}
    except Exception:
metrics = {"total_errors": 0, "error_types": {}, "error_categories": {}, "hourly_counts": {}}
# Update counters
metrics["total_errors"] += 1
error_type = error_data["error_type"]
metrics["error_types"][error_type] = metrics["error_types"].get(error_type, 0) + 1
if "category" in error_data:
category = error_data["category"]
metrics["error_categories"][category] = metrics["error_categories"].get(category, 0) + 1
# Update hourly counts for trend analysis
hour_key = datetime.now().strftime("%Y-%m-%d-%H")
metrics["hourly_counts"][hour_key] = metrics["hourly_counts"].get(hour_key, 0) + 1
# Store updated metrics
HyrexKV.set(error_key, json.dumps(metrics), expiry_seconds=2592000) # 30 days
def check_circuit_breaker(key: str) -> str:
"""Check circuit breaker state"""
cb_key = f"circuit_breaker:{key}"
try:
cb_data = HyrexKV.get(cb_key)
if not cb_data:
return "CLOSED"
cb_state = json.loads(cb_data)
# Check if circuit breaker should be reset
if cb_state["state"] == "OPEN":
if time.time() - cb_state["opened_at"] > cb_state.get("timeout", 60):
cb_state["state"] = "HALF_OPEN"
HyrexKV.set(cb_key, json.dumps(cb_state), expiry_seconds=3600)
return cb_state["state"]
    except Exception:
        return "CLOSED"  # default to CLOSED if the breaker state cannot be read
def record_circuit_breaker_failure(key: str):
"""Record a failure for circuit breaker logic"""
cb_key = f"circuit_breaker:{key}"
try:
cb_data = HyrexKV.get(cb_key)
if cb_data:
cb_state = json.loads(cb_data)
else:
cb_state = {
"failures": 0,
"state": "CLOSED",
"threshold": 5,
"timeout": 60
}
cb_state["failures"] += 1
cb_state["last_failure"] = time.time()
# Open circuit breaker if threshold exceeded
if cb_state["failures"] >= cb_state["threshold"] and cb_state["state"] != "OPEN":
cb_state["state"] = "OPEN"
cb_state["opened_at"] = time.time()
# Send circuit breaker alert
send_circuit_breaker_alert.send(key, cb_state)
HyrexKV.set(cb_key, json.dumps(cb_state), expiry_seconds=3600)
except Exception as e:
logger.error(f"Failed to update circuit breaker {key}: {e}")
def reset_circuit_breaker(key: str):
"""Reset circuit breaker on successful operation"""
cb_key = f"circuit_breaker:{key}"
try:
cb_data = HyrexKV.get(cb_key)
if cb_data:
cb_state = json.loads(cb_data)
cb_state["failures"] = 0
cb_state["state"] = "CLOSED"
HyrexKV.set(cb_key, json.dumps(cb_state), expiry_seconds=3600)
    except Exception:
        pass  # resetting the breaker is best-effort; never fail the task on bookkeeping
@hy.task
def send_critical_error_alert(operation: str, error_message: str, error_details: Dict):
"""Send alert for critical errors"""
alert_message = f"""
🚨 CRITICAL ERROR ALERT 🚨
Operation: {operation}
Error: {error_message}
Error ID: {error_details.get('error_id')}
Task ID: {error_details.get('task_id')}
Time: {error_details.get('timestamp')}
Immediate attention required!
"""
# Send to multiple channels
send_slack_alert(alert_message, channel="#alerts")
send_email_alert("Critical System Error", alert_message, ["oncall@company.com"])
return {"alert_sent": True, "error_id": error_details.get('error_id')}
@hy.task
def send_circuit_breaker_alert(key: str, cb_state: Dict):
"""Send alert when circuit breaker opens"""
alert_message = f"""
⚡ Circuit Breaker OPEN ⚡
Service: {key}
Failure Count: {cb_state['failures']}
Threshold: {cb_state['threshold']}
Opened At: {datetime.fromtimestamp(cb_state['opened_at']).isoformat()}
Service calls are being blocked to prevent cascade failures.
"""
send_slack_alert(alert_message, channel="#infrastructure")
return {"alert_sent": True, "circuit_breaker": key}
@hy.task
def generate_error_report(operation: Optional[str] = None, hours: int = 24):
"""Generate comprehensive error analysis report"""
end_time = datetime.now()
start_time = end_time - timedelta(hours=hours)
# Collect error data
if operation:
operations = [operation]
else:
operations = get_all_monitored_operations()
report_data = {
"period": {
"start": start_time.isoformat(),
"end": end_time.isoformat(),
"hours": hours
},
"operations": {},
"summary": {
"total_errors": 0,
"most_common_errors": {},
"error_trends": {},
"recovery_stats": {}
}
}
for op in operations:
error_metrics = get_error_metrics(op, start_time, end_time)
if error_metrics:
report_data["operations"][op] = error_metrics
report_data["summary"]["total_errors"] += error_metrics.get("total_errors", 0)
# Store report
report_id = f"error_report_{int(time.time())}"
HyrexKV.set(
f"report:{report_id}",
json.dumps(report_data),
expiry_seconds=2592000 # 30 days
)
# Send report to stakeholders if errors are above threshold
if report_data["summary"]["total_errors"] > 10:
send_error_report_email.send(report_id, report_data)
return {
"report_id": report_id,
"total_errors": report_data["summary"]["total_errors"],
"operations_analyzed": len(operations)
}
# Helper functions (simplified implementations)
def perform_operation(operation: str, data: Dict) -> Any:
"""Perform the actual operation - placeholder"""
if operation == "test_validation_error":
raise ValidationError("Invalid input data", {"field": "email"})
elif operation == "test_api_error":
raise ExternalAPIError("API service unavailable", 503)
return {"success": True, "data": data}
def update_task_metrics(operation: str, status: str):
"""Update task execution metrics"""
pass
def record_task_success(operation: str, duration: float, task_context: Any):
"""Record successful task execution"""
pass
def get_fallback_data(operation: str) -> Dict:
"""Get fallback data for recovery"""
return {"fallback": True}
def execute_alternative_workflow(error_details: Dict) -> Dict:
"""Execute alternative workflow"""
return {"alternative_executed": True}
Usage Examples
Basic Error Handling
# Process task with error handling
curl -X POST http://localhost:8000/tasks/monitored \
-H "Content-Type: application/json" \
-d '{
"operation": "process_user_data",
"data": {"user_id": 123, "email": "test@example.com"},
"circuit_breaker_key": "user_service"
}'
# Make resilient API call
curl -X POST http://localhost:8000/tasks/api-call \
-H "Content-Type: application/json" \
-d '{
"url": "https://api.external.com/data",
"method": "GET",
"circuit_breaker_key": "external_api"
}'
Error Recovery
# Trigger error recovery workflow
curl -X POST http://localhost:8000/recovery/start \
-H "Content-Type: application/json" \
-d '{
"error_id": "err_1234567890_5678",
"recovery_strategy": "retry_with_fallback"
}'
# Generate error report
curl -X POST http://localhost:8000/reports/errors \
-H "Content-Type: application/json" \
-d '{
"operation": "process_payments",
"hours": 48
}'
Circuit Breaker Configuration
# Configure circuit breaker thresholds
CIRCUIT_BREAKER_CONFIG = {
"payment_service": {
"threshold": 3, # Open after 3 failures
"timeout": 30, # Stay open for 30 seconds
"half_open_max": 1 # Allow 1 request in half-open state
},
"external_api": {
"threshold": 5,
"timeout": 60,
"half_open_max": 2
}
}
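The record_circuit_breaker_failure helper above hard-codes a threshold of 5 failures and a 60-second timeout; the sketch below shows one way to wire the per-service config into those defaults (get_circuit_breaker_config and new_circuit_breaker_state are illustrative names, not part of Hyrex):
def get_circuit_breaker_config(key: str) -> Dict:
    """Per-service circuit breaker settings with sensible defaults"""
    defaults = {"threshold": 5, "timeout": 60, "half_open_max": 1}
    return {**defaults, **CIRCUIT_BREAKER_CONFIG.get(key, {})}
def new_circuit_breaker_state(key: str) -> Dict:
    """Initial state for a breaker that has not yet recorded a failure"""
    config = get_circuit_breaker_config(key)
    return {
        "failures": 0,
        "state": "CLOSED",
        "threshold": config["threshold"],
        "timeout": config["timeout"]
    }
record_circuit_breaker_failure can then call new_circuit_breaker_state(key) instead of building its default state inline, so payment_service trips after 3 failures while external_api waits for 5.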
Monitoring Dashboard
@hy.task
def health_check_dashboard():
"""Generate system health dashboard data"""
services = ["payment_service", "user_service", "external_api"]
dashboard_data = {
"timestamp": datetime.now().isoformat(),
"services": {}
}
for service in services:
# Check circuit breaker status
cb_status = check_circuit_breaker(service)
# Get error metrics
error_metrics = get_recent_error_metrics(service, hours=1)
dashboard_data["services"][service] = {
"circuit_breaker_status": cb_status,
"error_rate": error_metrics.get("error_rate", 0),
"recent_errors": error_metrics.get("total_errors", 0),
"availability": calculate_availability(service)
}
return dashboard_data
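The dashboard relies on get_recent_error_metrics and calculate_availability, which are not defined above; the placeholders below are one possible sketch that sums the hourly counts written by update_error_metrics (the availability figure is a stand-in, not a real uptime calculation):
def get_recent_error_metrics(service: str, hours: int = 1) -> Dict:
    """Sum hourly error counts stored under error_metrics:<service>"""
    try:
        metrics_data = HyrexKV.get(f"error_metrics:{service}")
        if not metrics_data:
            return {"total_errors": 0, "error_rate": 0}
        hourly = json.loads(metrics_data).get("hourly_counts", {})
        total = sum(
            hourly.get((datetime.now() - timedelta(hours=i)).strftime("%Y-%m-%d-%H"), 0)
            for i in range(hours)
        )
        return {"total_errors": total, "error_rate": total / max(hours, 1)}
    except Exception:
        return {"total_errors": 0, "error_rate": 0}
def calculate_availability(service: str) -> float:
    """Crude availability proxy based on circuit breaker state - placeholder"""
    return 100.0 if check_circuit_breaker(service) == "CLOSED" else 0.0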
Production Considerations
- Error classification: Properly categorize errors for appropriate handling
- Circuit breaker tuning: Configure thresholds based on service characteristics
- Alert fatigue: Implement intelligent alerting to avoid noise (see the throttling sketch after this list)
- Error aggregation: Use tools like Sentry or Rollbar for error tracking
- Performance impact: Monitor overhead of error handling mechanisms
- Recovery testing: Regularly test error recovery workflows
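For the alert-fatigue point above, one simple approach is to throttle repeat notifications per operation using the same HyrexKV store; the one-hour window below is an arbitrary assumption:
def should_send_alert(operation: str, window_seconds: int = 3600) -> bool:
    """Suppress duplicate alerts for the same operation within a time window"""
    throttle_key = f"alert_throttle:{operation}"
    if HyrexKV.get(throttle_key):
        return False  # an alert for this operation went out recently
    HyrexKV.set(throttle_key, "1", expiry_seconds=window_seconds)
    return True
send_critical_error_alert could check should_send_alert(operation) before posting to Slack or email, and escalate only when the condition persists.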