Task Generator

The Task Generator is a new feature in Jobmon, currently available only in Python, that enables users to generate tasks from an existing method. This feature utilizes serialization to convert method parameters into strings, forming a CLI command for submission to the cluster. Once on the cluster node, Jobmon deserializes the strings back into their original parameters, allowing the tasks to execute as intended.

Import

The task generator can be imported from jobmon.core:

from jobmon.core import task_generator

or

from jobmon.core.task_generator import task_generator

Parameters

Parameter

Type

Description

serializers

Dict

A dict of {type: callable} to serialize the input parameters.

naming_args

Optional[List[str]]

The args used to name the task. Does not apply to array tasks.

max_attempts

Optional[int]

The maximum number of attempts to run the task.

module_source_path

Optional[str]

The path to the module source code if the module is not in the current conda environment.

Examples

Method with simple input parameter type:

@task_generator(
    serializers={},
    tool_name="test_tool",
    module_source_path=full_script_path,
    max_attempts=1,
    naming_args=["foo"],
)
def simple_function(foo: int, bar: List[str] = []) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")
    print(f"bar: {bar}")

The above code allows the user to use the “simple_function” method as a task generator to generate a single task or a task array.

To create a single task workflow:

tool = Tool("test_tool")
tool.set_default_compute_resources_from_dict(
    cluster_name="slurm", compute_resources={"queue": "all.q"}
)
wf = tool.create_workflow()
compute_resources = {"queue": "all.q", "project": "proj_scicomp"}
task = simple_function.create_task(wf, foo=1, bar=["a", "b"], compute_resources=compute_resources)
wf.add_task(task)
wf.run()

The above code creates a workflow with a single task named “simple_function:foo=1” and runs it.

To create a task array workflow:

tool = Tool("test_tool")
tool.set_default_compute_resources_from_dict(
    cluster_name="slurm", compute_resources={"queue": "all.q"}
)
wf = tool.create_workflow()
compute_resources = {"queue": "all.q", "project": "proj_scicomp"}
tasks = simple_function.create_tasks(compute_resources=compute_resources, foo=[1, 2], bar=[["a", "b"]])
wf.add_task_array(task_array)
wf.run()

The above code creates a workflow with a task array of two tasks with input (foo=1, bar=[“a”, “b”] and (foo=2, bar=[“a”, “b”]) and runs it.

Method with complex input parameter type:

class TestYear:
    """A fake YearRange class for testing"""
    def __init__(self, year: int) -> None:
        self.year = year
    @staticmethod
    def parse_year(year: str):
        """Parse a year range."""
        return TestYear(int(year))
    def __str__(self) -> str:
        return str(self.year)
    def __eq__(self, other):
        return self.year == other.year
test_year_serializer = {TestYear: (str, TestYear.parse_year)}
@task_generator.task_generator(
    serializers=test_year_serializer,
    tool_name="test_tool",
    module_source_path=full_script_path,
    max_attempts=1,
    naming_args=["year"],
)
def simple_function_with_serializer(year: TestYear) -> None:
    """Simple task_function."""
    print(f"year: {year}")

The above code creates a testing class, TestYear, with a serializer to convert s string to TestYear, and a task generator, simple_function_with_serializer.

To create a single task workflow:

tool = Tool("test_tool")
tool.set_default_compute_resources_from_dict(
    cluster_name="slurm", compute_resources={"queue": "all.q"}
)
wf = tool.create_workflow()
compute_resources = {"queue": "all.q", "project": "proj_scicomp"}
task = simple_function_with_serializer.create_task(wf,
           year=TestYear(2021),
           compute_resources=compute_resources)
wf.add_task(task)
wf.run()

The above code creates a workflow with a single task named “simple_function_with_serializer:year=2021” and runs it.

To create a workflow with function input containing special characters like a single quote:

import html

def special_char_encodeing(input: str) -> str:
"""Encode special characters."""
return html.escape(input)


def special_char_decoding(input: str) -> str:
    """Decode special characters."""
    return html.unescape(input)


@task_generator.task_generator(
    serializers={str: (special_char_encodeing, special_char_decoding)},
    tool_name="test_tool",
    module_source_path=full_script_path,
    max_attempts=1,
    naming_args=["foo"],
)
def special_chars_function(foo: str) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")


tool = Tool("test_tool")
tool.set_default_compute_resources_from_dict(
    cluster_name="slurm", compute_resources={"queue": "all.q"}
)
wf = tool.create_workflow()
compute_resources = {"queue": "all.q", "project": "proj_scicomp"}
simple_function = task_generator_funcs.special_chars_function
task = simple_function.create_task(compute_resources=compute_resources, foo=f"\'aaa\'")
wf.add_task(task)
wf.run()

The above code creates a workflow with a single task what requests special characters in the input. Please note that this makes the Jobmon command harder to read and understand; thus, SciComp is not responsible to debug it.

You can pass your own function to name your task. The function should take two arguments:

prefix: str - this will be your function name kwargs_for_name: Dict[str, Any] - the arguments of the task and return a string.

def custom_naming(prefix: str, kwargs_for_name: Dict[str, Any]) -> str:
    return f"Lala_{kwargs_for_name['foo']}"

@task_generator(
    serializers={},
    tool_name="test_tool",
    module_source_path=full_script_path,
    max_attempts=1,
    naming_args=["foo"],
    custom_naming=custom_naming,
)
def simple_function(foo: int, bar: List[str] = []) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")
    print(f"bar: {bar}")

The above code creates a task generator with a custom naming function. The task will be named “Lala_1” instead of “simple_function:foo=1”.

@task_generator(
    default_cluster_name="slurm",
    default_compute_resources={"queue": "all.q", "project": "proj_scicomp"},
    serializers={},
    tool_name="test_tool",
    module_source_path=full_script_path,
    max_attempts=1,
    naming_args=["foo"],
)
def simple_function(foo: int, bar: List[str] = []) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")
    print(f"bar: {bar}")

The above code creates a task generator with default cluster name and compute resources, so you do not need to specify them when creating a task.

@task_generator(
    yaml_file="/tmp/task_generator.yaml",
    serializers={},
    tool_name="test_tool",
    module_source_path=full_script_path,
    max_attempts=1,
    naming_args=["foo"],
)
def simple_function(foo: int, bar: List[str] = []) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")
    print(f"bar: {bar}")

The above code creates a task generator with a yaml file that contains the default cluster name and compute resources,