client.task

Task object defines a single executable object that will be added to a Workflow.

TaskInstances will be created from it for every execution.

Attributes

logger

Classes

Task

Task object defines a single executable object that will be added to a Workflow.

Functions

validate_task_resource_scales(→ None)

Validate resource scales are expected types.

Module Contents

client.task.logger
client.task.validate_task_resource_scales(resource_scales: Dict[str, Any]) → None

Validate resource scales are expected types.

class client.task.Task(node: jobmon.client.node.Node, task_args: Dict[str, Any], op_args: Dict[str, Any], array: jobmon.client.array.Array | None = None, cluster_name: str = '', compute_resources: Dict[str, Any] | None = None, compute_resources_callable: Callable | None = None, resource_scales: Dict[str, float] | None = None, fallback_queues: List[str] | None = None, name: str | None = None, max_attempts: int | None = None, upstream_tasks: List[Task] | None = None, task_attributes: List | dict | None = None, requester: jobmon.core.requester.Requester | None = None)

Task object defines a single executable object that will be added to a Workflow.

Task Instances will be created from it for every execution.

Create a single executable object in the workflow, aka a Task.

Relate it to a Task Template in order to classify it as a type of job within the context of your workflow.

Parameters:
  • node – Node this task is associated with.

  • task_args – Task arguments that make the command unique across workflows, usually pertaining to data flowing through the task.

  • op_args – Task arguments that can change across runs of the same workflow, usually pertaining to trivial things like log level or code location.

  • array – the array that the task is associated with.

  • cluster_name – the name of the cluster the user wants to run their task on.

  • compute_resources – A dictionary that includes the user's requested resources for the current run. E.g. {cores: 1, mem: 1, runtime: 60, queue: all.q}.

  • compute_resources_callable – callable compute resources.

  • resource_scales – how much users want to scale their resource request if the initial request fails.

  • fallback_queues – a list of queues that a user wants to try if their original queue is unable to accommodate their requested resources.

  • name – name that will be visible in the job status information (e.g. squeue or qstat) for this job.

  • max_attempts – number of attempts to allow the cluster to try before giving up. Default is 3.

  • upstream_tasks – Task objects that must be run prior to this task.

  • task_attributes – dictionary of attributes and their values or list of attributes that will be assigned later.

  • requester – requester object to communicate with the FastAPI services.

Raises:

ValueError – If the hashed command is not allowed as an SGE job name; see is_valid_job_name
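The split between task_args and op_args can be illustrated with a small standalone sketch. It assumes that a task's identity is derived from task_args only, so that changing op_args between runs leaves the identity untouched. The helper name args_hash_sketch, the SHA-256 choice, and the example argument names are hypothetical and are not taken from the library.

```python
import hashlib
import json
from typing import Any, Dict


def args_hash_sketch(task_args: Dict[str, Any]) -> str:
    """Hypothetical helper: hash only the identity-defining arguments."""
    # Sort keys so that dictionary ordering does not change the hash.
    canonical = json.dumps(task_args, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Two runs with the same data arguments but different operational arguments.
run_1 = {"task_args": {"location_id": 102, "year": 2020},
         "op_args": {"log_level": "INFO"}}
run_2 = {"task_args": {"location_id": 102, "year": 2020},
         "op_args": {"log_level": "DEBUG"}}

# Only task_args feed the hash, so both runs share one identity in this sketch.
same_identity = (
    args_hash_sketch(run_1["task_args"]) == args_hash_sketch(run_2["task_args"])
)
```

Under this assumption, arguments such as log level or code location belong in op_args precisely because they should not create a new task identity.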

static is_valid_job_name(name: str) → bool

If the name is invalid it will raise an exception.

Primarily based on the restrictions SGE places on job names. The list of illegal characters might not be complete; I could not find an official list.

Must not:
  • be null or the empty string

  • begin with a digit

  • contain an illegal character

Parameters:

name

Returns:

True (or raises)

Raises:

ValueError – if the name is not valid.
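A minimal sketch of such a check follows: it rejects a missing or empty name, a leading digit, and any illegal character, and otherwise returns True. The function name is_valid_job_name_sketch and the character set are assumptions for illustration only; the set is not the one the library uses, and the reference itself notes that no official list was found.

```python
from typing import Optional

# Assumption: an illustrative set only, not an official SGE list.
ILLEGAL_CHARS_SKETCH = frozenset("/:@\\*? \t\n")


def is_valid_job_name_sketch(name: Optional[str]) -> bool:
    """Return True for an acceptable name, otherwise raise ValueError."""
    if not name:
        raise ValueError("name cannot be None or the empty string")
    if name[0].isdigit():
        raise ValueError(f"name cannot begin with a digit: {name!r}")
    bad = sorted({ch for ch in name if ch in ILLEGAL_CHARS_SKETCH})
    if bad:
        raise ValueError(f"name contains illegal characters {bad}: {name!r}")
    return True


ok = is_valid_job_name_sketch("model_fit_2020")
```

Because the function raises instead of returning False, callers can use it as a guard without checking the return value.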

requester = None
node
task_args
mapped_task_args
task_args_hash = 0
op_args
command
name = None
upstream_tasks: Set[Task]
downstream_tasks: Set[Task]
task_attributes: dict
fallback_queues: List[str]
property compute_resources: Dict[str, Any]
property requested_resources: Dict[str, Any]

A dictionary that includes the user's requested resources for the current run.

E.g. {cores: 1, mem: 1, runtime: 60, queue: all.q}.

property resource_scales: Dict[str, float]

A dictionary that includes the user's requested resource scales for the current run.

E.g. {memory: 0.1, runtime: 0.7}.
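To make the idea of a resource scale concrete, the sketch below applies scales to a request after a failed attempt. It assumes that a scale of 0.1 means "increase the previous value by 10%"; this growth rule and the function name scale_resources_sketch are assumptions, since this reference only states that scales control how much a request grows after a failure.

```python
from typing import Any, Dict


def scale_resources_sketch(
    requested: Dict[str, Any], scales: Dict[str, float]
) -> Dict[str, Any]:
    """Return a new request with each scaled resource grown by its factor."""
    scaled = dict(requested)  # copy so the original request is left unchanged
    for key, factor in scales.items():
        if key in scaled:
            # Assumed rule: new value = old value * (1 + scale).
            scaled[key] = scaled[key] * (1 + factor)
    return scaled


requested = {"memory": 10, "runtime": 100, "queue": "all.q"}
retry_request = scale_resources_sketch(requested, {"memory": 0.1, "runtime": 0.7})
```

Resources without a scale entry, such as the queue here, are carried over unchanged.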

property cluster_name: str

The name of the cluster the user wants to run their task on.

property max_attempts: int

Get the max_attempts.

property compute_resources_callable: Callable | None

A callable that returns a compute resources dict.

property queue_name: str
property original_task_resources: jobmon.client.task_resources.TaskResources

Get the original task resources if the task has been bound to the db; otherwise raise an error.

property is_bound: bool

Whether the task has been bound to the database.

property task_id: int

Get the id of the task if it has been bound to the db otherwise raise an error.
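The "available only once bound" behaviour can be sketched with a plain Python property. The class BoundSketch and its bind method are hypothetical stand-ins, and raising AttributeError for task_id is an assumption; the reference only says that an error is raised.

```python
class BoundSketch:
    """Hypothetical stand-in showing a property that requires binding."""

    @property
    def is_bound(self) -> bool:
        # The object counts as bound once an id has been stored on it.
        return hasattr(self, "_task_id")

    @property
    def task_id(self) -> int:
        if not self.is_bound:
            raise AttributeError("task_id cannot be accessed before binding")
        return self._task_id

    def bind(self, task_id: int) -> None:
        """Hypothetical: record the id assigned by the database."""
        self._task_id = task_id


task = BoundSketch()
bound_before = task.is_bound
task.bind(42)
bound_after = task.is_bound
```

Checking is_bound first lets calling code avoid the exception when it only needs to know whether an id exists yet.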

property initial_status: str

Get initial status of the task if it has been bound to the db; else raise error.

property final_status: str

Get final status of the task if it has been bound, otherwise raise error.

property array: jobmon.client.array.Array

Get the array the task has been added to or else raise an AttributeError.

property workflow: jobmon.client.workflow.Workflow

Get the workflow the task has been added to or else raise an AttributeError.

add_upstream(ancestor: Task) → None

Add an upstream (ancestor) Task.

This has Set semantics: an upstream task will only be added once. Symmetrically, this method also adds this Task as a downstream on the ancestor.
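The Set semantics and the symmetric link can be sketched with a small stand-in class. SketchTask is hypothetical and keeps only the dependency sets; it is not the library's Task and performs no validation.

```python
from typing import Set


class SketchTask:
    """Hypothetical stand-in for Task that keeps only dependency links."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.upstream_tasks: Set["SketchTask"] = set()
        self.downstream_tasks: Set["SketchTask"] = set()

    def add_upstream(self, ancestor: "SketchTask") -> None:
        # A set ignores repeated additions, so each link is stored once.
        self.upstream_tasks.add(ancestor)
        # Keep the relationship symmetric on the other task.
        ancestor.downstream_tasks.add(self)

    def add_downstream(self, descendent: "SketchTask") -> None:
        self.downstream_tasks.add(descendent)
        descendent.upstream_tasks.add(self)


extract = SketchTask("extract")
load = SketchTask("load")
load.add_upstream(extract)
load.add_upstream(extract)  # repeated call adds nothing new
```

After these calls, extract appears once in load.upstream_tasks and load appears in extract.downstream_tasks, so either side can be used to walk the dependency graph.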

add_upstreams(tasks: List[Task]) → None

Add all Tasks in user provided list as upstreams.

add_downstream(descendent: Task) → None

Add a downstream (descendant) Task.

This has Set semantics: a downstream task will only be added once. Symmetrically, this method also adds this Task as an upstream on the descendant.

add_downstreams(tasks: List[Task]) → None

Add all Tasks in user provided list as downstreams.

add_attribute(attribute: str, value: str) → None

Function that users can call to add a single attribute for a task.

get_errors() → None | Dict[str, int | List[Dict[str, str | int]]]

Return all errors for each task, with the most recent task_instance_id actually used.

set_compute_resources_from_yaml(cluster_name: str, yaml_file: str) → None

Set default compute resources from a user provided yaml file for task level.

TODO: Implement this method.

Parameters:
  • cluster_name – name of cluster to set default values for.

  • yaml_file – the yaml file that is providing the compute resource values.

update_compute_resources(**kwargs: Any) → None

Function that allows users to update their compute resources.
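Since the method takes arbitrary keyword arguments, one plausible reading is that each keyword overrides or adds a key in the compute resources dictionary. The sketch below illustrates that reading with a hypothetical ResourceHolderSketch class; the merge behaviour is an assumption and is not confirmed by this reference.

```python
from typing import Any, Dict, Optional


class ResourceHolderSketch:
    """Hypothetical stand-in that stores a compute resources dictionary."""

    def __init__(self, compute_resources: Optional[Dict[str, Any]] = None) -> None:
        # Copy the input so later updates do not mutate the caller's dict.
        self.compute_resources: Dict[str, Any] = dict(compute_resources or {})

    def update_compute_resources(self, **kwargs: Any) -> None:
        # Assumed behaviour: keywords overwrite existing keys and add new ones.
        self.compute_resources.update(kwargs)


holder = ResourceHolderSketch({"cores": 1, "runtime": 60, "queue": "all.q"})
holder.update_compute_resources(cores=4, memory=8)
```

In this sketch, keys that are not passed (runtime and queue) keep their previous values.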

update_resource_scales(**kwargs: Any) → None

Function that allows users to update their resource scales.

resource_usage() → dict

Get the resource usage for the successful TaskInstance of a Task.