core.task_generator

Attributes

SIMPLE_TYPES

BUILT_IN_COLLECTIONS

OPTIONAL_TYPES

TASK_RUNNER_NAME

TASK_RUNNER_SUB_COMMAND

HELP_TEXT_INTRO_FORMAT

SERIALIZED_EMPTY_STRING

logger

Classes

TaskGenerator

Class for auto-generating jobmon tasks from a python function.

TaskGeneratorDocumenter

Directive for generating documentation for a single task generator.

TaskGeneratorModuleDocumenter

Directive for generating documentation for all the task generators in a module.

Functions

create_task_name(→ str)

Create a task name from the kwargs.

_is_multidimensional_type(→ bool)

_find_executable_path(→ str)

_is_unannotated_built_in_collection_type(→ bool)

Return whether the obj_type is an unannotated, built-in collection type.

_get_collection_type(→ Optional[Type])

Return the type of collection.

_get_generic_type_parameters(→ Any)

Return the parameters for a generic type.

_is_annotated_built_in_collection_type(→ bool)

Return whether the type is a properly annotated built-in collection type.

_zip_collection_items_and_types(→ Any)

Return a list of tuples mapping collection items and their types.

is_optional_type(→ bool)

Return whether the obj_type is an Optional type.

get_optional_type_parameter(→ Type)

Return the type inside an optional.

_clean_arg_name(→ str)

Remove the -- from the arg name and convert dashes to underscores.

_get_short_description(→ str)

make_cli_argument_string(→ str)

Make a CLI argument string from an argument value.

task_generator(→ Callable)

Decorator for generating jobmon tasks from a python function.

get_tasks_by_node_args(→ list[jobmon.client.task.Task])

Get the tasks of a TaskGenerator in a workflow that have the given node arguments.

_generate_nodes(→ docutils.nodes.Node)

Makes a docutils node with the documentation for a task generator.

_format_description(→ list[str])

Format the description of the task generator into proper rST.

_format_options(→ list[str])

Format the options of the task generator into proper rST.

setup(→ dict)

The function that registers the extension with sphinx.

Module Contents

core.task_generator.SIMPLE_TYPES
core.task_generator.BUILT_IN_COLLECTIONS
core.task_generator.OPTIONAL_TYPES
core.task_generator.TASK_RUNNER_NAME = 'worker_node_entry_point'
core.task_generator.TASK_RUNNER_SUB_COMMAND = 'task_generator'
core.task_generator.HELP_TEXT_INTRO_FORMAT = 'Command Line Documentation for {full_path}'
core.task_generator.SERIALIZED_EMPTY_STRING = '""'
core.task_generator.logger
core.task_generator.create_task_name(kwargs_for_name: dict, prefix: str = '', name_func: Callable | None = None) → str

Create a task name from the kwargs.

core.task_generator._is_multidimensional_type(type_hint: Any) → bool
core.task_generator._find_executable_path(executable_name: str) → str
core.task_generator._is_unannotated_built_in_collection_type(obj_type: Any) → bool

Return whether the obj_type is an unannotated, built-in collection type.

Ex. list would return True, tuple would return True, list[str] would return False.
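The check above can be sketched as follows. This is a minimal illustration, assuming the test is an identity comparison against the bare built-in classes; `is_bare_collection` and `BARE_COLLECTIONS` are hypothetical names, not the module's own.

```python
# Hypothetical stand-in for the helper; the real implementation may differ.
BARE_COLLECTIONS = (list, tuple, set)  # assumed set of built-in collections


def is_bare_collection(obj_type: object) -> bool:
    """Return True only for an unparameterized built-in collection class."""
    # list[str] is a generic alias object, not the class `list` itself,
    # so an identity comparison separates the two cases.
    return any(obj_type is collection for collection in BARE_COLLECTIONS)


assert is_bare_collection(list)
assert is_bare_collection(tuple)
assert not is_bare_collection(list[str])
```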

core.task_generator._get_collection_type(obj_type: Any) → Type | None

Return the type of collection.

To check, we extract the __origin__ component of the obj_type – this would be the type of collection (list, tuple, set).

Ex. list[int] would return list, tuple[str] would return tuple, float would return None (because it is not a collection).
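One way to picture the `__origin__` lookup is with the standard `typing.get_origin` function. The sketch below is an assumption about the approach, not the module's code, and `collection_type_of` is a hypothetical name.

```python
from typing import Any, Optional, get_origin


def collection_type_of(obj_type: Any) -> Optional[type]:
    """Return list, tuple or set for a parameterized collection, else None."""
    # get_origin reads the unsubscripted origin of a generic alias and
    # returns None for plain classes such as float.
    origin = get_origin(obj_type)
    return origin if origin in (list, tuple, set) else None


assert collection_type_of(list[int]) is list
assert collection_type_of(tuple[str]) is tuple
assert collection_type_of(float) is None
```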

core.task_generator._get_generic_type_parameters(obj_type: Any) → Any

Return the parameters for a generic type.

The result is a tuple of types. For example, list[int] returns (int,), tuple[int, str] returns (int, str), and Union[str, None] returns (str, NoneType).
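The same tuples can be reproduced with the standard `typing.get_args` function. Whether the helper calls `get_args` or reads `__args__` directly is not stated here, so treat this as an illustration only; `generic_parameters` is a hypothetical name.

```python
from typing import Any, Tuple, Union, get_args


def generic_parameters(obj_type: Any) -> Tuple[Any, ...]:
    """Return the type parameters of a generic alias as a tuple."""
    return get_args(obj_type)


NoneType = type(None)

assert generic_parameters(list[int]) == (int,)
assert generic_parameters(tuple[int, str]) == (int, str)
assert generic_parameters(Union[str, None]) == (str, NoneType)
```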

core.task_generator._is_annotated_built_in_collection_type(obj_type: Any) → bool

Return whether the type is a properly annotated built-in collection type.

core.task_generator._zip_collection_items_and_types(obj: Collection[str], collection_items_type: tuple) → Any

Return a list of tuples mapping collection items and their types.

Ex. obj is [1, 2, 3] and collection_items_type is (int,) would return [(1, int), (2, int), (3, int)].
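A rough sketch of the pairing shown in the example. It assumes a single item type is repeated for every element and that several item types are paired positionally; the second rule is a guess, and `zip_items_and_types` is a hypothetical name.

```python
from itertools import repeat
from typing import Any, Collection, List, Tuple


def zip_items_and_types(obj: Collection[Any], item_types: tuple) -> List[Tuple[Any, Any]]:
    """Pair every item of obj with the type it is expected to have."""
    if len(item_types) == 1:
        # One type parameter, e.g. list[int]: reuse it for each item.
        return list(zip(obj, repeat(item_types[0])))
    # Several type parameters, e.g. tuple[str, int]: pair by position (assumed).
    return list(zip(obj, item_types))


assert zip_items_and_types([1, 2, 3], (int,)) == [(1, int), (2, int), (3, int)]
assert zip_items_and_types(("a", 2), (str, int)) == [("a", str), (2, int)]
```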

core.task_generator.is_optional_type(obj_type: Any) → bool

Return whether the obj_type is an Optional type.

Ex. Optional[str] would return True, NoneType would return True, str | None would return True, str would return False.

core.task_generator.get_optional_type_parameter(obj_type: Type) → Type

Return the type inside an optional.
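As an illustration, unwrapping an Optional amounts to dropping NoneType from the union's parameters. How the real function treats unions with more than one non-None member is not documented here, so the error raised below is an assumption; `unwrap_optional` is a hypothetical name.

```python
from typing import Any, Optional, get_args


def unwrap_optional(obj_type: Any) -> Any:
    """Return X for Optional[X]; reject anything with a different shape."""
    inner = [arg for arg in get_args(obj_type) if arg is not type(None)]
    if len(inner) != 1:
        # Assumed behavior: anything but exactly one wrapped type is an error.
        raise TypeError(f"{obj_type!r} does not wrap exactly one type")
    return inner[0]


assert unwrap_optional(Optional[int]) is int
assert unwrap_optional(Optional[list[str]]) == list[str]
```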

core.task_generator._clean_arg_name(arg_name: str) → str

Remove the -- from the arg name and convert dashes to underscores.
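A minimal sketch of that transformation, assuming the `--` only ever appears as a prefix. `clean_arg_name` and the `--location-id` flag are hypothetical.

```python
def clean_arg_name(arg_name: str) -> str:
    """Turn a CLI flag such as --location-id into a Python identifier."""
    # Strip the leading "--" (if present), then swap dashes for underscores.
    return arg_name.removeprefix("--").replace("-", "_")


assert clean_arg_name("--location-id") == "location_id"
assert clean_arg_name("year") == "year"
```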

core.task_generator._get_short_description(task_function_docstring: docstring_parser.Docstring) → str
core.task_generator.make_cli_argument_string(arg_value: str | List) → str

Make a CLI argument string from an argument value.

A string is returned unchanged. A list such as ["a", "b", "c"] is returned as the string '[a,b,c]'.
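The two cases above can be sketched in a few lines. This illustration assumes the list items are already serialized strings and does not cover how empty values are handled; `to_cli_string` is a hypothetical name.

```python
from typing import List, Union


def to_cli_string(arg_value: Union[str, List[str]]) -> str:
    """Format a serialized value, or list of values, for the command line."""
    if isinstance(arg_value, str):
        return arg_value
    # Join the items with commas and wrap them in square brackets.
    return "[" + ",".join(arg_value) + "]"


assert to_cli_string("a") == "a"
assert to_cli_string(["a", "b", "c"]) == "[a,b,c]"
```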

class core.task_generator.TaskGenerator(task_function: Callable, serializers: Dict, tool_name: str, naming_args: List[str] | None = None, max_attempts: int | None = None, module_source_path: str | None = None, name_func: Callable | None = None, default_cluster_name: str = '', default_compute_resources: Dict[str, Any] | None = None, default_resource_scales: Dict[str, float] | None = None, yaml_file: str | None = None)

Class for auto-generating jobmon tasks from a python function.

Arguments on the task_function are inspected to generate the task template, and type annotations are used for serializing and deserializing arguments to strings passed on the command line.

The TaskGenerator has built in serializers for:

  • str, int, bool, float

  • list, tuple, set of other serializable types

  • Optional types (and types that are equivalent to Optional types) of other serializable types

Users can also supply serializers for custom types by providing a dictionary of type to a tuple of serialization and deserialization functions. Serializers must always turn objects into a single string. Lists of strings are represented on the command line as arg_name='[value1,value2]'.

Note

While lists, tuples and sets (and their Optional counterparts) can currently serialize empty collections, we can’t currently serialize custom types as empty lists.
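To make the shape of the serializers dictionary concrete, here is a sketch for a made-up custom type. `YearRange` and both functions are hypothetical; only the layout (a type mapped to a tuple of a serialization and a deserialization function, each side working with a single string) comes from the description above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class YearRange:
    """Hypothetical custom type used only for this illustration."""

    start: int
    end: int


def serialize_year_range(value: YearRange) -> str:
    # A serializer must produce one string per object.
    return f"{value.start}:{value.end}"


def deserialize_year_range(text: str) -> YearRange:
    start, end = text.split(":")
    return YearRange(int(start), int(end))


# Type -> (serialization function, deserialization function)
serializers = {YearRange: (serialize_year_range, deserialize_year_range)}

to_text, from_text = serializers[YearRange]
assert to_text(YearRange(1990, 2020)) == "1990:2020"
assert from_text("1990:2020") == YearRange(1990, 2020)
```

A dictionary of this shape is what the serializers argument expects. Keeping the two functions exact inverses of each other means a value survives the round trip through the command line.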

task_function
serializers
tool
max_attempts = None
mod_name = 'Uninferable'
name
full_path = 'Uninferable:Uninferable'
module_source_path = None
params
_naming_args = None
_task_template = None
_default_cluster_name = ''
_default_compute_resources = None
_default_resource_scales = None
_yaml_file = None
name_func = None
_validate_task_function() → None

Check that a task can be generated from the task_function.

Currently checks:

  • All parameters have type annotations

  • All parameters have serializers
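The two checks can be pictured with `inspect.signature`. The sketch below is a simplification under stated assumptions: it only recognizes the four simple types, whereas the real validation also covers collections, Optionals and custom serializers. `check_task_function` and `SUPPORTED` are hypothetical names.

```python
import inspect
from typing import Callable, Set

SUPPORTED: Set[type] = {str, int, bool, float}  # assumed subset for the sketch


def check_task_function(func: Callable, supported: Set[type] = SUPPORTED) -> None:
    """Raise TypeError if a parameter is unannotated or has no serializer."""
    for name, param in inspect.signature(func).parameters.items():
        if param.annotation is inspect.Parameter.empty:
            raise TypeError(f"Parameter {name!r} has no type annotation")
        if param.annotation not in supported:
            raise TypeError(f"No serializer for parameter {name!r}")


def annotated(location_id: int, sex: str) -> None:
    """Hypothetical task function with full annotations."""


def unannotated(location_id, sex: str) -> None:
    """Hypothetical task function missing one annotation."""


check_task_function(annotated)  # passes silently
try:
    check_task_function(unannotated)
except TypeError as err:
    print(f"rejected: {err}")
```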

_is_valid_annotation(annotation: Type) → bool

Returns True if the annotation is valid, False otherwise.

_generate_task_template() → None

Generate and store the task template.

_is_valid_type(obj: Any, expected_type: Type) → bool

Check that the obj type matches the expected type.

serialize(obj: Any, expected_type: Type) → str

Serialize obj, validating that it is actually of type expected_type.

serialize_array(obj: Any, expected_type: Type) → List

Serialize obj into a list of serialized strings.

deserialize(obj: str, obj_type: Type) → Any

Deserialize obj.
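For intuition, deserialization reverses the bracketed list format described for the class. The sketch below handles only str, int, float and lists of those; it ignores bools, Optionals, nested lists and custom serializers, and is not the method's implementation. `deserialize_sketch` is a hypothetical name.

```python
from typing import Any, get_args, get_origin


def deserialize_sketch(text: str, obj_type: Any) -> Any:
    """Turn a command-line string back into a value of obj_type."""
    if get_origin(obj_type) is list:
        (item_type,) = get_args(obj_type)
        body = text.strip()[1:-1]  # drop the surrounding "[" and "]"
        if not body:
            return []
        return [deserialize_sketch(item, item_type) for item in body.split(",")]
    # Simple types: call the type on the string, e.g. int("3").
    return obj_type(text)


assert deserialize_sketch("[1,2,3]", list[int]) == [1, 2, 3]
assert deserialize_sketch("3.5", float) == 3.5
assert deserialize_sketch("abc", str) == "abc"
```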

_cluster_resource_check(cluster_name: str, compute_resources: Dict | None) → str

Make sure a cluster name is available to use and that compute_resources is a dict.

create_task(cluster_name: str = '', compute_resources: Dict | None = None, resource_scales: Dict[str, Any] | None = None, upstream_tasks: List[jobmon.client.task.Task] = [], task_attributes: List[str] | Dict[str, Any] = {}, **kwargs: Any) → jobmon.client.task.Task

Create a task for the task_function with the given kwargs.

create_tasks(cluster_name: str = '', compute_resources: Dict | None = None, resource_scales: Dict[str, Any] | None = None, upstream_tasks: List[jobmon.client.task.Task] | None = None, **kwargs: Any) → List[jobmon.client.task.Task]

Create a task array for the task_function with the given kwargs.

run(args: List[str]) → Any

Run the task_function with the given args and return any result.

help() → str

Return help text for the task_function.

_get_param_names_and_descriptions(task_function_docstring_param_names_to_annotations: Dict, task_function_docstring_param_names_to_descriptions: Dict) → List[str]
core.task_generator.task_generator(serializers: Dict, tool_name: str, naming_args: List[str] | None = None, max_attempts: int | None = None, module_source_path: str | None = None, name_func: Callable | None = None, default_cluster_name: str = '', default_compute_resources: Dict[str, Any] | None = None, default_resource_scales: Dict[str, float] | None = None, yaml_file: str | None = None) → Callable

Decorator for generating jobmon tasks from a python function.

core.task_generator.get_tasks_by_node_args(workflow: jobmon.client.workflow.Workflow, task_generator: TaskGenerator, node_args_dict: dict[str, Any], error_on_empty: bool = True) → list[jobmon.client.task.Task]

Get the tasks of a TaskGenerator in a workflow that have the given node arguments.

This function serializes and formats the values before handing the node argument string over to workflow.get_tasks_by_node_args.

class core.task_generator.TaskGeneratorDocumenter(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)

Bases: docutils.parsers.rst.Directive

Directive for generating documentation for a single task generator.

required_arguments = 1

Number of required directive arguments.

optional_arguments = 1

Number of optional arguments after the required arguments.

final_argument_whitespace = True

May the final argument contain whitespace?

option_spec

Mapping of option names to validator functions.

run() → list[docutils.nodes.Node]

The function sphinx/docutils use to generate documentation for the directive.

_load_task_generator(task_generator_path: str, module_path: str | None = None) → TaskGenerator

Load the task generator from the given path.

class core.task_generator.TaskGeneratorModuleDocumenter(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)

Bases: docutils.parsers.rst.Directive

Directive for generating documentation for all the task generators in a module.

required_arguments = 1

Number of required directive arguments.

optional_arguments = 1

Number of optional arguments after the required arguments.

final_argument_whitespace = True

May the final argument contain whitespace?

option_spec

Mapping of option names to validator functions.

run() → list[docutils.nodes.Node]

The function sphinx/docutils use to generate documentation for the directive.

_load_module_task_generators(module_name: str, module_path: str | None = None) → tuple[types.ModuleType, list[TaskGenerator]]

Load all the task generators in a given module.

_generate_module_section(module_name: str, module: types.ModuleType, task_generators: Iterable[TaskGenerator]) → docutils.nodes.Node

Makes the docutils node for the module section.

Includes the docstring for the module and a list of task generator names. Does not include detailed documentation for any individual task generator.

core.task_generator._generate_nodes(task_generator: TaskGenerator, state: Any) → docutils.nodes.Node

Makes a docutils node with the documentation for a task generator.

Includes the docstring description for the task generator and all the arguments.

core.task_generator._format_description(task_generator: TaskGenerator, parsed_docstring: docstring_parser.Docstring) → list[str]

Format the description of the task generator into proper rST.

core.task_generator._format_options(task_generator: TaskGenerator, parsed_docstring: docstring_parser.Docstring) → list[str]

Format the options of the task generator into proper rST.

core.task_generator.setup(app: sphinx.application.Sphinx) → dict

The function that registers the extension with sphinx.