core.task_generator

Attributes

SIMPLE_TYPES

BUILT_IN_COLLECTIONS

OPTIONAL_TYPES

TASK_RUNNER_NAME

TASK_RUNNER_SUB_COMMAND

HELP_TEXT_INTRO_FORMAT

SERIALIZED_EMPTY_STRING

logger

Classes

TaskGenerator

Class for auto-generating jobmon tasks from a python function.

TaskGeneratorDocumenter

Directive for generating documentation for a single task generator.

TaskGeneratorModuleDocumenter

Directive for generating documentation for all the task generators in a module.

Functions

create_task_name(→ str)

Create a task name from the kwargs.

is_optional_type(→ bool)

Return whether the obj_type is an Optional type.

get_optional_type_parameter(→ Type)

Return the type inside an optional.

make_cli_argument_string(→ str)

Make a CLI argument string from an argument name and value.

task_generator(→ Callable)

Decorator for generating jobmon tasks from a python function.

get_tasks_by_node_args(→ list[jobmon.client.task.Task])

Get the tasks of a TaskGenerator in a workflow that have the given node arguments.

setup(→ dict)

The function that registers the extension with sphinx.

Module Contents

core.task_generator.SIMPLE_TYPES
core.task_generator.BUILT_IN_COLLECTIONS
core.task_generator.OPTIONAL_TYPES
core.task_generator.TASK_RUNNER_NAME = 'worker_node_entry_point'
core.task_generator.TASK_RUNNER_SUB_COMMAND = 'task_generator'
core.task_generator.HELP_TEXT_INTRO_FORMAT = 'Command Line Documentation for {full_path}'
core.task_generator.SERIALIZED_EMPTY_STRING = '""'
core.task_generator.logger
core.task_generator.create_task_name(kwargs_for_name: dict, prefix: str = '', name_func: Callable | None = None) str

Create a task name from the kwargs.

core.task_generator.is_optional_type(obj_type: Any) bool

Return whether the obj_type is an Optional type.

Ex. Optional[str] would return True, NoneType would return True, str | None would return True, str would return False.

core.task_generator.get_optional_type_parameter(obj_type: Type) Type

Return the type inside an optional.

core.task_generator.make_cli_argument_string(arg_value: str | List) str

Make a CLI argument string from an argument name and value.

For string, return itself. For list, say [“a”, “b”, “c”], return string ‘[a,b,c]’

class core.task_generator.TaskGenerator(task_function: Callable, serializers: Dict, tool_name: str, naming_args: List[str] | None = None, max_attempts: int | None = None, module_source_path: str | None = None, name_func: Callable | None = None, default_cluster_name: str = '', default_compute_resources: Dict[str, Any] | None = None, default_resource_scales: Dict[str, float] | None = None, yaml_file: str | None = None)

Class for auto-generating jobmon tasks from a python function.

Arguments on the task_function are inspected to generate the task template, and type annotations are used for serializing and deserializing arguments to strings passed on the command line.

The TaskGenerator has built in serializers for:

  • str, int, bool, float

  • list, tuple, set of other serializable types

  • Optional types (and types that are equivalent to Optional types) of other

    serializable types

Users can also supply serializers for custom types by providing a dictionary of type to a tuple of serialization and deserialization functions. Serializers must always turn objects into a single string. Lists of strings are represented on the command line as arg_name='[value1,value2]'.

Note

While lists, tuples and sets (and their Optional counterparts) can currently serialize empty collections, we can’t currently serialize custom types as empty lists.

Initialize TaskGenerator.

Generates and saves the task template.

