"""Custom Exceptions used throughout Jobmon."""
[docs]
class ReturnCodes:
    """Namespace of Bash return codes used in the distributor wrapper."""

    # NOTE(review): meanings below are inferred from the constant names only.
    # Presumably signals a worker-node environment failure -- confirm.
    WORKER_NODE_ENV_FAILURE = 198
    # Presumably signals a worker-node CLI failure -- confirm.
    WORKER_NODE_CLI_FAILURE = 199
[docs]
class InvalidResponse(Exception):
    """Exception type signalling an invalid response."""
[docs]
class InvalidRequest(Exception):
    """Exception type signalling an invalid request."""
[docs]
class RemoteExitInfoNotAvailable(Exception):
    """Raised when exit info is unavailable for a given executor type."""
[docs]
class CallableReturnedInvalidObject(Exception):
    """Raised when a callable hands back an invalid object."""
[docs]
class WorkflowAlreadyExists(Exception):
    """A Workflow with identical workflow args is already present."""
[docs]
class WorkflowAlreadyComplete(Exception):
    """The Workflow has already finished."""
[docs]
class WorkflowNotResumable(Exception):
    """The Workflow has not been set to be resumed."""
[docs]
class EmptyWorkflowError(Exception):
    """The Workflow is empty."""
[docs]
class DistributorStartupTimeout(Exception):
    """The Distributor failed to start within the allowed time."""
[docs]
class DistributorNotAlive(Exception):
    """The Distributor is not currently running."""
[docs]
class DistributorUnexpected(Exception):
    """The Distributor hit an unexpected situation."""
[docs]
class WorkflowRunStateError(Exception):
    """There is a problem with the Workflow Run status."""
[docs]
class ResumeSet(Exception):
    """Exception related to resuming."""
[docs]
class NodeDependencyNotExistError(Exception):
    """A referenced dependency does not exist."""
[docs]
class DuplicateNodeArgsError(Exception):
    """Several nodes share the same args within one TaskTemplate, which is not allowed."""
[docs]
class InvalidMemoryUnit(Exception):
    """The unit given for a memory conversion is invalid."""
[docs]
class ConfigError(Exception):
    """No server configuration could be found."""
[docs]
class InvalidStateTransition(Exception):
    """Raised for an invalid state transition.

    The exception message names the model, its id, and both the old and the
    new state.
    """

    def __init__(self, model: str, id: int, old_state: str, new_state: str) -> None:
        """Initialize the exception with a descriptive message.

        Args:
            model: name of the model, interpolated into the message.
            id: identifier interpolated into the message. The name shadows the
                builtin but is kept so existing keyword callers keep working.
            old_state: state the transition started from.
            new_state: state the transition was attempting to reach.
        """
        msg = f"Cannot transition {model} id: {id} from {old_state} to {new_state}"
        # Pass only the message. Forwarding ``self`` as well makes ``args`` a
        # 2-tuple ``(self, msg)``, so ``str(exc)`` renders the tuple instead of
        # the readable message.
        super().__init__(msg)
[docs]
class TransitionError(Exception):
    """A transition failed."""
[docs]
class WorkflowTestError(Exception):
    """The Workflow Run encountered an error."""
[docs]
class DistributorInterruptedError(Exception):
    """Raised when a signal is sent to the distributor."""
[docs]
class CyclicGraphError(Exception):
    """A cycle was detected in the graph."""