client.task
===========

.. py:module:: client.task

.. autoapi-nested-parse::

   The Task object defines a single executable object that will be added to a Workflow.

   TaskInstances will be created from it for every execution.



Attributes
----------

.. autoapisummary::

   client.task.logger


Classes
-------

.. autoapisummary::

   client.task.Task


Functions
---------

.. autoapisummary::

   client.task.validate_task_resource_scales


Module Contents
---------------

.. py:data:: logger

.. py:function:: validate_task_resource_scales(resource_scales: Dict[str, Any]) -> None

   Validate that the resource scales are of the expected types.


.. py:class:: Task(node: jobmon.client.node.Node, task_args: Dict[str, Any], op_args: Dict[str, Any], array: Optional[jobmon.client.array.Array] = None, cluster_name: str = '', compute_resources: Optional[Dict[str, Any]] = None, compute_resources_callable: Optional[Callable] = None, resource_scales: Optional[Dict[str, float]] = None, fallback_queues: Optional[List[str]] = None, name: Optional[str] = None, max_attempts: Optional[int] = None, upstream_tasks: Optional[List[Task]] = None, task_attributes: Union[List, dict, None] = None, requester: Optional[jobmon.core.requester.Requester] = None)

   The Task object defines a single executable object that will be added to a Workflow.

   TaskInstances will be created from it for every execution.

   Create a single executable object in the workflow, aka a Task. Relate it to a Task
   Template in order to classify it as a type of job within the context of your workflow.

   :param node: Node this task is associated with.
   :param task_args: Task arguments that make the command unique across workflows,
       usually pertaining to data flowing through the task.
   :param op_args: Task arguments that can change across runs of the same workflow,
       usually pertaining to trivial things like log level or code location.
   :param array: The array that the task is associated with.
   :param cluster_name: The name of the cluster the user wants to run their task on.
   :param compute_resources: A dictionary that includes the user's requested resources
       for the current run, e.g. ``{cores: 1, mem: 1, runtime: 60, queue: all.q}``.
   :param compute_resources_callable: A callable that returns a compute resources dict.
   :param resource_scales: How much the user wants to scale their resource request if
       the initial request fails.
   :param fallback_queues: A list of queues that the user wants to try if their original
       queue is unable to accommodate their requested resources.
   :param name: Name that will be visible in the job status information (e.g. ``squeue``
       or ``qstat``) for this job.
   :param max_attempts: Number of attempts to allow the cluster to try before giving up.
       Default is 3.
   :param upstream_tasks: Task objects that must be run prior to this one.
   :param task_attributes: Dictionary of attributes and their values, or list of
       attributes that will be assigned later.
   :param requester: Requester object to communicate with the FastAPI services.
   :raises ValueError: If the hashed command is not allowed as an SGE job name; see
       ``is_valid_job_name``.


   .. py:method:: is_valid_job_name(name: str) -> bool
      :staticmethod:

      Raise an exception if the name is invalid.

      The rules are primarily based on the restrictions SGE places on job names. The
      list of illegal characters might not be complete, as no official list could be
      found.

      The name must not:

      - be null or the empty string
      - begin with a digit
      - contain an illegal character

      :param name: The job name to validate.
      :returns: True (or raises)
      :raises ValueError: If the name is not valid.


   .. py:attribute:: requester
      :value: None


   .. py:attribute:: node


   .. py:attribute:: task_args


   .. py:attribute:: mapped_task_args


   .. py:attribute:: task_args_hash
      :value: 0


   .. py:attribute:: op_args


   .. py:attribute:: command


   .. py:attribute:: name
      :value: None


   .. py:attribute:: upstream_tasks
      :type: Set[Task]


   .. py:attribute:: downstream_tasks
      :type: Set[Task]


   .. py:attribute:: task_attributes
      :type: dict


   .. py:attribute:: fallback_queues
      :type: List[str]


   .. py:property:: compute_resources
      :type: Dict[str, Any]


   .. py:property:: requested_resources
      :type: Dict[str, Any]

      A dictionary that includes the user's requested resources for the current run,
      e.g. ``{cores: 1, mem: 1, runtime: 60, queue: all.q}``.


   .. py:property:: resource_scales
      :type: Dict[str, float]

      A dictionary that includes the user's requested resource scales for the current
      run, e.g. ``{memory: 0.1, runtime: 0.7}``.


   .. py:property:: cluster_name
      :type: str

      The name of the cluster the user wants to run their task on.


   .. py:property:: max_attempts
      :type: int

      Get the maximum number of attempts allowed for this task.


   .. py:property:: compute_resources_callable
      :type: Optional[Callable]

      A callable that returns a compute resources dict.


   .. py:property:: queue_name
      :type: str


   .. py:property:: original_task_resources
      :type: jobmon.client.task_resources.TaskResources

      Get the original task resources for this task.


   .. py:property:: is_bound
      :type: bool

      Whether the task has been bound to the database.


   .. py:property:: task_id
      :type: int

      Get the id of the task if it has been bound to the db; otherwise raise an error.


   .. py:property:: initial_status
      :type: str

      Get the initial status of the task if it has been bound to the db; otherwise raise
      an error.


   .. py:property:: final_status
      :type: str

      Get the final status of the task if it has been bound; otherwise raise an error.


   .. py:property:: array
      :type: jobmon.client.array.Array

      Get the array the task has been added to, or else raise an AttributeError.


   .. py:property:: workflow
      :type: jobmon.client.workflow.Workflow

      Get the workflow the task has been added to, or else raise an AttributeError.


   .. py:method:: add_upstream(ancestor: Task) -> None

      Add an upstream (ancestor) Task.

      This has Set semantics; an upstream task will only be added once. Symmetrically,
      this method also adds this Task as a downstream on the ancestor.


   .. py:method:: add_upstreams(tasks: List[Task]) -> None

      Add all Tasks in the user-provided list as upstreams.


   .. py:method:: add_downstream(descendent: Task) -> None

      Add a downstream (descendant) Task.

      This has Set semantics; a downstream task will only be added once. Symmetrically,
      this method also adds this Task as an upstream on the descendant.


   .. py:method:: add_downstreams(tasks: List[Task]) -> None

      Add all Tasks in the user-provided list as downstreams.


   .. py:method:: add_attribute(attribute: str, value: str) -> None

      Add a single attribute to the task.


   .. py:method:: get_errors() -> Union[None, Dict[str, Union[int, List[Dict[str, Union[str, int]]]]]]

      Return all errors for this task, along with the ID of the most recent task
      instance actually used.


   .. py:method:: set_compute_resources_from_yaml(cluster_name: str, yaml_file: str) -> None

      Set task-level default compute resources from a user-provided YAML file.

      TODO: Implement this method.

      :param cluster_name: Name of the cluster to set default values for.
      :param yaml_file: The YAML file that provides the compute resource values.


   .. py:method:: update_compute_resources(**kwargs: Any) -> None

      Update the task's compute resources.


   .. py:method:: update_resource_scales(**kwargs: Any) -> None

      Update the task's resource scales.


   .. py:method:: resource_usage() -> dict

      Get the resource usage for the successful TaskInstance of a Task.
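
The job-name rules described for ``Task.is_valid_job_name`` can be illustrated with the minimal standalone sketch below. ``check_job_name`` and ``ILLEGAL_CHARS`` are hypothetical names that are not part of jobmon. The character set is only a placeholder, because the docstring itself notes that no official list of illegal characters exists.

.. code-block:: python

   from typing import Optional

   # Hypothetical placeholder set; the characters jobmon actually rejects may differ.
   ILLEGAL_CHARS = frozenset("\n\t\r /:'\"\\@*?")


   def check_job_name(name: Optional[str]) -> bool:
       """Return True for a valid job name, otherwise raise ValueError.

       Mirrors the documented rules: the name must not be null or empty,
       must not begin with a digit, and must not contain an illegal character.
       """
       if not name:
           raise ValueError("job name must not be null or the empty string")
       if name[0].isdigit():
           raise ValueError(f"job name {name!r} must not begin with a digit")
       bad = sorted(ILLEGAL_CHARS.intersection(name))
       if bad:
           raise ValueError(f"job name {name!r} contains illegal characters: {bad}")
       return True


   print(check_job_name("my_task_1"))  # prints True
   for candidate in ["", "1task", "my task"]:
       try:
           check_job_name(candidate)
       except ValueError as err:
           print(err)

Like the documented method, the sketch either returns ``True`` or raises ``ValueError``, so callers never need to check a ``False`` result.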
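
The symmetric, set-based bookkeeping described for ``Task.add_upstream`` and ``Task.add_downstream`` can be modelled with a small stand-in class. ``MiniTask`` is hypothetical and tracks only the ``upstream_tasks`` and ``downstream_tasks`` sets. It is a sketch of the documented behaviour, not the jobmon implementation.

.. code-block:: python

   from __future__ import annotations

   from typing import Iterable, Set


   class MiniTask:
       """Hypothetical stand-in that models only the dependency sets of a Task."""

       def __init__(self, name: str) -> None:
           self.name = name
           self.upstream_tasks: Set[MiniTask] = set()
           self.downstream_tasks: Set[MiniTask] = set()

       def add_upstream(self, ancestor: MiniTask) -> None:
           # Set semantics: adding the same ancestor twice is a no-op.
           self.upstream_tasks.add(ancestor)
           # Symmetric edge: this task becomes a downstream of the ancestor.
           ancestor.downstream_tasks.add(self)

       def add_upstreams(self, tasks: Iterable[MiniTask]) -> None:
           for task in tasks:
               self.add_upstream(task)

       def add_downstream(self, descendant: MiniTask) -> None:
           self.downstream_tasks.add(descendant)
           # Symmetric edge: this task becomes an upstream of the descendant.
           descendant.upstream_tasks.add(self)

       def add_downstreams(self, tasks: Iterable[MiniTask]) -> None:
           for task in tasks:
               self.add_downstream(task)

       def __repr__(self) -> str:
           return f"MiniTask({self.name!r})"


   extract = MiniTask("extract")
   transform = MiniTask("transform")
   load = MiniTask("load")

   transform.add_upstream(extract)
   transform.add_upstream(extract)  # duplicate edge, ignored by the set
   transform.add_downstreams([load])

   print(len(transform.upstream_tasks))  # prints 1
   print(extract.downstream_tasks)  # prints {MiniTask('transform')}

Because each edge is recorded on both endpoints, adding a dependency from either side yields the same graph. Sets keep repeated calls idempotent.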
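
The ``resource_scales`` parameter and the ``validate_task_resource_scales`` function can be illustrated with a rough sketch, built on two loudly labelled assumptions. First, ``check_numeric_scales`` accepts only non-negative numbers, whereas jobmon's own validator takes ``Dict[str, Any]`` and may allow other value types. Second, ``scale_resources`` assumes that a scale of ``0.1`` raises a resource by 10% after a failed attempt; this page does not specify the exact rule. Both function names are hypothetical.

.. code-block:: python

   from numbers import Real
   from typing import Any, Dict


   def check_numeric_scales(resource_scales: Dict[str, Any]) -> None:
       """Hypothetical check: every scale must be a non-negative real number."""
       for resource, scale in resource_scales.items():
           # bool is a subclass of int, so reject it explicitly.
           if isinstance(scale, bool) or not isinstance(scale, Real):
               raise TypeError(
                   f"scale for {resource!r} must be a number, "
                   f"got {type(scale).__name__}"
               )
           if scale < 0:
               raise ValueError(f"scale for {resource!r} must be non-negative")


   def scale_resources(
       requested: Dict[str, float], resource_scales: Dict[str, float]
   ) -> Dict[str, float]:
       """Assumed rule: multiply each scaled resource by (1 + scale)."""
       check_numeric_scales(resource_scales)
       scaled = dict(requested)  # leave the caller's dict untouched
       for resource, scale in resource_scales.items():
           if resource in scaled:
               scaled[resource] = scaled[resource] * (1 + scale)
       return scaled


   requested = {"cores": 1, "memory": 10, "runtime": 60}
   print(scale_resources(requested, {"memory": 0.1, "runtime": 0.7}))

Resources without a scale, such as ``cores`` here, pass through unchanged.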