Parameters:
  • task_function – The function that the task will run.

  • serializers – A dict mapping types to two callables. The first callable is the serializer, which converts an object of the type to a string. And the second is a deserializer, which converts a string to an object of the type.

  • tool_name – A jobmon tool name for generating tasks.

  • naming_args – A list of arguments to use in the task name. If not provided (or None), it uses all the arguments. If [] is provided, it uses no naming arguments, and the name of the task will just be the name of the task function.

  • max_attempts – The max number of attempts jobmon will make on the tasks

  • max_attempts – The max number of attempts jobmon will make on the tasks

  • module_source_path – The path to the module source code. If not provided, the module is assumed to be installed in the system.

  • name_func – A function that takes in the task name prefix and the kwargs to generate the task name. If not provided, the FHS default is used.

  • default_cluster_name – The default cluster name to use when creating tasks. If not provided, an empty string is used.

  • default_compute_resources – The default compute resources when creating tasks. If not provided, an empty dictionary is used.

  • default_resource_scales – The default resource scales to use when creating tasks. If not provided, an empty dictionary is used.

  • yaml_file – The path to the yaml file that contains the task template. If not provided, the default template is used.

task_function
serializers
tool
max_attempts = None
name
module_source_path = None
mod_name
full_path
name_func = None
serialize(obj: Any, expected_type: Type) str

Serialize obj, validating that it is actually of type expected_type.

serialize_array(obj: Any, expected_type: Type) List

Serialize obj into a list of serialized string.

deserialize(obj: str, obj_type: Type) Any

Deserialize obj.

create_task(cluster_name: str = '', compute_resources: Dict | None = None, resource_scales: Dict[str, Any] | None = None, upstream_tasks: List[jobmon.client.task.Task] = [], task_attributes: List[str] | Dict[str, Any] = {}, **kwargs: Any) jobmon.client.task.Task

Create a task for the task_function with the given kwargs.

create_tasks(cluster_name: str = '', compute_resources: Dict | None = None, resource_scales: Dict[str, Any] | None = None, upstream_tasks: List[jobmon.client.task.Task] | None = None, **kwargs: Any) List[jobmon.client.task.Task]

Create a task array for the task_function with the given kwargs.

run(args: List[str]) Any

Run the task_function with the given args and return any result.

help() str

Return help text for the task_function.

core.task_generator.task_generator(serializers: Dict, tool_name: str, naming_args: List[str] | None = None, max_attempts: int | None = None, module_source_path: str | None = None, name_func: Callable | None = None, default_cluster_name: str = '', default_compute_resources: Dict[str, Any] | None = None, default_resource_scales: Dict[str, float] | None = None, yaml_file: str | None = None) Callable

Decorator for generating jobmon tasks from a python function.

core.task_generator.get_tasks_by_node_args(workflow: jobmon.client.workflow.Workflow, task_generator: TaskGenerator, node_args_dict: dict[str, Any], error_on_empty: bool = True) list[jobmon.client.task.Task]

Get the tasks of a TaskGenerator in a workflow that have the given node arguments.

This method does some value serialization and formatting before handing the node argument string over to workflow.get_tasks_by_node_args.

class core.task_generator.TaskGeneratorDocumenter(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)

Bases: docutils.parsers.rst.Directive

Directive for generating documentation for a single task generator.

required_arguments = 1

Number of required directive arguments.

optional_arguments = 1

Number of optional arguments after the required arguments.

final_argument_whitespace = True

May the final argument contain whitespace?

option_spec

Mapping of option names to validator functions.

run() list[docutils.nodes.Node]

The function sphinx/docutils use to generate documentation for the directive.

class core.task_generator.TaskGeneratorModuleDocumenter(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)

Bases: docutils.parsers.rst.Directive

Directive for generating documentation for all the task generators in a module.

required_arguments = 1

Number of required directive arguments.

optional_arguments = 1

Number of optional arguments after the required arguments.

final_argument_whitespace = True

May the final argument contain whitespace?

option_spec

Mapping of option names to validator functions.

run() list[docutils.nodes.Node]

The function sphinx/docutils use to generate documentation for the directive.

core.task_generator.setup(app: sphinx.application.Sphinx) dict

The function that registers the extension with sphinx.