Error Handling & Monitoring
Build resilient systems with comprehensive error handling, intelligent retry mechanisms, circuit breakers, and detailed error monitoring.

Overview

This example demonstrates robust error handling patterns:
  • Comprehensive error classification - Differentiate between retriable and non-retriable errors
  • Intelligent retry strategies - Exponential backoff, circuit breakers, and custom retry logic
  • Error monitoring & alerting - Track error rates, patterns, and system health
  • Graceful degradation - Fallback mechanisms when services are unavailable
  • Error aggregation - Collect and analyze error patterns for improvement
  • Recovery workflows - Automatic and manual recovery processes
Perfect for building fault-tolerant systems, API integrations, or any application requiring high reliability.

Task Definitions

from hyrex import HyrexRegistry, HyrexKV, get_hyrex_context
from pydantic import BaseModel
from typing import Dict, Any, Optional, List
import traceback
import logging
import time
import json
from datetime import datetime, timedelta
from enum import Enum

hy = HyrexRegistry()
logger = logging.getLogger(__name__)

class ErrorSeverity(Enum):
    """Error severity scale; CRITICAL failures trigger the on-call alert path in monitored_task."""
    LOW = "low"
    MEDIUM = "medium"  
    HIGH = "high"
    CRITICAL = "critical"

class ErrorCategory(Enum):
    """Coarse error classification; the value strings are used as keys in the error metrics aggregation."""
    VALIDATION = "validation"
    NETWORK = "network"
    DATABASE = "database"
    EXTERNAL_API = "external_api"
    BUSINESS_LOGIC = "business_logic"
    SYSTEM = "system"

class MonitoredTaskContext(BaseModel):
    """Input payload for monitored_task."""
    # Logical operation name; used as the key for metrics and error records.
    operation: str
    # Free-form payload passed through to perform_operation.
    data: Dict[str, Any]
    # Optional per-task retry overrides — NOTE(review): not read by any code
    # visible in this file; confirm intended consumer.
    retry_config: Optional[Dict[str, Any]] = None
    # When set, failures/successes feed the circuit breaker with this key.
    circuit_breaker_key: Optional[str] = None

class TaskError(Exception):
    """Base application error carrying classification metadata.

    Attributes:
        category: ErrorCategory bucket, used for metrics aggregation.
        severity: ErrorSeverity, used for alerting decisions.
        retriable: whether the task framework should retry this failure.
        details: free-form context dict (never None).
    """
    def __init__(self, message: str, category: ErrorCategory, severity: ErrorSeverity, 
                 retriable: bool = True, details: Optional[Dict] = None):
        super().__init__(message)
        # Store classification metadata alongside the message.
        self.details = details or {}
        self.retriable = retriable
        self.severity = severity
        self.category = category

class ValidationError(TaskError):
    """Input/data validation failure.

    Always non-retriable with MEDIUM severity: retrying cannot fix bad input,
    so monitored_task returns an error response instead of re-raising.
    """
    def __init__(self, message: str, details: Optional[Dict] = None):
        super().__init__(message, ErrorCategory.VALIDATION, ErrorSeverity.MEDIUM, False, details)

class ExternalAPIError(TaskError):
    """Failure of an upstream HTTP API.

    Severity and retriability are derived from the HTTP status code; the
    status code is merged into the error's details dict.
    """
    def __init__(self, message: str, status_code: int, details: Optional[Dict] = None):
        is_server_error = status_code >= 500
        # 5xx -> HIGH severity; client-side statuses are MEDIUM.
        severity = ErrorSeverity.HIGH if is_server_error else ErrorSeverity.MEDIUM
        # Retry server errors plus rate limiting (429) and request timeout (408).
        retriable = is_server_error or status_code in (429, 408)
        merged_details = dict(details or {})
        merged_details["status_code"] = status_code
        super().__init__(message, ErrorCategory.EXTERNAL_API, severity, retriable, merged_details)

