Source code for distributor.distributor_command

from __future__ import annotations

import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


class DistributorCommand:
    """A deferred call: a callable bundled with the arguments to invoke it with.

    Attributes:
        error_raised: False until an invocation of the wrapped callable raises
            an ``Exception``; True from then on.
    """

    def __init__(self, func: Callable[..., None], *args: Any, **kwargs: Any) -> None:
        """A command to be run by the distributor service.

        Args:
            func: a callable which does work and optionally modifies task
                instance state.
            *args: positional args to be passed into func.
            **kwargs: kwargs to be passed into func.
        """
        self._func = func
        self._args = args
        self._kwargs = kwargs
        self.error_raised = False

    def __call__(self, raise_on_error: bool = False) -> None:
        """Invoke the wrapped callable with the stored arguments.

        Args:
            raise_on_error: if True, an exception raised by the callable is
                re-raised to the caller; if False, it is logged as a warning
                and swallowed.

        Raises:
            Exception: whatever the wrapped callable raised, but only when
                ``raise_on_error`` is True.
        """
        try:
            self._func(*self._args, **self._kwargs)
        except Exception as e:
            # Record the failure first so the flag is accurate on both paths;
            # in the swallowed case it is the only signal left on the object.
            self.error_raised = True
            if raise_on_error:
                raise
            logger.warning(e)