client.task

Task object defines a single executable object that will be added to a Workflow.

TaskInstances will be created from it for every execution.

Attributes

logger

Classes

Task

Task object defines a single executable object that will be added to a Workflow.

Functions

validate_task_resource_scales(→ None)

Validate resource scales are expected types.

Module Contents

client.task.logger
client.task.validate_task_resource_scales(resource_scales: Dict[str, Any]) → None

Validate resource scales are expected types.
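A minimal sketch of what such a type check could look like. The accepted types here (plain numbers or callables) are an assumption for illustration only, and check_resource_scales is a hypothetical stand-in, not the library function.

```python
from numbers import Real
from typing import Any, Dict


def check_resource_scales(resource_scales: Dict[str, Any]) -> None:
    """Hypothetical stand-in for a resource-scale type check.

    Assumption: a scale is acceptable if it is a real number
    (but not a bool) or a callable. The real rules may differ.
    """
    for resource, scale in resource_scales.items():
        is_number = isinstance(scale, Real) and not isinstance(scale, bool)
        if not (is_number or callable(scale)):
            raise ValueError(
                f"Scale for {resource!r} must be a number or a callable, "
                f"got {type(scale).__name__}"
            )


# Passes silently: both values are numeric.
check_resource_scales({"memory": 0.1, "runtime": 0.7})
```

Raising on the first bad entry keeps the error message specific to one resource name.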

class client.task.Task(node: jobmon.client.node.Node, task_args: Dict[str, Any], op_args: Dict[str, Any], array: jobmon.client.array.Array | None = None, cluster_name: str = '', compute_resources: Dict[str, Any] | None = None, compute_resources_callable: Callable | None = None, resource_scales: Dict[str, float] | None = None, fallback_queues: List[str] | None = None, name: str | None = None, max_attempts: int | None = None, upstream_tasks: List[Task] | None = None, task_attributes: List | dict | None = None, requester: jobmon.core.requester.Requester | None = None)

Task object defines a single executable object that will be added to a Workflow.

Task Instances will be created from it for every execution.

static is_valid_job_name(name: str) → bool

If the name is invalid, it will raise an exception.

Primarily based on the restrictions SGE places on job names. The list of illegal characters might not be complete; I could not find an official list.

Must not:
  • be null or the empty string

  • begin with a digit

  • contain an illegal character

Parameters:

name

Returns:

True (or raises)

Raises:

ValueError – if the name is not valid.
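The three rules above can be sketched as follows. The set of illegal characters is an assumption made up for illustration (the docstring itself notes that no official list was found), and is_valid_name_sketch is a hypothetical name, not the library method.

```python
from typing import Optional

# Assumed illegal characters, for illustration only; the real list may differ.
ASSUMED_ILLEGAL_CHARS = set("/:@\\*? \t\n\r")


def is_valid_name_sketch(name: Optional[str]) -> bool:
    """Return True for a valid name, otherwise raise ValueError."""
    # Rule 1: must not be null or the empty string.
    if not name:
        raise ValueError("name cannot be None or empty")
    # Rule 2: must not begin with a digit.
    if name[0].isdigit():
        raise ValueError(f"name cannot begin with a digit: {name!r}")
    # Rule 3: must not contain an illegal character.
    bad = sorted(set(name) & ASSUMED_ILLEGAL_CHARS)
    if bad:
        raise ValueError(f"name contains illegal characters {bad}: {name!r}")
    return True
```

Because every failure raises, the return value is only ever True, which matches the "True (or raises)" contract described above.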

requester
node
task_args
mapped_task_args
task_args_hash
op_args
command
name
upstream_tasks: Set[Task]
downstream_tasks: Set[Task]
task_attributes: dict
_instance_max_attempts
_instance_cluster_name
_instance_compute_resources
_instance_compute_resources_callable
_instance_resource_scales
fallback_queues: List[str]
_errors: None | Dict[str, int | List[Dict[str, str | int]]] = None
property compute_resources: Dict[str, Any]
property requested_resources: Dict[str, Any]

A dictionary that includes the user's requested resources for the current run.

E.g. {cores: 1, mem: 1, runtime: 60, queue: all.q}.

property resource_scales: Dict[str, float]

A dictionary that includes the user's requested resource scales for the current run.

E.g. {memory: 0.1, runtime: 0.7}.

property cluster_name: str

The name of the cluster the user wants to run their task on.

property max_attempts: int

Get the max_attempts.

property compute_resources_callable: Callable | None

A callable that returns a compute resources dict.

property queue_name: str
property original_task_resources: jobmon.client.task_resources.TaskResources

Get the original task resources of the task if it has been bound to the db; otherwise raise an error.

property is_bound: bool

Whether the task has been bound to the database.

property task_id: int

Get the id of the task if it has been bound to the db; otherwise raise an error.

property initial_status: str

Get initial status of the task if it has been bound to the db; else raise error.

property final_status: str

Get final status of the task if it has been bound; otherwise raise an error.

property array: jobmon.client.array.Array

Get the array the task has been added to or else raise an AttributeError.

property workflow: jobmon.client.workflow.Workflow

Get the workflow the task has been added to or else raise an AttributeError.

add_upstream(ancestor: Task) → None

Add an upstream (ancestor) Task.

This has Set semantics: an upstream task will only be added once. Symmetrically, this method also adds this Task as a downstream on the ancestor.

add_upstreams(tasks: List[Task]) → None

Add all Tasks in user provided list as upstreams.

add_downstream(descendent: Task) → None

Add a downstream (descendent) Task.

This has Set semantics: a downstream task will only be added once. Symmetrically, this method also adds this Task as an upstream on the descendent.

add_downstreams(tasks: List[Task]) → None

Add all Tasks in user provided list as downstreams.
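The symmetric Set semantics of the four methods above can be illustrated with a small stand-alone sketch. TaskNode is a hypothetical class written for this example; it only mirrors the method and attribute names documented here and is not the library's Task.

```python
from typing import List, Set


class TaskNode:
    """Hypothetical stand-in that tracks upstream and downstream edges."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.upstream_tasks: Set["TaskNode"] = set()
        self.downstream_tasks: Set["TaskNode"] = set()

    def add_upstream(self, ancestor: "TaskNode") -> None:
        # Sets ignore duplicates, so repeated calls are harmless.
        self.upstream_tasks.add(ancestor)
        # Keep the reverse edge in sync on the ancestor.
        ancestor.downstream_tasks.add(self)

    def add_upstreams(self, tasks: List["TaskNode"]) -> None:
        for task in tasks:
            self.add_upstream(task)

    def add_downstream(self, descendent: "TaskNode") -> None:
        self.downstream_tasks.add(descendent)
        # Keep the reverse edge in sync on the descendent.
        descendent.upstream_tasks.add(self)

    def add_downstreams(self, tasks: List["TaskNode"]) -> None:
        for task in tasks:
            self.add_downstream(task)


extract = TaskNode("extract")
transform = TaskNode("transform")
transform.add_upstream(extract)
transform.add_upstream(extract)  # second call changes nothing
```

After these calls, transform has exactly one upstream and extract lists transform as a downstream, so either side of the edge can be used to walk the graph.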

add_attribute(attribute: str, value: str) → None

Function that users can call to add a single attribute for a task.

get_errors() → None | Dict[str, int | List[Dict[str, str | int]]]

Return all errors for each task, along with the most recent task_instance_id actually used.

set_compute_resources_from_yaml(cluster_name: str, yaml_file: str) → None

Set default compute resources from a user provided yaml file for task level.

TODO: Implement this method.

Parameters:
  • cluster_name – name of cluster to set default values for.

  • yaml_file – the yaml file that is providing the compute resource values.

update_compute_resources(**kwargs: Any) → None

Function that allows users to update their compute resources.

update_resource_scales(**kwargs: Any) → None

Function that allows users to update their resource scales.

_hash_task_args() → int

A hash of the encoded result of the args and values concatenated together.

__eq__(other: object) → bool

Check if the hashes of two tasks are equivalent.

__lt__(other: Task) → bool

Check if one hash is less than the hash of another Task.

__hash__() → int

Create the hash for a task to determine if it is unique within a dag.
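A rough sketch of how _hash_task_args, __eq__, __lt__ and __hash__ can fit together. HashedTask is a hypothetical class; the use of SHA-256 and the sorting of argument names before concatenation are assumptions made for this example, not details confirmed by the source.

```python
import hashlib
from typing import Any, Dict


class HashedTask:
    """Hypothetical stand-in whose identity is derived from its task args."""

    def __init__(self, task_args: Dict[str, Any]) -> None:
        self.task_args = task_args
        self.task_args_hash = self._hash_task_args()

    def _hash_task_args(self) -> int:
        # Concatenate arg names and values, sorted by name so that
        # insertion order does not change the result (an assumption).
        joined = "".join(f"{k}{v}" for k, v in sorted(self.task_args.items()))
        digest = hashlib.sha256(joined.encode("utf-8")).hexdigest()
        return int(digest, 16)

    def __hash__(self) -> int:
        return self.task_args_hash

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, HashedTask):
            return NotImplemented
        return hash(self) == hash(other)

    def __lt__(self, other: "HashedTask") -> bool:
        return hash(self) < hash(other)


first = HashedTask({"location_id": 1, "year": 2020})
same = HashedTask({"year": 2020, "location_id": 1})
unique_tasks = {first, same}  # collapses to a single element
```

Defining __eq__ and __hash__ together lets a set or dict detect duplicate tasks, while __lt__ gives tasks a stable, if arbitrary, sort order.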

__repr__() → str

A representation string for a Task instance.

resource_usage() → dict

Get the resource usage for the successful TaskInstance of a Task.