@hy.task(max_retries=5, retry_backoff=True)
def monitored_task(context: MonitoredTaskContext):
    """Task with comprehensive error handling and monitoring.

    Flow: check the circuit breaker -> run the operation -> record success,
    or classify the failure (validation / external API / unexpected) and
    either return a structured error response or re-raise so Hyrex retries.

    Returns a dict with "success" plus either the operation result or the
    error details (including a stored error_id for later analysis).
    """
    start_time = time.time()
    task_context = get_hyrex_context()
    operation = context.operation
    
    # Fail fast (non-retriable) while the breaker is open so a struggling
    # downstream service is not hammered by retry traffic.
    if context.circuit_breaker_key:
        circuit_state = check_circuit_breaker(context.circuit_breaker_key)
        if circuit_state == "OPEN":
            raise TaskError(
                f"Circuit breaker OPEN for {context.circuit_breaker_key}",
                ErrorCategory.SYSTEM,
                ErrorSeverity.HIGH,
                retriable=False
            )
    
    try:
        # Track task execution metrics
        update_task_metrics(operation, "started")
        
        # Perform the actual operation
        result = perform_operation(context.operation, context.data)
        
        # Record success metrics
        duration = time.time() - start_time
        record_task_success(operation, duration, task_context)
        
        # A success closes the breaker again (resets its failure count).
        if context.circuit_breaker_key:
            reset_circuit_breaker(context.circuit_breaker_key)
        
        return {
            "success": True,
            "result": result,
            "duration": duration,
            "operation": operation
        }
        
    except ValidationError as e:
        # Validation errors are permanent: log and return an error response
        # instead of raising, so Hyrex does not retry a hopeless input.
        error_details = log_task_error(operation, e, task_context, retriable=False)
        return {
            "success": False,
            "error_type": "validation_error",
            "error_message": str(e),
            "details": e.details,
            "error_id": error_details["error_id"]
        }
        
    except ExternalAPIError as e:
        # External API failures feed the circuit breaker and retry only when
        # the status code indicated a transient condition (5xx / 429 / 408).
        if context.circuit_breaker_key:
            record_circuit_breaker_failure(context.circuit_breaker_key)
        
        error_details = log_task_error(operation, e, task_context, retriable=e.retriable)
        
        if e.retriable:
            # Bare raise preserves the original traceback; Hyrex retries.
            raise
        else:
            return {
                "success": False,
                "error_type": "external_api_error", 
                "error_message": str(e),
                "status_code": e.details.get("status_code"),
                "error_id": error_details["error_id"]
            }
            
    except Exception as e:
        # Unexpected errors: honour a TaskError's own retriable flag instead
        # of unconditionally logging the error as retriable.
        retriable = getattr(e, "retriable", True)
        error_details = log_task_error(operation, e, task_context, retriable=retriable)
        
        # Record for circuit breaker if configured
        if context.circuit_breaker_key:
            record_circuit_breaker_failure(context.circuit_breaker_key)
        
        # Page the on-call channel for critical severities.
        if isinstance(e, TaskError) and e.severity == ErrorSeverity.CRITICAL:
            send_critical_error_alert.send(operation, str(e), error_details)
        
        raise  # bare raise keeps the traceback; Hyrex handles retries

