core.task_generator =================== .. py:module:: core.task_generator Attributes ---------- .. autoapisummary:: core.task_generator.SIMPLE_TYPES core.task_generator.BUILT_IN_COLLECTIONS core.task_generator.OPTIONAL_TYPES core.task_generator.TASK_RUNNER_NAME core.task_generator.TASK_RUNNER_SUB_COMMAND core.task_generator.HELP_TEXT_INTRO_FORMAT core.task_generator.SERIALIZED_EMPTY_STRING core.task_generator.logger Classes ------- .. autoapisummary:: core.task_generator.TaskGenerator core.task_generator.TaskGeneratorDocumenter core.task_generator.TaskGeneratorModuleDocumenter Functions --------- .. autoapisummary:: core.task_generator.create_task_name core.task_generator.is_optional_type core.task_generator.get_optional_type_parameter core.task_generator.make_cli_argument_string core.task_generator.task_generator core.task_generator.get_tasks_by_node_args core.task_generator.setup Module Contents --------------- .. py:data:: SIMPLE_TYPES .. py:data:: BUILT_IN_COLLECTIONS .. py:data:: OPTIONAL_TYPES .. py:data:: TASK_RUNNER_NAME :value: 'worker_node_entry_point' .. py:data:: TASK_RUNNER_SUB_COMMAND :value: 'task_generator' .. py:data:: HELP_TEXT_INTRO_FORMAT :value: 'Command Line Documentation for {full_path}' .. py:data:: SERIALIZED_EMPTY_STRING :value: '""' .. py:data:: logger .. py:function:: create_task_name(kwargs_for_name: dict, prefix: str = '', name_func: Optional[Callable] = None) -> str Create a task name from the kwargs. .. py:function:: is_optional_type(obj_type: Any) -> bool Return whether the ``obj_type`` is an Optional type. Ex. ``Optional[str]`` would return ``True``, ``NoneType`` would return ``True``, ``str | None`` would return ``True``, ``str`` would return ``False``. .. py:function:: get_optional_type_parameter(obj_type: Type) -> Type Return the type inside an optional. .. py:function:: make_cli_argument_string(arg_value: Union[str, List]) -> str Make a CLI argument string from an argument name and value. For string, return itself. For list, say ["a", "b", "c"], return string '[a,b,c]' .. py:class:: TaskGenerator(task_function: Callable, serializers: Dict, tool_name: str, naming_args: Optional[List[str]] = None, max_attempts: Optional[int] = None, module_source_path: Optional[str] = None, name_func: Optional[Callable] = None, default_cluster_name: str = '', default_compute_resources: Optional[Dict[str, Any]] = None, default_resource_scales: Optional[Dict[str, float]] = None, yaml_file: Optional[str] = None) Class for auto-generating jobmon tasks from a python function. Arguments on the task_function are inspected to generate the task template, and type annotations are used for serializing and deserializing arguments to strings passed on the command line. The TaskGenerator has built in serializers for: * str, int, bool, float * list, tuple, set of other serializable types * Optional types (and types that are equivalent to Optional types) of other serializable types Users can also supply serializers for custom types by providing a dictionary of type to a tuple of serialization and deserialization functions. Serializers must always turn objects into a single string. Lists of strings are represented on the command line as ``arg_name='[value1,value2]'``. .. note:: While lists, tuples and sets (and their ``Optional`` counterparts) can currently serialize empty collections, we can't currently serialize custom types as empty lists. Initialize TaskGenerator. Generates and saves the task template. :param task_function: The function that the task will run. :param serializers: A dict mapping types to two callables. The first callable is the serializer, which converts an object of the type to a string. And the second is a deserializer, which converts a string to an object of the type. :param tool_name: A jobmon tool name for generating tasks. :param naming_args: A list of arguments to use in the task name. If not provided (or ``None``), it uses all the arguments. If ``[]`` is provided, it uses no naming arguments, and the name of the task will just be the name of the task function. :param max_attempts: The max number of attempts jobmon will make on the tasks :param max_attempts: The max number of attempts jobmon will make on the tasks :param module_source_path: The path to the module source code. If not provided, the module is assumed to be installed in the system. :param name_func: A function that takes in the task name prefix and the kwargs to generate the task name. If not provided, the FHS default is used. :param default_cluster_name: The default cluster name to use when creating tasks. If not provided, an empty string is used. :param default_compute_resources: The default compute resources when creating tasks. If not provided, an empty dictionary is used. :param default_resource_scales: The default resource scales to use when creating tasks. If not provided, an empty dictionary is used. :param yaml_file: The path to the yaml file that contains the task template. If not provided, the default template is used. .. py:attribute:: task_function .. py:attribute:: serializers .. py:attribute:: tool .. py:attribute:: max_attempts :value: None .. py:attribute:: name .. py:attribute:: module_source_path :value: None .. py:attribute:: mod_name .. py:attribute:: full_path .. py:attribute:: name_func :value: None .. py:method:: serialize(obj: Any, expected_type: Type) -> str Serialize ``obj``, validating that it is actually of type ``expected_type``. .. py:method:: serialize_array(obj: Any, expected_type: Type) -> List Serialize obj into a list of serialized string. .. py:method:: deserialize(obj: str, obj_type: Type) -> Any Deserialize ``obj``. .. py:method:: create_task(cluster_name: str = '', compute_resources: Optional[Dict] = None, resource_scales: Optional[Dict[str, Any]] = None, upstream_tasks: List[jobmon.client.task.Task] = [], task_attributes: Union[List[str], Dict[str, Any]] = {}, **kwargs: Any) -> jobmon.client.task.Task Create a task for the task_function with the given kwargs. .. py:method:: create_tasks(cluster_name: str = '', compute_resources: Optional[Dict] = None, resource_scales: Optional[Dict[str, Any]] = None, upstream_tasks: Optional[List[jobmon.client.task.Task]] = None, **kwargs: Any) -> List[jobmon.client.task.Task] Create a task array for the task_function with the given kwargs. .. py:method:: run(args: List[str]) -> Any Run the task_function with the given args and return any result. .. py:method:: help() -> str Return help text for the task_function. .. py:function:: task_generator(serializers: Dict, tool_name: str, naming_args: Optional[List[str]] = None, max_attempts: Optional[int] = None, module_source_path: Optional[str] = None, name_func: Optional[Callable] = None, default_cluster_name: str = '', default_compute_resources: Optional[Dict[str, Any]] = None, default_resource_scales: Optional[Dict[str, float]] = None, yaml_file: Optional[str] = None) -> Callable Decorator for generating jobmon tasks from a python function. .. py:function:: get_tasks_by_node_args(workflow: jobmon.client.workflow.Workflow, task_generator: TaskGenerator, node_args_dict: dict[str, Any], error_on_empty: bool = True) -> list[jobmon.client.task.Task] Get the tasks of a TaskGenerator in a workflow that have the given node arguments. This method does some value serialization and formatting before handing the node argument string over to ``workflow.get_tasks_by_node_args``. .. py:class:: TaskGeneratorDocumenter(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine) Bases: :py:obj:`docutils.parsers.rst.Directive` Directive for generating documentation for a single task generator. .. py:attribute:: required_arguments :value: 1 Number of required directive arguments. .. py:attribute:: optional_arguments :value: 1 Number of optional arguments after the required arguments. .. py:attribute:: final_argument_whitespace :value: True May the final argument contain whitespace? .. py:attribute:: option_spec Mapping of option names to validator functions. .. py:method:: run() -> list[docutils.nodes.Node] The function sphinx/docutils use to generate documentation for the directive. .. py:class:: TaskGeneratorModuleDocumenter(name, arguments, options, content, lineno, content_offset, block_text, state, state_machine) Bases: :py:obj:`docutils.parsers.rst.Directive` Directive for generating documentation for all the task generators in a module. .. py:attribute:: required_arguments :value: 1 Number of required directive arguments. .. py:attribute:: optional_arguments :value: 1 Number of optional arguments after the required arguments. .. py:attribute:: final_argument_whitespace :value: True May the final argument contain whitespace? .. py:attribute:: option_spec Mapping of option names to validator functions. .. py:method:: run() -> list[docutils.nodes.Node] The function sphinx/docutils use to generate documentation for the directive. .. py:function:: setup(app: sphinx.application.Sphinx) -> dict The function that registers the extension with sphinx.