core.task_generator
Attributes
Classes
Class for auto-generating jobmon tasks from a python function. |
|
Directive for generating documentation for a single task generator. |
|
Directive for generating documentation for all the task generators in a module. |
Functions
|
Create a task name from the kwargs. |
|
Return whether the |
|
Return the type inside an optional. |
|
Make a CLI argument string from an argument name and value. |
|
Decorator for generating jobmon tasks from a python function. |
|
Get the tasks of a TaskGenerator in a workflow that have the given node arguments. |
|
The function that registers the extension with sphinx. |
Module Contents
- core.task_generator.SIMPLE_TYPES
- core.task_generator.BUILT_IN_COLLECTIONS
- core.task_generator.OPTIONAL_TYPES
- core.task_generator.TASK_RUNNER_NAME = 'worker_node_entry_point'
- core.task_generator.TASK_RUNNER_SUB_COMMAND = 'task_generator'
- core.task_generator.HELP_TEXT_INTRO_FORMAT = 'Command Line Documentation for {full_path}'
- core.task_generator.SERIALIZED_EMPTY_STRING = '""'
- core.task_generator.logger
- core.task_generator.create_task_name(kwargs_for_name: dict, prefix: str = '', name_func: Callable | None = None) str
Create a task name from the kwargs.
- core.task_generator.is_optional_type(obj_type: Any) bool
Return whether the
obj_typeis an Optional type.Ex.
Optional[str]would returnTrue,NoneTypewould returnTrue,str | Nonewould returnTrue,strwould returnFalse.
- core.task_generator.get_optional_type_parameter(obj_type: Type) Type
Return the type inside an optional.
- core.task_generator.make_cli_argument_string(arg_value: str | List) str
Make a CLI argument string from an argument name and value.
For string, return itself. For list, say [“a”, “b”, “c”], return string ‘[a,b,c]’
- class core.task_generator.TaskGenerator(task_function: Callable, serializers: Dict, tool_name: str, naming_args: List[str] | None = None, max_attempts: int | None = None, module_source_path: str | None = None, name_func: Callable | None = None, default_cluster_name: str = '', default_compute_resources: Dict[str, Any] | None = None, default_resource_scales: Dict[str, float] | None = None, yaml_file: str | None = None)
Class for auto-generating jobmon tasks from a python function.
Arguments on the task_function are inspected to generate the task template, and type annotations are used for serializing and deserializing arguments to strings passed on the command line.
The TaskGenerator has built in serializers for:
str, int, bool, float
list, tuple, set of other serializable types
- Optional types (and types that are equivalent to Optional types) of other
serializable types
Users can also supply serializers for custom types by providing a dictionary of type to a tuple of serialization and deserialization functions. Serializers must always turn objects into a single string. Lists of strings are represented on the command line as
arg_name='[value1,value2]'.Note
While lists, tuples and sets (and their
Optionalcounterparts) can currently serialize empty collections, we can’t currently serialize custom types as empty lists.Initialize TaskGenerator.
Generates and saves the task template.
- Parameters:
task_function – The function that the task will run.
serializers – A dict mapping types to two callables. The first callable is the serializer, which converts an object of the type to a string. And the second is a deserializer, which converts a string to an object of the type.
tool_name – A jobmon tool name for generating tasks.
naming_args – A list of arguments to use in the task name. If not provided (or
None), it uses all the arguments. If[]is provided, it uses no naming arguments, and the name of the task will just be the name of the task function.max_attempts – The max number of attempts jobmon will make on the tasks
max_attempts – The max number of attempts jobmon will make on the tasks
module_source_path – The path to the module source code. If not provided, the module is assumed to be installed in the system.
name_func – A function that takes in the task name prefix and the kwargs to generate the task name. If not provided, the FHS default is used.
default_cluster_name – The default cluster name to use when creating tasks. If not provided, an empty string is used.
default_compute_resources – The default compute resources when creating tasks. If not provided, an empty dictionary is used.
default_resource_scales – The default resource scales to use when creating tasks. If not provided, an empty dictionary is used.
yaml_file – The path to the yaml file that contains the task template. If not provided, the default template is used.
- task_function
- serializers
- tool
- max_attempts = None
- name
- module_source_path = None
- mod_name
- full_path
- name_func = None
- serialize(obj: Any, expected_type: Type) str
Serialize
obj, validating that it is actually of typeexpected_type.
- serialize_array(obj: Any, expected_type: Type) List
Serialize obj into a list of serialized string.
- create_task(cluster_name: str = '', compute_resources: Dict | None = None, resource_scales: Dict[str, Any] | None = None, upstream_tasks: List[jobmon.client.task.Task] = [], task_attributes: List[str] | Dict[str, Any] = {}, **kwargs: Any) jobmon.client.task.Task
Create a task for the task_function with the given kwargs.
- core.task_generator.task_generator(serializers: Dict, tool_name: str, naming_args: List[str] | None = None, max_attempts: int | None = None, module_source_path: str | None = None, name_func: Callable | None = None, default_cluster_name: str = '', default_compute_resources: Dict[str, Any] | None = None, default_resource_scales: Dict[str, float] | None = None, yaml_file: str | None = None) Callable
Decorator for generating jobmon tasks from a python function.
- core.task_generator.get_tasks_by_node_args(workflow: jobmon.client.workflow.Workflow, task_generator: TaskGenerator, node_args_dict: dict[str, Any], error_on_empty: bool = True) list[jobmon.client.task.Task]
Get the tasks of a TaskGenerator in a workflow that have the given node arguments.
This method does some value serialization and formatting before handing the node argument string over to
workflow.get_tasks_by_node_args.
- class core.task_generator.TaskGeneratorDocumenter(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)
Bases:
docutils.parsers.rst.DirectiveDirective for generating documentation for a single task generator.
- required_arguments = 1
Number of required directive arguments.
- optional_arguments = 1
Number of optional arguments after the required arguments.
- final_argument_whitespace = True
May the final argument contain whitespace?
- option_spec
Mapping of option names to validator functions.
- class core.task_generator.TaskGeneratorModuleDocumenter(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine)
Bases:
docutils.parsers.rst.DirectiveDirective for generating documentation for all the task generators in a module.
- required_arguments = 1
Number of required directive arguments.
- optional_arguments = 1
Number of optional arguments after the required arguments.
- final_argument_whitespace = True
May the final argument contain whitespace?
- option_spec
Mapping of option names to validator functions.