@hy.task(max_retries=3)
def resilient_external_api_call(url: str, method: str = "GET", data: Optional[Dict] = None, 
                               timeout: int = 30, circuit_breaker_key: Optional[str] = None):
    """Make external API call with comprehensive error handling.

    Combines an optional circuit breaker with a urllib3 Retry policy for
    transient HTTP statuses, and maps timeouts / connection problems onto
    ExternalAPIError so the task layer can decide about retries.

    Returns a dict with status_code, parsed data (JSON when the response
    declares it) and response headers.
    """
    import requests
    from requests.adapters import HTTPAdapter
    # Import Retry from urllib3 directly: `requests.packages.urllib3` is a
    # deprecated vendored alias that newer requests releases no longer ship.
    from urllib3.util.retry import Retry
    
    # Check circuit breaker
    if circuit_breaker_key:
        circuit_state = check_circuit_breaker(circuit_breaker_key)
        if circuit_state == "OPEN":
            raise ExternalAPIError("Service unavailable - circuit breaker OPEN", 503)
    
    # Transport-level retry for idempotent methods and transient statuses.
    retry_strategy = Retry(
        total=3,
        status_forcelist=[429, 500, 502, 503, 504],
        # urllib3 >= 1.26 renamed `method_whitelist` to `allowed_methods`;
        # the old keyword was removed entirely in urllib3 2.0.
        allowed_methods=["HEAD", "GET", "OPTIONS"],
        backoff_factor=1
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    
    # Session as a context manager so pooled connections are always released.
    with requests.Session() as session:
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        
        try:
            response = session.request(
                method=method,
                url=url,
                json=data,
                timeout=timeout,
                headers={"User-Agent": "HyrexApp/1.0"}
            )
            
            # Check for HTTP errors
            if response.status_code >= 400:
                raise ExternalAPIError(
                    f"API request failed with status {response.status_code}",
                    response.status_code,
                    {"url": url, "response_body": response.text[:1000]}
                )
            
            # Reset circuit breaker on success
            if circuit_breaker_key:
                reset_circuit_breaker(circuit_breaker_key)
            
            return {
                "status_code": response.status_code,
                "data": response.json() if response.headers.get('content-type', '').startswith('application/json') else response.text,
                "headers": dict(response.headers)
            }
            
        except requests.exceptions.Timeout:
            if circuit_breaker_key:
                record_circuit_breaker_failure(circuit_breaker_key)
            raise ExternalAPIError("Request timeout", 408, {"url": url, "timeout": timeout})
        
        except requests.exceptions.ConnectionError as e:
            if circuit_breaker_key:
                record_circuit_breaker_failure(circuit_breaker_key)
            raise ExternalAPIError("Connection failed", 503, {"url": url, "error": str(e)})
        
        except requests.exceptions.RequestException as e:
            if circuit_breaker_key:
                record_circuit_breaker_failure(circuit_breaker_key)
            raise ExternalAPIError(f"Request failed: {str(e)}", 500, {"url": url})

@hy.task
def error_recovery_workflow(error_id: str, recovery_strategy: str):
    """Handle error recovery workflows.

    Looks up the stored error record and applies one of the supported
    strategies (retry_with_fallback / manual_intervention /
    alternative_workflow), marking the error recovered or the recovery failed.

    Raises:
        ValueError: if no record exists for error_id.
        TaskError: (SYSTEM / HIGH) if the chosen recovery itself fails.
    """
    # Get error details
    error_details = get_error_details(error_id)
    if not error_details:
        raise ValueError(f"Error {error_id} not found")
    
    try:
        if recovery_strategy == "retry_with_fallback":
            # Retry original operation with fallback data
            fallback_data = get_fallback_data(error_details["operation"])
            result = perform_operation(error_details["operation"], fallback_data)
            
        elif recovery_strategy == "manual_intervention":
            # Create ticket for manual intervention
            ticket_id = create_support_ticket(error_details)
            result = {"ticket_id": ticket_id, "status": "manual_intervention_required"}
            
        elif recovery_strategy == "alternative_workflow":
            # Execute alternative workflow
            result = execute_alternative_workflow(error_details)
            
        else:
            raise ValueError(f"Unknown recovery strategy: {recovery_strategy}")
        
        # Mark error as recovered
        mark_error_recovered(error_id, recovery_strategy, result)
        
        return {
            "error_id": error_id,
            "recovery_strategy": recovery_strategy,
            "recovered": True,
            "result": result
        }
        
    except Exception as e:
        # Recovery failed — record it, then re-raise with the original
        # exception chained (`from e`) so the root cause stays visible.
        mark_recovery_failed(error_id, recovery_strategy, str(e))
        raise TaskError(
            f"Recovery failed for error {error_id}: {str(e)}",
            ErrorCategory.SYSTEM,
            ErrorSeverity.HIGH
        ) from e

def log_task_error(operation: str, error: Exception, task_context: Any, retriable: bool = True) -> Dict:
    """Build a structured error record, persist it, and return it.

    The record gets a generated error_id, is stored in HyrexKV for 7 days,
    folded into the per-operation error metrics, and written to the app log.
    """
    # Id combines a timestamp with a hash-derived suffix for uniqueness.
    error_id = f"err_{int(time.time())}_{hash(str(error)) % 10000}"
    
    record = {
        "error_id": error_id,
        "operation": operation,
        "error_type": type(error).__name__,
        "error_message": str(error),
        "retriable": retriable,
        "timestamp": datetime.now().isoformat(),
        "task_id": getattr(task_context, 'task_id', 'unknown'),
        "attempt_number": getattr(task_context, 'attempt_number', 0),
        "traceback": traceback.format_exc(),
    }
    
    # TaskError instances carry extra classification metadata.
    if isinstance(error, TaskError):
        record["category"] = error.category.value
        record["severity"] = error.severity.value
        record["details"] = error.details
    
    # Persist the record for later analysis and recovery workflows.
    HyrexKV.set(
        f"error:{error_id}",
        json.dumps(record),
        expiry_seconds=604800  # 7 days
    )
    
    # Fold into per-operation counters, then log.
    update_error_metrics(operation, record)
    logger.error(f"Task error in {operation}: {record}")
    
    return record

def update_error_metrics(operation: str, error_data: Dict):
    """Update error tracking metrics.

    Maintains per-operation counters (total, by error type, by category, and
    per hour) in HyrexKV for 30 days, for trend analysis and reporting.
    """
    error_key = f"error_metrics:{operation}"
    
    # Start from empty counters; overwrite with stored metrics when readable.
    metrics = {"total_errors": 0, "error_types": {}, "error_categories": {}, "hourly_counts": {}}
    try:
        metrics_data = HyrexKV.get(error_key)
        if metrics_data:
            metrics = json.loads(metrics_data)
    except Exception as e:
        # Best effort: a corrupt/unreadable record resets the counters rather
        # than failing the caller. Narrowed from a bare `except:`, which also
        # swallowed KeyboardInterrupt/SystemExit, and made observable via log.
        logger.warning(f"Could not load error metrics for {operation}: {e}")
    
    # Update counters
    metrics["total_errors"] += 1
    error_type = error_data["error_type"]
    metrics["error_types"][error_type] = metrics["error_types"].get(error_type, 0) + 1
    
    if "category" in error_data:
        category = error_data["category"]
        metrics["error_categories"][category] = metrics["error_categories"].get(category, 0) + 1
    
    # Update hourly counts for trend analysis
    hour_key = datetime.now().strftime("%Y-%m-%d-%H")
    metrics["hourly_counts"][hour_key] = metrics["hourly_counts"].get(hour_key, 0) + 1
    
    # Store updated metrics
    HyrexKV.set(error_key, json.dumps(metrics), expiry_seconds=2592000)  # 30 days

def check_circuit_breaker(key: str) -> str:
    """Return the circuit breaker state for *key*: CLOSED, OPEN or HALF_OPEN.

    An OPEN breaker transitions to HALF_OPEN automatically once its timeout
    (default 60s) has elapsed, letting a trial request through. A missing or
    unreadable record is treated as CLOSED.
    """
    cb_key = f"circuit_breaker:{key}"
    
    try:
        cb_data = HyrexKV.get(cb_key)
        if not cb_data:
            return "CLOSED"
        
        cb_state = json.loads(cb_data)
        
        # Check if circuit breaker should be reset
        if cb_state["state"] == "OPEN":
            if time.time() - cb_state["opened_at"] > cb_state.get("timeout", 60):
                cb_state["state"] = "HALF_OPEN"
                HyrexKV.set(cb_key, json.dumps(cb_state), expiry_seconds=3600)
        
        return cb_state["state"]
        
    except Exception as e:
        # Fail open (CLOSED) so breaker-store problems never block traffic,
        # but log them — the old bare `except:` hid even KeyboardInterrupt.
        logger.warning(f"Circuit breaker check failed for {key}: {e}")
        return "CLOSED"

def record_circuit_breaker_failure(key: str):
    """Increment the failure count for *key*'s breaker.

    Opens the breaker (and sends an alert) on the transition where the
    failure count first reaches the configured threshold. Best effort:
    storage problems are logged, never raised.
    """
    cb_key = f"circuit_breaker:{key}"
    
    try:
        raw = HyrexKV.get(cb_key)
        # Fall back to a fresh CLOSED breaker with default tuning.
        state = json.loads(raw) if raw else {
            "failures": 0,
            "state": "CLOSED",
            "threshold": 5,
            "timeout": 60
        }
        
        state["failures"] += 1
        state["last_failure"] = time.time()
        
        # Open only on the threshold-crossing transition, not repeatedly.
        if state["failures"] >= state["threshold"] and state["state"] != "OPEN":
            state["state"] = "OPEN"
            state["opened_at"] = time.time()
            send_circuit_breaker_alert.send(key, state)
        
        HyrexKV.set(cb_key, json.dumps(state), expiry_seconds=3600)
        
    except Exception as e:
        logger.error(f"Failed to update circuit breaker {key}: {e}")

def reset_circuit_breaker(key: str):
    """Reset circuit breaker on successful operation.

    Zeroes the failure count and closes the breaker. No-op when no breaker
    record exists; storage failures are logged but never raised.
    """
    cb_key = f"circuit_breaker:{key}"
    
    try:
        cb_data = HyrexKV.get(cb_key)
        if cb_data:
            cb_state = json.loads(cb_data)
            cb_state["failures"] = 0
            cb_state["state"] = "CLOSED"
            HyrexKV.set(cb_key, json.dumps(cb_state), expiry_seconds=3600)
    except Exception as e:
        # Best effort, but narrowed from a bare `except: pass` (which also
        # swallowed KeyboardInterrupt/SystemExit) and made observable via log.
        logger.warning(f"Failed to reset circuit breaker {key}: {e}")

@hy.task
def send_critical_error_alert(operation: str, error_message: str, error_details: Dict):
    """Fan a critical-error notification out to Slack and the on-call inbox."""
    
    error_id = error_details.get('error_id')
    message = f"""
    🚨 CRITICAL ERROR ALERT 🚨
    
    Operation: {operation}
    Error: {error_message}
    Error ID: {error_id}
    Task ID: {error_details.get('task_id')}
    Time: {error_details.get('timestamp')}
    
    Immediate attention required!
    """
    
    # Notify every escalation channel so the alert cannot be missed.
    send_slack_alert(message, channel="#alerts")
    send_email_alert("Critical System Error", message, ["oncall@company.com"])
    
    return {"alert_sent": True, "error_id": error_id}

@hy.task
def send_circuit_breaker_alert(key: str, cb_state: Dict):
    """Notify the infrastructure channel that a circuit breaker has opened."""
    
    opened_at = datetime.fromtimestamp(cb_state['opened_at']).isoformat()
    message = f"""
    ⚡ Circuit Breaker OPEN ⚡
    
    Service: {key}
    Failure Count: {cb_state['failures']}
    Threshold: {cb_state['threshold']}
    Opened At: {opened_at}
    
    Service calls are being blocked to prevent cascade failures.
    """
    
    send_slack_alert(message, channel="#infrastructure")
    
    return {"alert_sent": True, "circuit_breaker": key}

@hy.task
def generate_error_report(operation: Optional[str] = None, hours: int = 24):
    """Build and persist an error-analysis report for the last *hours* hours.

    Covers a single operation when one is given, otherwise every monitored
    operation. The report is stored in HyrexKV for 30 days and emailed to
    stakeholders when the error total exceeds 10.
    """
    end_time = datetime.now()
    start_time = end_time - timedelta(hours=hours)
    
    # Single operation when specified, otherwise everything being monitored.
    operations = [operation] if operation else get_all_monitored_operations()
    
    report_data = {
        "period": {
            "start": start_time.isoformat(),
            "end": end_time.isoformat(),
            "hours": hours
        },
        "operations": {},
        "summary": {
            "total_errors": 0,
            "most_common_errors": {},
            "error_trends": {},
            "recovery_stats": {}
        }
    }
    
    summary = report_data["summary"]
    for op in operations:
        metrics = get_error_metrics(op, start_time, end_time)
        if not metrics:
            continue
        report_data["operations"][op] = metrics
        summary["total_errors"] += metrics.get("total_errors", 0)
    
    # Persist the report for 30 days.
    report_id = f"error_report_{int(time.time())}"
    HyrexKV.set(
        f"report:{report_id}",
        json.dumps(report_data),
        expiry_seconds=2592000  # 30 days
    )
    
    # Escalate noisy periods to stakeholders.
    if summary["total_errors"] > 10:
        send_error_report_email.send(report_id, report_data)
    
    return {
        "report_id": report_id,
        "total_errors": summary["total_errors"],
        "operations_analyzed": len(operations)
    }

# Helper functions (simplified implementations)
def perform_operation(operation: str, data: Dict) -> Any:
    """Perform the actual operation - placeholder.

    The two "test_*" operation names deliberately raise, to exercise the
    error-handling paths; anything else succeeds and echoes the data back.
    """
    if operation == "test_validation_error":
        raise ValidationError("Invalid input data", {"field": "email"})
    if operation == "test_api_error":
        raise ExternalAPIError("API service unavailable", 503)
    return {"success": True, "data": data}

def update_task_metrics(operation: str, status: str) -> None:
    """Update task execution metrics.

    Placeholder: a real implementation would record the (operation, status)
    transition — e.g. "started" — in a metrics backend.
    """
    pass

def record_task_success(operation: str, duration: float, task_context: Any) -> None:
    """Record successful task execution.

    Placeholder: a real implementation would record the operation's duration
    and task context in a metrics backend.
    """
    pass

def get_fallback_data(operation: str) -> Dict:
    """Return the safe default payload used when retrying a failed operation.

    Placeholder implementation: the marker flag lets downstream code detect
    that a fallback payload was used.
    """
    return dict(fallback=True)

def execute_alternative_workflow(error_details: Dict) -> Dict:
    """Run the alternate recovery path - placeholder implementation."""
    # Marker flag indicates the alternative path was taken.
    return dict(alternative_executed=True)

Usage Examples

Basic Error Handling

# Process task with error handling
curl -X POST http://localhost:8000/tasks/monitored \
  -H "Content-Type: application/json" \
  -d '{
    "operation": "process_user_data",
    "data": {"user_id": 123, "email": "test@example.com"},
    "circuit_breaker_key": "user_service"
  }'

