from __future__ import annotations
from typing import Any, Callable
import structlog
# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)
class DistributorCommand:
    """A deferred call: a callable bundled with the arguments to invoke it with.

    Calling the instance invokes the stored callable. Failures are either
    re-raised or logged, and are recorded on ``error_raised`` either way.
    """

    def __init__(self, func: Callable[..., None], *args: Any, **kwargs: Any) -> None:
        """A command to be run by the distributor service.

        Args:
            func: a callable which does work and optionally modifies task instance state.
            *args: positional args to be passed into func.
            **kwargs: kwargs to be passed into func.
        """
        # Keep the callable and its arguments so __call__ can invoke them later.
        self._func = func
        self._args = args
        self._kwargs = kwargs
        # Becomes True once an invocation of func has raised an exception.
        self.error_raised = False

    def __call__(self, raise_on_error: bool = False) -> None:
        """Invoke the stored callable with the stored arguments.

        Args:
            raise_on_error: if True, re-raise any exception raised by the
                callable; if False, log the exception and return normally.

        Raises:
            Exception: whatever the callable raised, only when
                raise_on_error is True.
        """
        try:
            self._func(*self._args, **self._kwargs)
        except Exception as e:
            # Record the failure before logging or re-raising, so the flag is
            # accurate whichever way the error is reported.
            self.error_raised = True
            if raise_on_error:
                raise
            logger.exception("Distributor command failed", error=str(e))