client.workflow =============== .. py:module:: client.workflow .. autoapi-nested-parse:: The overarching framework to create tasks and dependencies within. Attributes ---------- .. autoapisummary:: client.workflow.logger Classes ------- .. autoapisummary:: client.workflow.DistributorContext client.workflow.Workflow Module Contents --------------- .. py:data:: logger .. py:class:: DistributorContext(cluster_name: str, workflow_run_id: int, timeout: int) Initialization of the DistributorContext. .. py:method:: wait_for_startup_signal(timeout: int = 180) -> bool Wait for startup signal with non-blocking reads to handle timing issues. .. py:method:: alive() -> bool .. py:method:: derive_jobmon_command_from_env() -> Optional[str] :staticmethod: If a singularity path is provided, use it when running the worker node. .. py:class:: Workflow(tool_version: jobmon.client.tool_version.ToolVersion, workflow_args: str = '', name: str = '', description: str = '', workflow_attributes: Optional[Union[List, dict]] = None, max_concurrently_running: int = MaxConcurrentlyRunning.MAXCONCURRENTLYRUNNING, requester: Optional[jobmon.core.requester.Requester] = None, chunk_size: int = 500) Bases: :py:obj:`object` (aka Batch, aka Swarm). A Workflow is a framework by which a user may define the relationship between tasks and define the relationship between multiple runs of the same set of tasks. The great benefit of the Workflow is that it's resumable. A Workflow can only be re-loaded if two things are shown to be exact matches to a previous Workflow: 1. WorkflowArgs: It is recommended to pass a meaningful unique identifier to workflow_args, to ease resuming. However, if the Workflow is a one-off project, you may instantiate the Workflow anonymously, without WorkflowArgs. Under the hood, the WorkflowArgs will default to a UUID which, as it is randomly generated, will be harder to remember and thus harder to resume. Workflow args must be hashable. For example, CodCorrect or Como version might be passed as Args to the Workflow. For now, the assumption is WorkflowArgs is a string. 2. The tasks added to the workflow. A Workflow is built up by using Workflow.add_task(). In order to resume a Workflow, all the same tasks must be added with the same dependencies between tasks. Initialization of the client workflow. :param tool_version: ToolVersion this workflow is associated :param workflow_args: Unique identifier of a workflow :param name: Name of the workflow :param description: Description of the workflow :param workflow_attributes: Attributes that make this workflow different from other workflows that the user wants to record. :param max_concurrently_running: How many running jobs to allow in parallel :param requester: object to communicate with the FastApi services. :param chunk_size: how many tasks to bind in a single request :param default_max_attempts: the default max attempts of the workflow for each array .. py:attribute:: name :value: '' .. py:attribute:: description :value: '' .. py:attribute:: max_concurrently_running :type: int :value: 2147483647 .. py:attribute:: requester :value: None .. py:attribute:: tasks :type: Dict[int, jobmon.client.task.Task] .. py:attribute:: arrays :type: Dict[str, jobmon.client.array.Array] .. py:attribute:: workflow_args_hash :value: 0 .. py:attribute:: workflow_attributes :type: Dict[str, Any] .. py:attribute:: default_cluster_name :type: str :value: '' .. py:attribute:: default_compute_resources_set :type: Dict[str, Dict[str, Any]] .. py:attribute:: default_resource_scales_set :type: Dict[str, Dict[str, float]] .. py:attribute:: last_workflow_run_id :type: Optional[int] :value: None .. py:attribute:: error :type: Optional[BaseException] :value: None .. py:property:: tool :type: jobmon.client.tool.Tool Returns the associated tool to this workflow. .. py:property:: is_bound :type: bool If the workflow has been bound to the db. .. py:property:: workflow_id :type: int If the workflow is bound then it will have been given an id. .. py:property:: dag_id :type: int If it has been bound, it will have an associated dag_id. .. py:property:: task_hash :type: int Hash of all of the tasks. .. py:property:: task_errors :type: Dict Return a dict of error associated with a task. .. py:property:: default_max_attempts :type: Optional[int] Return the workflow default max attempts. .. py:method:: add_attributes(workflow_attributes: dict) -> None Users can call either to update values of existing attributes or add new attributes. :param workflow_attributes: attributes to be bound to the db that describe this workflow. .. py:method:: add_task(task: jobmon.client.task.Task) -> jobmon.client.task.Task Add a task to the workflow to be executed. Set semantics - add tasks once only, based on hash name. :param task: single task to add. .. py:method:: add_tasks(tasks: Iterable[jobmon.client.task.Task]) -> None Add a list of task to the workflow to be executed. .. py:method:: set_default_compute_resources_from_yaml(cluster_name: str, yaml_file: str) -> None Set default compute resources from a user provided yaml file for workflow level. TODO: Implement this method. :param cluster_name: name of cluster to set default values for. :param yaml_file: the yaml file that is providing the compute resource values. .. py:method:: set_default_compute_resources_from_dict(cluster_name: str, dictionary: Dict[str, Any]) -> None Set default compute resources for a given cluster_name. :param cluster_name: name of cluster to set default values for. :param dictionary: dictionary of default compute resources to run tasks with. Can be overridden at task template, tool or task level. .. py:method:: set_default_resource_scales_from_dict(cluster_name: str, dictionary: Dict[str, float]) -> None Set default resource scales for a given cluster_name. :param cluster_name: name of cluster to set default values for. :param dictionary: dictionary of default resource scales to adjust task resources with. Can be overridden at task template or task level. .. py:method:: set_default_cluster_name(cluster_name: str) -> None Set the default cluster. :param cluster_name: name of cluster to set as default. .. py:method:: set_default_max_attempts(value: int) -> None Set the max attempts. :param value: value of max_attempts. .. py:method:: get_tasks_by_node_args(task_template_name: str, **kwargs: Any) -> List[jobmon.client.task.Task] Query tasks by node args. Used for setting dependencies. .. py:method:: set_max_concurrently_running(task_template_name: str, max_concurrently_running: int) -> None .. py:method:: run(fail_fast: bool = False, seconds_until_timeout: int = 36000, resume: bool = False, reset_running_jobs: bool = True, distributor_startup_timeout: int = 180, resume_timeout: int = 300, configure_logging: bool = False) -> Optional[str] Run the workflow. Traverse the dag and submitting new tasks when their tasks have completed successfully. :param fail_fast: whether to break out of distributor on first failure. :param seconds_until_timeout: amount of time (in seconds) to wait until the whole workflow times out. Submitted jobs will continue :param resume: whether the workflow should be resumed or not, if it is not set to resume and an identical workflow already exists, the workflow will error out :param reset_running_jobs: whether or not to reset running jobs upon resume :param distributor_startup_timeout: amount of time to wait for the distributor process to start up :param resume_timeout: seconds to wait for a workflow to become resumable before giving up :param configure_logging: setup jobmon client logging. If False, no logging will be configured. If True, automatic component logging will be configured. :returns: str of WorkflowRunStatus .. py:method:: set_task_template_max_concurrency_limit(task_template_name: str, limit: int) -> None .. py:method:: validate(strict: bool = True, raise_on_error: bool = False) -> None Confirm that the tasks in this workflow are valid. This method will: - access the database to confirm the requested resources are valid for the specified cluster - confirm that the workflow args are valid - make sure no task contains up/down stream tasks that are not in the workflow .. py:method:: bind() -> None Get a workflow_id. .. py:method:: get_errors(limit: int = 1000) -> Optional[Dict[int, Dict[str, Union[int, List[Dict[str, Union[str, int]]]]]]] Method to get all errors. Return a dictionary with the erring task_id as the key, and the Task.get_errors content as the value. When limit is specifically set as None from the client, this return set will pass back all the erred tasks in the workflow. .. py:method:: get_cluster_by_name(cluster_name: str) -> jobmon.core.cluster.Cluster Check if the cluster that the task specified is in the cache. If the cluster is not in the cache, create it and add to cache.