Tasks are the fundamental unit of work in Hyrex. They are Python functions that can be executed asynchronously by workers, with automatic retry handling, monitoring, and durability.
Basic Task Definition
Define a task by decorating a function with @hy.task
:
from hyrex import HyrexRegistry
from pydantic import BaseModel
hy = HyrexRegistry()
class ProcessContext(BaseModel):
file_path: str
options: dict
@hy.task
def process_file(context: ProcessContext):
# Task logic
print(f"Processing {context.file_path}")
return {"status": "completed", "file": context.file_path}
All task parameters must have type hints. Hyrex enforces this requirement to ensure type safety.
Task Configuration
Configure task behavior with decorator options:
@hy.task(
queue="processing", # Target queue
max_retries=3, # Retry attempts
timeout_seconds=300, # Timeout (5 minutes)
priority=5, # 1-10 (higher = more important)
retry_backoff=lambda n: n*10 # Backoff strategy
)
def configured_task(data: dict):
# Process with specific configuration
return {"processed": True}
Configuration Options
- queue: Route tasks to specific worker pools
- max_retries: Number of retry attempts on failure (default: 0)
- timeout_seconds: Maximum execution time before timeout
- priority: Task priority from 1-10 (default: 5)
- retry_backoff: Function to calculate retry delay
Sending Tasks
Queue tasks for asynchronous execution:
# Send with default configuration
task = process_file.send(ProcessContext(
file_path="/data/input.csv",
options={"format": "csv"}
))
# Override configuration at runtime
urgent_task = process_file.with_config(
queue="high-priority",
max_retries=5
).send(context)
# Get task ID
print(f"Task ID: {task.id}")
Task Context
Access metadata about the current task execution:
from hyrex import get_hyrex_context
@hy.task
def task_with_context(data: dict):
# Get current task context
context = get_hyrex_context()
if context:
task_id = context.task_id
attempt_number = context.attempt_number
queue_name = context.queue
print(f"Processing task {task_id} (attempt {attempt_number})")
# Access parent task info if this is a sub-task
if context.parent_id:
print(f"Spawned by task: {context.parent_id}")
# Your task logic here
return {"processed_by": str(task_id) if context else "unknown"}
Error Handling
Tasks can raise exceptions to trigger retries:
@hy.task(max_retries=3)
def task_with_retries(data: dict):
try:
# Attempt operation
result = external_api_call(data)
return result
except TemporaryError:
# This will trigger a retry
raise
except PermanentError as e:
# This will not retry
return {"error": str(e), "status": "failed"}
Error Callbacks
Use on_error
to handle errors without affecting retry behavior:
def error_handler(e: Exception):
# Log error, send alert, etc.
print(f"Task error: {type(e).__name__}: {str(e)}")
@hy.task(
max_retries=3,
on_error=error_handler
)
def monitored_task(data: dict):
# Task logic that might fail
result = risky_operation(data)
return result
Retry Behavior
- Raising any exception triggers a retry (if retries remain)
- Return a value to complete the task (even with errors)
- Use
retry_backoff
for exponential or custom backoff
on_error
callbacks run on each failure but don’t affect retries
Task Results
Track task execution:
# Send a task
task = process_file.send(context)
# Task ID is immediately available
print(f"Task ID: {task.id}")
# Get task name
print(f"Task name: {task.task_name}")
# Refresh task status from database
task.refresh()
# Access task runs (attempts)
for run in task.task_runs:
print(f"Attempt {run.attempt_number}: {run.status}")
if run.status == "completed":
print(f"Result: {run.result}")
Best Practices
-
Use Type-Safe Contexts
class OrderContext(BaseModel):
order_id: int
customer_email: str
items: List[dict]
-
Set Appropriate Timeouts
@hy.task(timeout_seconds=30) # Fast operations
@hy.task(timeout_seconds=3600) # Long-running jobs
-
Design for Idempotency
@hy.task
def process_payment(context: PaymentContext):
# Check if already processed
if payment_exists(context.payment_id):
return get_payment_result(context.payment_id)
# Process payment
result = charge_card(context)
save_payment_result(context.payment_id, result)
return result
-
Handle Errors Gracefully
@hy.task(max_retries=3)
def fetch_data(context: FetchContext):
try:
return fetch_from_api(context.url)
except RateLimitError:
# Retry with backoff
raise
except InvalidDataError as e:
# Don't retry, return error
return {"error": str(e), "status": "invalid_data"}
Next Steps