client.workflow
===============

.. py:module:: client.workflow

.. autoapi-nested-parse::

   The overarching framework within which tasks and dependencies are created.


Attributes
----------

.. autoapisummary::

   client.workflow.logger


Classes
-------

.. autoapisummary::

   client.workflow.DistributorContext
   client.workflow.Workflow


Module Contents
---------------

.. py:data:: logger

.. py:class:: DistributorContext(cluster_name: str, workflow_run_id: int, timeout: int)

   .. py:attribute:: _cluster_name

   .. py:attribute:: _workflow_run_id

   .. py:attribute:: _timeout

   .. py:method:: wait_for_startup_signal(timeout: int = 180) -> bool

      Wait for startup signal with non-blocking reads to handle timing issues.

   .. py:method:: __enter__() -> DistributorContext

      Starts the Distributor Process.

   .. py:method:: __exit__(exc_type: Optional[BaseException], exc_value: Optional[BaseException], exc_traceback: Optional[types.TracebackType]) -> None

      Stops the Distributor Process.

   .. py:method:: alive() -> bool

   .. py:method:: _shutdown() -> str

      Shutdown the distributor process.

   .. py:method:: derive_jobmon_command_from_env() -> Optional[str]
      :staticmethod:

      If a singularity path is provided, use it when running the worker node.

.. py:class:: Workflow(tool_version: jobmon.client.tool_version.ToolVersion, workflow_args: str = '', name: str = '', description: str = '', workflow_attributes: Optional[Union[List, dict]] = None, max_concurrently_running: int = MaxConcurrentlyRunning.MAXCONCURRENTLYRUNNING, requester: Optional[jobmon.core.requester.Requester] = None, chunk_size: int = 500)

   Bases: :py:obj:`object`

   (aka Batch, aka Swarm).

   A Workflow is a framework by which a user may define the relationship between tasks and define the relationship between multiple runs of the same set of tasks. The great benefit of the Workflow is that it's resumable. A Workflow can only be re-loaded if two things are shown to be exact matches to a previous Workflow:

   1. WorkflowArgs: It is recommended to pass a meaningful unique identifier to workflow_args, to ease resuming. However, if the Workflow is a one-off project, you may instantiate the Workflow anonymously, without WorkflowArgs. Under the hood, the WorkflowArgs will default to a UUID which, as it is randomly generated, will be harder to remember and thus harder to resume. Workflow args must be hashable. For example, CodCorrect or Como version might be passed as Args to the Workflow. For now, the assumption is WorkflowArgs is a string.

   2. The tasks added to the workflow. A Workflow is built up by using Workflow.add_task(). In order to resume a Workflow, all the same tasks must be added with the same dependencies between tasks.

   .. py:attribute:: _tool_version

   .. py:attribute:: name
      :value: ''

   .. py:attribute:: description
      :value: ''

   .. py:attribute:: max_concurrently_running
      :type: int
      :value: 2147483647

   .. py:attribute:: requester
      :value: None

   .. py:attribute:: _dag

   .. py:attribute:: tasks
      :type: Dict[int, jobmon.client.task.Task]

   .. py:attribute:: arrays
      :type: Dict[str, jobmon.client.array.Array]

   .. py:attribute:: _chunk_size
      :type: int
      :value: 500

   .. py:attribute:: workflow_args_hash
      :value: 0

   .. py:attribute:: workflow_attributes
      :type: Dict[str, Any]

   .. py:attribute:: _clusters
      :type: Dict[str, jobmon.core.cluster.Cluster]

   .. py:attribute:: _task_resources
      :type: Dict[int, jobmon.client.task_resources.TaskResources]

   .. py:attribute:: default_cluster_name
      :type: str
      :value: ''

   .. py:attribute:: _default_max_attempts
      :type: Optional[int]
      :value: None

   .. py:attribute:: default_compute_resources_set
      :type: Dict[str, Dict[str, Any]]

   .. py:attribute:: default_resource_scales_set
      :type: Dict[str, Dict[str, float]]

   .. py:attribute:: _fail_after_n_executions
      :value: 1000000000

   .. py:attribute:: last_workflow_run_id
      :type: Optional[int]
      :value: None

   .. py:property:: tool
      :type: jobmon.client.tool.Tool

      Return the tool associated with this workflow.

   .. py:property:: is_bound
      :type: bool

      Whether the workflow has been bound to the db.

   .. py:property:: workflow_id
      :type: int

      If the workflow is bound then it will have been given an id.

   .. py:property:: dag_id
      :type: int

      If it has been bound, it will have an associated dag_id.

   .. py:property:: task_hash
      :type: int

      Hash of all of the tasks.

   .. py:property:: task_errors
      :type: Dict

      Return a dict of errors associated with a task.

   .. py:property:: default_max_attempts
      :type: Optional[int]

      Return the workflow default max attempts.

   .. py:method:: add_attributes(workflow_attributes: dict) -> None

      Users can call this either to update values of existing attributes or to add new attributes.

      :param workflow_attributes: attributes to be bound to the db that describe this workflow.

   .. py:method:: add_task(task: jobmon.client.task.Task) -> jobmon.client.task.Task

      Add a task to the workflow to be executed.

      Set semantics - add tasks once only, based on hash name.

      :param task: single task to add.

   .. py:method:: _link_array_and_workflow(array: jobmon.client.array.Array) -> None

   .. py:method:: add_tasks(tasks: Iterable[jobmon.client.task.Task]) -> None

      Add a list of tasks to the workflow to be executed.

   .. py:method:: set_default_compute_resources_from_yaml(cluster_name: str, yaml_file: str) -> None

      Set default compute resources from a user provided yaml file for workflow level.

      TODO: Implement this method.

      :param cluster_name: name of cluster to set default values for.
      :param yaml_file: the yaml file that is providing the compute resource values.

   .. py:method:: set_default_compute_resources_from_dict(cluster_name: str, dictionary: Dict[str, Any]) -> None

      Set default compute resources for a given cluster_name.

      :param cluster_name: name of cluster to set default values for.
      :param dictionary: dictionary of default compute resources to run tasks with. Can be overridden at task template, tool or task level.

   .. py:method:: set_default_resource_scales_from_dict(cluster_name: str, dictionary: Dict[str, float]) -> None

      Set default resource scales for a given cluster_name.

      :param cluster_name: name of cluster to set default values for.
      :param dictionary: dictionary of default resource scales to adjust task resources with. Can be overridden at task template or task level.

   .. py:method:: set_default_cluster_name(cluster_name: str) -> None

      Set the default cluster.

      :param cluster_name: name of cluster to set as default.

   .. py:method:: set_default_max_attempts(value: int) -> None

      Set the max attempts.

      :param value: value of max_attempts.

   .. py:method:: get_tasks_by_node_args(task_template_name: str, **kwargs: Any) -> List[jobmon.client.task.Task]

      Query tasks by node args. Used for setting dependencies.

   .. py:method:: set_max_concurrently_running(task_template_name: str, max_concurrently_running: int) -> None

   .. py:method:: run(fail_fast: bool = False, seconds_until_timeout: int = 36000, resume: bool = False, reset_running_jobs: bool = True, distributor_startup_timeout: int = 180, resume_timeout: int = 300, configure_logging: bool = False) -> Optional[str]

      Run the workflow.

      Traverse the DAG, submitting new tasks when their upstream tasks have completed successfully.

      :param fail_fast: whether to break out of distributor on first failure.
      :param seconds_until_timeout: amount of time (in seconds) to wait until the whole workflow times out. Submitted jobs will continue.
      :param resume: whether the workflow should be resumed or not. If it is not set to resume and an identical workflow already exists, the workflow will error out.
      :param reset_running_jobs: whether or not to reset running jobs upon resume.
      :param distributor_startup_timeout: amount of time to wait for the distributor process to start up.
      :param resume_timeout: seconds to wait for a workflow to become resumable before giving up.
      :param configure_logging: set up jobmon client logging. If False, no logging will be configured. If True, automatic component logging will be configured.

      :returns: str of WorkflowRunStatus

   .. py:method:: _configure_component_logging() -> None

      Configure component logging for client workflow operations.

   .. py:method:: set_task_template_max_concurrency_limit(task_template_name: str, limit: int) -> None

   .. py:method:: validate(strict: bool = True, raise_on_error: bool = False) -> None

      Confirm that the tasks in this workflow are valid.

      This method will:

      - access the database to confirm the requested resources are valid for the specified cluster
      - confirm that the workflow args are valid
      - make sure no task contains up/down stream tasks that are not in the workflow

   .. py:method:: bind() -> None

      Get a workflow_id.

   .. py:method:: _bind_tasks(reset_if_running: bool = True, chunk_size: int = 500) -> None

   .. py:method:: _bind_task_args(chunk_size: int = 500) -> None

      Bind all task args to the database.

      Loop through our bound task dict in chunks in order to bind new args and arg types to the database.

   .. py:method:: _bind_task_attributes(chunk_size: int = 500) -> None

   .. py:method:: get_errors(limit: int = 1000) -> Optional[Dict[int, Dict[str, Union[int, List[Dict[str, Union[str, int]]]]]]]

      Method to get all errors.

      Return a dictionary with the erring task_id as the key, and the Task.get_errors content as the value. When limit is specifically set as None from the client, this return set will pass back all the erred tasks in the workflow.

   .. py:method:: get_cluster_by_name(cluster_name: str) -> jobmon.core.cluster.Cluster

      Check if the cluster that the task specified is in the cache.

      If the cluster is not in the cache, create it and add to cache.

   .. py:method:: _set_original_task_resources(task: jobmon.client.task.Task) -> None

   .. py:method:: _matching_wf_args_diff_hash() -> None

      Check that an existing workflow does not contain different tasks.

      Check that an existing workflow with the same workflow_args does not have a different hash; this would indicate that the workflow contains different tasks.

   .. py:method:: __hash__() -> int

      Hash to encompass tool version id, workflow args, tasks and dag.

   .. py:method:: __repr__() -> str

      A representation string for a Workflow instance.
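The resume rule in the ``Workflow`` class description (a previous Workflow can only be re-loaded when both the workflow args and the set of tasks, including their dependencies, match) can be illustrated with a small standalone model. This is a sketch only, not jobmon's implementation: ``ToyTask``, ``ToyWorkflow``, ``stable_hash``, ``build`` and ``can_resume`` are hypothetical names invented for the example, the commands and the ``codcorrect_v42`` identifier are made-up values, and the way jobmon actually computes ``workflow_args_hash`` and ``task_hash`` is not shown in this reference.

```python
import hashlib
from typing import Dict, Iterable, List


def stable_hash(text: str) -> int:
    """Deterministic integer hash of a string (hypothetical helper)."""
    return int(hashlib.sha256(text.encode("utf-8")).hexdigest()[:15], 16)


class ToyTask:
    """Stand-in for a task: identified by its command, with upstream tasks."""

    def __init__(self, command: str, upstream: Iterable["ToyTask"] = ()) -> None:
        self.command = command
        self.upstream: List["ToyTask"] = list(upstream)

    def __hash__(self) -> int:
        return stable_hash(self.command)

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ToyTask) and hash(self) == hash(other)


class ToyWorkflow:
    """Stand-in for a workflow: workflow args plus tasks keyed by task hash."""

    def __init__(self, workflow_args: str) -> None:
        self.workflow_args = workflow_args
        self.tasks: Dict[int, ToyTask] = {}

    def add_task(self, task: ToyTask) -> ToyTask:
        # Tasks are stored under their hash, mirroring the Dict[int, Task] shape.
        self.tasks[hash(task)] = task
        return task

    @property
    def workflow_args_hash(self) -> int:
        return stable_hash(self.workflow_args)

    @property
    def task_hash(self) -> int:
        # Combine every task hash with the hashes of its upstream tasks, so a
        # change in either the task set or the dependencies changes the result.
        edges = sorted(
            f"{hash(task)}<-{sorted(hash(up) for up in task.upstream)}"
            for task in self.tasks.values()
        )
        return stable_hash("|".join(edges))

    def can_resume(self, previous: "ToyWorkflow") -> bool:
        # Both conditions from the class description must hold.
        return (
            self.workflow_args_hash == previous.workflow_args_hash
            and self.task_hash == previous.task_hash
        )


def build(workflow_args: str, with_dependency: bool = True) -> ToyWorkflow:
    """Build a two-task toy workflow, optionally linking the second task to the first."""
    workflow = ToyWorkflow(workflow_args)
    extract = workflow.add_task(ToyTask("python extract.py"))
    upstream = [extract] if with_dependency else []
    workflow.add_task(ToyTask("python model.py", upstream=upstream))
    return workflow


previous_run = build("codcorrect_v42")
same_definition = build("codcorrect_v42")
different_args = build("codcorrect_v43")
different_dag = build("codcorrect_v42", with_dependency=False)
```

In this model only ``same_definition.can_resume(previous_run)`` holds; changing either the workflow args or a dependency breaks the match. In jobmon itself, a resume is requested through the ``resume`` parameter of ``Workflow.run`` documented above.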