client.task_template

A framework that many tasks have in common while varying by a declared set of arguments.

Attributes

logger

Classes

TaskTemplate

Task Template outlines the structure of a Task to give it more context within the DAG.

Module Contents

client.task_template.logger
class client.task_template.TaskTemplate(template_name: str, requester: jobmon.core.requester.Requester | None = None)

Task Template outlines the structure of a Task to give it more context within the DAG.

A Task Template defines a framework that many tasks have in common while varying by a declared set of arguments.

Groups tasks of a type.

Declares the concrete arguments that instances may vary over, either from workflow to workflow or between nodes in a stage of the DAG.

Parameters:
  • template_name – the name of this task template.

  • requester – object used to communicate with the Flask services.

template_name
requester = None
classmethod get_task_template(tool_version: jobmon.client.tool_version.ToolVersion, template_name: str) → TaskTemplate

Get a bound instance of TaskTemplate.

Parameters:
  • tool_version – the ToolVersion to associate this task template with.

  • template_name – name of this specific task template

classmethod from_wire(wire_tuple: Tuple, tool_version: jobmon.client.tool_version.ToolVersion) → TaskTemplate

Get a bound instance of TaskTemplate from the http wire format.

Parameters:
  • wire_tuple – Wire format for TaskTemplate defined in jobmon.serializers.

  • tool_version – the ToolVersion to associate this task template with.

bind(tool_version: jobmon.client.tool_version.ToolVersion) → None

Bind the task template to the database.

Parameters:

tool_version – the ToolVersion this task template is associated with.

property is_bound: bool

Whether the task template has been bound to the database.

property id: int

Unique ID from the database, available once the task template has been bound.

property tool_version: jobmon.client.tool_version.ToolVersion

The ToolVersion this task_template has been bound to.

property task_template_versions: List[jobmon.client.task_template_version.TaskTemplateVersion]

The versions of this task template, available once it has been bound.

property active_task_template_version: jobmon.client.task_template_version.TaskTemplateVersion

The TaskTemplateVersion to use when spawning tasks.

property default_cluster_name: str

Default cluster_name associated with the active tool version.

property default_max_attempts: int | None

Default max attempts of the active tool version.

set_default_max_attempts(value: int) → None

Set the default max_attempts.

Parameters:

value – the new default value of max_attempts.

property default_compute_resources_set: Dict[str, Dict[str, Any]]

Default compute resources associated with the active tool version.

property default_resource_scales_set: Dict[str, Dict[str, float]]

Default resource scales associated with the active tool version.

update_default_compute_resources(cluster_name: str, **kwargs: Any) → None

Update the default compute resources in place, overriding only the specified keys.

If no default cluster is specified when this method is called, cluster_name will become the default cluster.

Parameters:
  • cluster_name – name of cluster to modify default values for.

  • **kwargs – the key/value pairs to update, passed as keyword arguments.
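A minimal sketch of the documented in-place merge, assuming the defaults can be modeled as a plain dict of per-cluster dicts. `DefaultsSketch` and the cluster name `slurm` are hypothetical illustrations, not part of jobmon: only the keys passed as keyword arguments are overwritten, and the first cluster updated becomes the default when none is set yet.

```python
from typing import Any, Dict


class DefaultsSketch:
    """Hypothetical stand-in for a template's default compute resources."""

    def __init__(self) -> None:
        self.default_cluster_name: str = ""
        # {cluster_name: {resource_name: resource_value}}
        self.default_compute_resources_set: Dict[str, Dict[str, Any]] = {}

    def update_default_compute_resources(self, cluster_name: str, **kwargs: Any) -> None:
        # Merge in place: only the keys passed as kwargs are overwritten.
        resources = self.default_compute_resources_set.setdefault(cluster_name, {})
        resources.update(kwargs)
        # With no default cluster yet, this cluster becomes the default.
        if not self.default_cluster_name:
            self.default_cluster_name = cluster_name


defaults = DefaultsSketch()
defaults.update_default_compute_resources("slurm", cores=1, memory="1G")
defaults.update_default_compute_resources("slurm", memory="4G")
print(defaults.default_compute_resources_set)  # {'slurm': {'cores': 1, 'memory': '4G'}}
print(defaults.default_cluster_name)  # slurm
```

The second call changes only `memory`; `cores` from the first call survives, which is the "overriding only the specified keys" behavior.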

update_default_resource_scales(cluster_name: str, **kwargs: Any) → None

Update the default resource scales in place, overriding only the specified keys.

If no default cluster is specified when this method is called, cluster_name will become the default cluster.

Parameters:
  • cluster_name – name of cluster to modify default values for.

  • **kwargs – the key/value pairs to update, passed as keyword arguments.

set_default_compute_resources_from_yaml(yaml_file: str, default_cluster_name: str = '') → None

Set default compute resources at the task template level from a user-provided YAML file.

Parameters:
  • default_cluster_name – name of cluster to set default values for.

  • yaml_file – the YAML file providing the compute resource values.

set_default_resource_scales_from_yaml(yaml_file: str, default_cluster_name: str = '') → None

Set default resource scales at the task template level from a user-provided YAML file.

Parameters:
  • default_cluster_name – name of cluster to set default values for.

  • yaml_file – the YAML file providing the resource scale values.

set_default_compute_resources_from_dict(cluster_name: str, compute_resources: Dict[str, Any]) → None

Set default compute resources for a given cluster_name.

If no default cluster is specified when this method is called, cluster_name will become the default cluster.

Parameters:
  • cluster_name – name of cluster to set default values for.

  • compute_resources – dictionary of default compute resources to run tasks with, of the form {resource_name: resource_value}. Can be overridden at the task level.

set_default_resource_scales_from_dict(cluster_name: str, resource_scales: Dict[str, float]) → None

Set default resource scales for a given cluster_name.

If no default cluster is specified when this method is called, cluster_name will become the default cluster.

Parameters:
  • cluster_name – name of cluster to set default values for.

  • resource_scales – dictionary of default resource scales to adjust task resources with. Can be overridden at task level.
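The scale-factor forms described for get_task_template_version and create_task (a number, a Callable, or an Iterator) can be sketched as one hypothetical helper, `next_request`. Treating a numeric factor as a fractional increase (0.5 meaning +50%) and using each Iterator value as the next absolute request are assumptions made for illustration, not confirmed jobmon semantics; the resource names are also hypothetical.

```python
from collections import abc
from typing import Any, Callable, Dict, Iterator, Union

# A scale factor may be a number, a Callable, or an Iterator of numbers.
Scale = Union[float, Callable[[float], float], Iterator[float]]


def next_request(current: float, scale: Scale) -> float:
    """Hypothetical: the resource request to use after a resource-related failure."""
    if callable(scale):
        # A Callable is applied to the existing resource value.
        return scale(current)
    if isinstance(scale, abc.Iterator):
        # Assumption: each yielded value is the next request itself.
        return next(scale)
    # Assumption: a number is a fractional increase (0.5 means +50%).
    return current * (1 + scale)


# Hypothetical resource names and values.
resource_scales: Dict[str, Any] = {
    "memory": 0.5,
    "runtime": lambda seconds: seconds * 2,
    "cores": iter([2, 4, 8]),
}
current = {"memory": 10.0, "runtime": 3600.0, "cores": 1}
retry = {name: next_request(current[name], scale) for name, scale in resource_scales.items()}
print(retry)  # {'memory': 15.0, 'runtime': 7200.0, 'cores': 2}
```

Because an Iterator is consumed on each retry, a later call for `cores` would yield 4, then 8; this is why the documentation suggests wrapping a list with iter().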

set_active_task_template_version_id(task_template_version_id: str | int = 'latest') → None

Set the active TaskTemplateVersion by ID ('latest' by default).

Parameters:

task_template_version_id – the ID of the version to set as active on this object, or 'latest'.
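One plausible way to resolve the task_template_version_id argument, sketched with a hypothetical `VersionStub` class standing in for TaskTemplateVersion. The sketch assumes the loaded versions are ordered oldest to newest, so 'latest' resolves to the last one, and that an unknown ID raises ValueError; both are assumptions rather than documented behavior.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class VersionStub:
    """Hypothetical stand-in for TaskTemplateVersion; only the id is modeled."""

    id: int


def resolve_active_version(
    versions: List[VersionStub], version_id: Union[str, int] = "latest"
) -> VersionStub:
    if not versions:
        raise ValueError("no task template versions have been loaded")
    if version_id == "latest":
        # Assumption: versions are ordered oldest to newest.
        return versions[-1]
    for version in versions:
        if version.id == version_id:
            return version
    raise ValueError(f"task template version {version_id!r} not found")


versions = [VersionStub(id=3), VersionStub(id=7)]
print(resolve_active_version(versions).id)  # 7
print(resolve_active_version(versions, 3).id)  # 3
```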

set_active_task_template_version(task_template_version: jobmon.client.task_template_version.TaskTemplateVersion) → None

Set the given TaskTemplateVersion as the active one.

Parameters:

task_template_version – which version to set as active on this object.

load_task_template_versions() → None

Load task template versions associated with this task template from the database.

get_task_template_version(command_template: str, node_args: List[str] | None = None, task_args: List[str] | None = None, op_args: List[str] | None = None, default_cluster_name: str = '', default_compute_resources: Dict[str, Any] | None = None, default_resource_scales: Dict[str, float] | None = None, default_max_attempts: int | None = None) → jobmon.client.task_template_version.TaskTemplateVersion

Create a task template version instance. If it already exists, activate it.

Parameters:
  • command_template – an abstract command representing a task, in which the arguments have defined names but unassigned values, e.g. '{python} {script} --data {data} --para {para} {verbose}'.

  • node_args – any named arguments in command_template that make the command unique within this template for a given workflow run. Generally these are arguments that can be parallelized over.

  • task_args – any named arguments in command_template that make the command unique across workflows if the node args are the same as in a previous workflow. Generally these are arguments about data moving through the task.

  • op_args – any named arguments in command_template that can change without changing the identity of the task. Generally these are things like the task executable location or the verbosity of the script.

  • default_cluster_name – the default cluster to run each task associated with this template on.

  • default_compute_resources – dictionary of default compute resources to run tasks with. Can be overridden at task level. dict of {resource_name: resource_value}. Must specify default_cluster_name when this option is used.

  • default_resource_scales – dictionary of default resource scales to adjust task resources with. Can be overridden at task level. dict of {resource_name: scale_factor}. Scale factor can be a numeric value, a Callable that will be applied to the existing resources, or an Iterator. Any Callable should take a single numeric value as its sole argument. Any Iterator should only yield numeric values. Any Iterable can be easily converted to an Iterator by using the iter() built-in (e.g. iter([80, 160, 190])).

  • default_max_attempts – default max_attempts for tasks associated with this template.
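To illustrate how node, task, and op args partition a command template, here is a hypothetical sketch built on the example template '{python} {script} --data {data} --para {para} {verbose}'. Classifying `para` as a node arg and `data` as a task arg, and the `identity_key` helper, are assumptions for illustration; jobmon computes task identity internally on its own terms.

```python
from typing import Dict, List, Tuple

command_template = "{python} {script} --data {data} --para {para} {verbose}"

# Hypothetical classification of the template's named fields.
node_args: List[str] = ["para"]  # varies between nodes within one workflow
task_args: List[str] = ["data"]  # distinguishes runs on different data across workflows
op_args: List[str] = ["python", "script", "verbose"]  # never changes task identity


def identity_key(values: Dict[str, str]) -> Tuple[Tuple[str, str], ...]:
    # Only node and task args contribute; op args are deliberately ignored.
    return tuple(sorted((name, values[name]) for name in node_args + task_args))


values = {"python": "python3", "script": "run.py", "data": "in.csv", "para": "1", "verbose": ""}
verbose_values = dict(values, verbose="--verbose")

# Changing only an op arg leaves the identity unchanged.
print(identity_key(values) == identity_key(verbose_values))  # True
print(command_template.format(**verbose_values))
# python3 run.py --data in.csv --para 1 --verbose
```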

create_task(name: str = '', upstream_tasks: List[jobmon.client.task.Task] = [], task_attributes: List | dict = {}, max_attempts: int | None = None, compute_resources: Dict[str, Any] | None = None, compute_resources_callable: Callable | None = None, resource_scales: Dict[str, Any] | None = None, cluster_name: str = '', fallback_queues: List[str] | None = None, **kwargs: Any) → jobmon.client.task.Task

Create an instance of a task associated with this template.

Parameters:
  • name – a name associated with this specific task

  • upstream_tasks – Task objects that must be run prior to this one

  • task_attributes (dict or list) – attributes and their values or just the attributes that will be given values later

  • max_attempts – Number of attempts to try this task before giving up. Default is 3.

  • cluster_name – name of cluster to run task on.

  • compute_resources – dictionary of compute resources to run this task with, of the form {resource_name: resource_value}.

  • compute_resources_callable – compute resources generating callable.

  • resource_scales (dict) – how much to scale the resource request if the initial request fails. Scale factor can be a numeric value, a Callable that will be applied to the existing resources, or an Iterator. Any Callable should take a single numeric value as its sole argument. Any Iterator should only yield numeric values. Any Iterable can be easily converted to an Iterator by using the iter() built-in (e.g. iter([80, 160, 190])).

  • **kwargs – values for each argument specified in command_template

Returns:

Task

Raises:

ValueError – if the args that are supplied do not match the args in the command template.
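The ValueError above can be pictured as a comparison between the named fields of the command template and the keyword arguments supplied. The helpers below are a hypothetical sketch of such a check using the standard library's `string.Formatter.parse`; whether jobmon rejects extra arguments in exactly this way is an assumption.

```python
from string import Formatter
from typing import Any, Set


def template_fields(command_template: str) -> Set[str]:
    # Formatter.parse yields (literal_text, field_name, format_spec, conversion).
    return {field for _, field, _, _ in Formatter().parse(command_template) if field}


def check_task_args(command_template: str, **kwargs: Any) -> str:
    """Hypothetical check: raise ValueError unless kwargs match the template exactly."""
    expected = template_fields(command_template)
    supplied = set(kwargs)
    if supplied != expected:
        missing = sorted(expected - supplied)
        unexpected = sorted(supplied - expected)
        raise ValueError(
            f"args do not match command template: missing={missing}, unexpected={unexpected}"
        )
    return command_template.format(**kwargs)


template = "{python} {script} --data {data}"
print(check_task_args(template, python="python3", script="run.py", data="in.csv"))
# python3 run.py --data in.csv
try:
    check_task_args(template, python="python3", script="run.py")
except ValueError as err:
    print(err)  # args do not match command template: missing=['data'], unexpected=[]
```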

create_tasks(max_attempts: int | None = None, upstream_tasks: List[jobmon.client.task.Task] | None = None, max_concurrently_running: int = MaxConcurrentlyRunning.MAXCONCURRENTLYRUNNING, compute_resources: Dict[str, Any] | None = None, compute_resources_callable: Callable | None = None, resource_scales: Dict[str, Any] | None = None, cluster_name: str = '', name: str | None = None, **kwargs: Any) → List[jobmon.client.task.Task]

Create one task for each element of the cross product of all node args.

Parameters:
  • max_attempts – the maximum number of attempts for each task in the array

  • upstream_tasks – dependencies for all tasks in this array

  • max_concurrently_running – the maximum number of tasks in the array that can run at once

  • compute_resources – resources to associate with this array, if different from the task template default resources

  • compute_resources_callable – a function that can dynamically generate resources on retries, if different from the task template callable.

  • resource_scales – parameters controlling how aggressively tasks killed for exceeding their resource requests are rescaled.

  • cluster_name – the cluster the array will run on

  • name – the name of the array.

  • **kwargs – task_args, node_args, and op_args as defined in the command template. If you provide node_args as an iterable, they will be expanded.
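A minimal sketch of the cross-product expansion, assuming node args are matched by name in **kwargs and that a scalar (including a string) counts as a single value. `expand_node_args` and the argument names `location_id`, `year`, and `data` are hypothetical; the helper returns one keyword-argument dict per task rather than building jobmon Task objects.

```python
from collections.abc import Iterable
from itertools import product
from typing import Any, Dict, List


def expand_node_args(node_args: List[str], **kwargs: Any) -> List[Dict[str, Any]]:
    """Hypothetical: one kwargs dict per element of the node-arg cross product."""
    value_lists = []
    for arg in node_args:
        value = kwargs[arg]
        # A string or other non-iterable is treated as a single value.
        if isinstance(value, (str, bytes)) or not isinstance(value, Iterable):
            value = [value]
        value_lists.append(list(value))
    expanded = []
    for combo in product(*value_lists):
        task_kwargs = dict(kwargs)  # non-node args are copied unchanged into every task
        task_kwargs.update(zip(node_args, combo))
        expanded.append(task_kwargs)
    return expanded


tasks = expand_node_args(
    ["location_id", "year"], location_id=[1, 2], year=[2000, 2010, 2020], data="in.csv"
)
print(len(tasks))  # 6
print(tasks[0])  # {'location_id': 1, 'year': 2000, 'data': 'in.csv'}
```

Two locations times three years gives six tasks; adding a third node arg with n values would multiply the count by n.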

resource_usage(workflows: List[int] | None = None, node_args: Dict[str, Any] | None = None, ci: float | None = None) → dict | None

Get the aggregate resource usage for a TaskTemplate.