# Make resilient API call
curl -X POST http://localhost:8000/tasks/api-call \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://api.external.com/data",
    "method": "GET",
    "circuit_breaker_key": "external_api"
  }'

Error Recovery

# Trigger error recovery workflow  
curl -X POST http://localhost:8000/recovery/start \
  -H "Content-Type: application/json" \
  -d '{
    "error_id": "err_1234567890_5678",
    "recovery_strategy": "retry_with_fallback"
  }'

# Generate error report
curl -X POST http://localhost:8000/reports/errors \
  -H "Content-Type: application/json" \
  -d '{
    "operation": "process_payments", 
    "hours": 48
  }'

Circuit Breaker Configuration

# Configure circuit breaker thresholds.
# Keys are circuit_breaker_key values; each entry tunes that service's breaker.
# NOTE(review): `half_open_max` is not consumed by check_circuit_breaker
# above — confirm the intended consumer before relying on it.
CIRCUIT_BREAKER_CONFIG = {
    "payment_service": {
        "threshold": 3,        # Open after 3 failures
        "timeout": 30,         # Stay open for 30 seconds
        "half_open_max": 1     # Allow 1 request in half-open state
    },
    "external_api": {
        "threshold": 5,        # Less sensitive: tolerate more failures
        "timeout": 60,
        "half_open_max": 2
    }
}

Monitoring Dashboard

@hy.task
def health_check_dashboard():
    """Collect per-service health: breaker state, error rate, availability."""
    
    monitored = ["payment_service", "user_service", "external_api"]
    snapshot = {
        "timestamp": datetime.now().isoformat(),
        "services": {}
    }
    
    for name in monitored:
        # Last hour of error metrics plus the live breaker state.
        metrics = get_recent_error_metrics(name, hours=1)
        snapshot["services"][name] = {
            "circuit_breaker_status": check_circuit_breaker(name),
            "error_rate": metrics.get("error_rate", 0),
            "recent_errors": metrics.get("total_errors", 0),
            "availability": calculate_availability(name)
        }
    
    return snapshot

Production Considerations

  • Error classification: Properly categorize errors for appropriate handling
  • Circuit breaker tuning: Configure thresholds based on service characteristics
  • Alert fatigue: Implement intelligent alerting to avoid noise
  • Error aggregation: Use tools like Sentry or Rollbar for error tracking
  • Performance impact: Monitor overhead of error handling mechanisms
  • Recovery testing: Regularly test error recovery workflows

Next Steps

I