Task Generator
The Task Generator is a new Jobmon feature, currently available only in Python, that lets users generate tasks from an existing method. It serializes the method's parameters into strings and assembles them into a CLI command that is submitted to the cluster. On the cluster node, Jobmon deserializes the strings back into the original parameters, so the task runs the method with the same inputs.
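The serialize, submit, deserialize round trip described above can be pictured with the minimal, self-contained sketch below. It is not Jobmon's implementation: the helper names `build_command` and `parse_command`, the `python -m my_module` entry point, and the `--name value` argument format are all hypothetical, chosen only to show the idea.

```python
import shlex
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical serializer table: type -> (to_string, from_string).
SERIALIZERS: Dict[type, Tuple[Callable[[Any], str], Callable[[str], Any]]] = {
    int: (str, int),
    str: (str, str),
}

ENTRY_POINT: List[str] = ["python", "-m", "my_module"]  # hypothetical command prefix


def build_command(func_name: str, kwargs: Dict[str, Any]) -> str:
    """Serialize keyword arguments into a shell-safe CLI command string."""
    parts = [*ENTRY_POINT, func_name]
    for name, value in kwargs.items():
        to_string, _ = SERIALIZERS[type(value)]
        parts += [f"--{name}", to_string(value)]
    # shlex.quote keeps values containing spaces or quotes as single shell words
    return " ".join(shlex.quote(p) for p in parts)


def parse_command(command: str, annotations: Dict[str, type]) -> Dict[str, Any]:
    """Split the command and deserialize each argument using its annotated type."""
    # Skip the entry point and the function name; the rest are --name value pairs.
    tokens = shlex.split(command)[len(ENTRY_POINT) + 1:]
    kwargs: Dict[str, Any] = {}
    for flag, raw in zip(tokens[::2], tokens[1::2]):
        name = flag[2:]  # strip the leading "--"
        _, from_string = SERIALIZERS[annotations[name]]
        kwargs[name] = from_string(raw)
    return kwargs


command = build_command("simple_function", {"foo": 1, "msg": "it's here"})
print(command)
print(parse_command(command, {"foo": int, "msg": str}))
```

The parser relies on the parameter annotations to pick a deserializer, which is why every value that crosses the command line must have a matching `(to_string, from_string)` pair.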
Import
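- Import the module from jobmon.core and use the decorator as `@task_generator.task_generator(...)`:
```python
from jobmon.core import task_generator
```
- or import the decorator directly and use it as `@task_generator(...)`:
```python
from jobmon.core.task_generator import task_generator
```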
Parameters
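| Parameter | Type | Description |
|---|---|---|
| serializers | Dict | A dict of `{type: (serializer, deserializer)}`. The serializer converts a value of that type to a string; the deserializer converts the string back. Pass `{}` if no custom serialization is needed. |
| tool_name | str | The name of the Jobmon tool. |
| naming_args | Optional[List[str]] | The arguments used to name the task. Does not apply to array tasks. |
| max_attempts | Optional[int] | The maximum number of attempts to run the task. |
| module_source_path | Optional[str] | The path to the module source code, if the module is not installed in the current conda environment. |
| custom_naming | Callable[[str, Dict[str, Any]], str] | A function that builds the task name from the function name and the task arguments. |
| default_cluster_name | str | The cluster name to use when none is given at task creation. |
| default_compute_resources | Dict | The compute resources to use when none are given at task creation. |
| yaml_file | str | The path to a YAML file containing the default cluster name and compute resources. |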
Examples
Method with simple input parameter type:
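```python
import os
from typing import List

from jobmon.core.task_generator import task_generator

# Path to this script, assumed to be the module that defines the task functions;
# needed when the module is not installed in the conda environment.
full_script_path = os.path.abspath(__file__)


@task_generator(
    serializers={},
    tool_name="test_tool",
    module_source_path=full_script_path,
    max_attempts=1,
    naming_args=["foo"],
)
def simple_function(foo: int, bar: List[str] = []) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")
    print(f"bar: {bar}")
```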
The above code turns the “simple_function” method into a task generator that can create either a single task or a task array.
To create a single task workflow:
```python
from jobmon.client.tool import Tool

tool = Tool("test_tool")
# Default resources for tasks of this tool on the "slurm" cluster
tool.set_default_compute_resources_from_dict(
    cluster_name="slurm", compute_resources={"queue": "all.q"}
)
wf = tool.create_workflow()
compute_resources = {"queue": "all.q", "project": "proj_scicomp"}
task = simple_function.create_task(wf, foo=1, bar=["a", "b"], compute_resources=compute_resources)
wf.add_task(task)
wf.run()
```
The above code creates a workflow with a single task named “simple_function:foo=1” and runs it.
To create a task array workflow:
```python
from jobmon.client.tool import Tool

tool = Tool("test_tool")
tool.set_default_compute_resources_from_dict(
    cluster_name="slurm", compute_resources={"queue": "all.q"}
)
wf = tool.create_workflow()
compute_resources = {"queue": "all.q", "project": "proj_scicomp"}
# Each keyword argument takes a list of values
tasks = simple_function.create_tasks(
    compute_resources=compute_resources, foo=[1, 2], bar=[["a", "b"]]
)
wf.add_tasks(tasks)
wf.run()
```
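The above code creates a workflow with a task array of two tasks, with inputs (foo=1, bar=[“a”, “b”]) and (foo=2, bar=[“a”, “b”]), and runs it. Each keyword argument takes a list of values: the two values of foo and the single value of bar yield two tasks.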
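The two-task result above is consistent with creating one task per combination of the listed values. The sketch below shows that expansion with `itertools.product`; treating it as a full cross product is an assumption about how `create_tasks` expands its arguments, and `expand_task_args` is a hypothetical helper, not part of Jobmon.

```python
import itertools
from typing import Any, Dict, List


def expand_task_args(**arg_lists: List[Any]) -> List[Dict[str, Any]]:
    """Return one kwargs dict per combination of the listed argument values."""
    names = list(arg_lists)
    return [
        dict(zip(names, combo))
        for combo in itertools.product(*(arg_lists[n] for n in names))
    ]


for kwargs in expand_task_args(foo=[1, 2], bar=[["a", "b"]]):
    print(kwargs)
```

Under this assumption, giving bar three values instead of one would produce six tasks.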
Method with complex input parameter type:
```python
from jobmon.core import task_generator


class TestYear:
    """A fake YearRange class for testing."""

    def __init__(self, year: int) -> None:
        self.year = year

    @staticmethod
    def parse_year(year: str) -> "TestYear":
        """Parse a year string into a TestYear."""
        return TestYear(int(year))

    def __str__(self) -> str:
        return str(self.year)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, TestYear):
            return NotImplemented
        return self.year == other.year


# Maps TestYear to a (serialize, deserialize) pair:
# str() turns a TestYear into a string, parse_year() turns it back.
test_year_serializer = {TestYear: (str, TestYear.parse_year)}


@task_generator.task_generator(
    serializers=test_year_serializer,
    tool_name="test_tool",
    module_source_path=full_script_path,
    max_attempts=1,
    naming_args=["year"],
)
def simple_function_with_serializer(year: TestYear) -> None:
    """Simple task_function."""
    print(f"year: {year}")
```
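The above code creates a test class, TestYear, and a serializer that maps TestYear to a pair of functions: str converts a TestYear to a string, and TestYear.parse_year converts that string back to a TestYear. It then defines a task generator, simple_function_with_serializer.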
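The same `(serialize, deserialize)` pattern used by `test_year_serializer` works for other types. As a hedged sketch, the entry below would cover a `datetime.date` argument; the round trip runs without Jobmon, and `date_serializer` and `round_trip` are illustrative names, not part of Jobmon.

```python
from datetime import date
from typing import Any, Callable, Dict, Tuple

# type -> (serialize to str, deserialize from str), same shape as test_year_serializer
date_serializer: Dict[type, Tuple[Callable[[Any], str], Callable[[str], Any]]] = {
    date: (date.isoformat, date.fromisoformat),
}


def round_trip(value: Any, serializers: Dict[type, Tuple[Callable, Callable]]) -> Any:
    """Serialize a value to a string and back, as happens between submission and the cluster node."""
    serialize, deserialize = serializers[type(value)]
    text = serialize(value)  # this string is what would appear in the CLI command
    return deserialize(text)


print(date.isoformat(date(2021, 3, 14)))  # 2021-03-14
print(round_trip(date(2021, 3, 14), date_serializer) == date(2021, 3, 14))  # True
```

A deserializer must accept exactly the string its serializer produces; `isoformat` and `fromisoformat` are a natural pair for that reason.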
To create a single task workflow:
```python
from jobmon.client.tool import Tool

tool = Tool("test_tool")
tool.set_default_compute_resources_from_dict(
    cluster_name="slurm", compute_resources={"queue": "all.q"}
)
wf = tool.create_workflow()
compute_resources = {"queue": "all.q", "project": "proj_scicomp"}
# The TestYear argument is serialized with test_year_serializer
task = simple_function_with_serializer.create_task(
    wf, year=TestYear(2021), compute_resources=compute_resources
)
wf.add_task(task)
wf.run()
```
The above code creates a workflow with a single task named “simple_function_with_serializer:year=2021” and runs it.
To create a workflow whose function input contains special characters, such as a single quote, register a serializer for str that encodes and decodes them:
```python
import html

from jobmon.client.tool import Tool
from jobmon.core import task_generator


def special_char_encoding(value: str) -> str:
    """Encode special characters (for example, ' becomes &#x27;)."""
    return html.escape(value)


def special_char_decoding(value: str) -> str:
    """Decode special characters back to the original string."""
    return html.unescape(value)


@task_generator.task_generator(
    serializers={str: (special_char_encoding, special_char_decoding)},
    tool_name="test_tool",
    module_source_path=full_script_path,
    max_attempts=1,
    naming_args=["foo"],
)
def special_chars_function(foo: str) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")


tool = Tool("test_tool")
tool.set_default_compute_resources_from_dict(
    cluster_name="slurm", compute_resources={"queue": "all.q"}
)
wf = tool.create_workflow()
compute_resources = {"queue": "all.q", "project": "proj_scicomp"}
task = special_chars_function.create_task(compute_resources=compute_resources, foo="'aaa'")
wf.add_task(task)
wf.run()
```
The above code creates a workflow with a single task whose input contains special characters. Note that encoding makes the Jobmon command harder to read and understand; SciComp is therefore not responsible for debugging it.
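The encoder and decoder in this example are plain standard-library calls, so their effect can be checked in isolation. The short snippet below (no Jobmon involved) shows that `html.escape` replaces each single quote with `&#x27;` and that `html.unescape` restores the original string.

```python
import html

original = "'aaa'"
encoded = html.escape(original)  # quote=True by default, so ' becomes &#x27;
decoded = html.unescape(encoded)

print(encoded)  # &#x27;aaa&#x27;
print(decoded == original)  # True
```

The encoded form is what ends up in the command, which is why it is harder to read when debugging.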
- You can pass your own function to name your task through the custom_naming parameter. The function should take two arguments and return a string:
  - prefix: str - your function name
  - kwargs_for_name: Dict[str, Any] - the arguments of the task
```python
from typing import Any, Dict, List

from jobmon.core.task_generator import task_generator


def custom_naming(prefix: str, kwargs_for_name: Dict[str, Any]) -> str:
    """Name the task after the value of foo, ignoring the function name."""
    return f"Lala_{kwargs_for_name['foo']}"


@task_generator(
    serializers={},
    tool_name="test_tool",
    module_source_path=full_script_path,
    max_attempts=1,
    naming_args=["foo"],
    custom_naming=custom_naming,
)
def simple_function(foo: int, bar: List[str] = []) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")
    print(f"bar: {bar}")
```
The above code creates a task generator with a custom naming function. A task created with foo=1 will be named “Lala_1” instead of “simple_function:foo=1”.
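A naming function can also make use of prefix. The sketch below is a hypothetical alternative to `custom_naming` that keeps the function name and appends every argument it receives, sorted by name so the result does not depend on dict order; the name `name_with_prefix` and its output format are illustrative choices, not Jobmon defaults.

```python
from typing import Any, Dict


def name_with_prefix(prefix: str, kwargs_for_name: Dict[str, Any]) -> str:
    """Build a task name such as 'simple_function_foo-1' from the function name and arguments."""
    parts = [f"{key}-{kwargs_for_name[key]}" for key in sorted(kwargs_for_name)]
    return "_".join([prefix, *parts])


print(name_with_prefix("simple_function", {"foo": 1}))  # simple_function_foo-1
```

It would be passed to the decorator the same way, as `custom_naming=name_with_prefix`.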
```python
from typing import List

from jobmon.core.task_generator import task_generator


@task_generator(
    default_cluster_name="slurm",
    default_compute_resources={"queue": "all.q", "project": "proj_scicomp"},
    serializers={},
    tool_name="test_tool",
    module_source_path=full_script_path,
    max_attempts=1,
    naming_args=["foo"],
)
def simple_function(foo: int, bar: List[str] = []) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")
    print(f"bar: {bar}")
```
The above code creates a task generator with a default cluster name and default compute resources, so you do not need to specify them when creating a task.
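One way to picture how decorator-level defaults interact with values given at task creation is the hypothetical `resolve_resources` helper below. It assumes that per-task values override the defaults key by key; that precedence rule is an assumption for illustration, not documented Jobmon behavior, and the `cores` key is only an example.

```python
from typing import Any, Dict, Optional, Tuple


def resolve_resources(
    default_cluster_name: str,
    default_compute_resources: Dict[str, Any],
    cluster_name: Optional[str] = None,
    compute_resources: Optional[Dict[str, Any]] = None,
) -> Tuple[str, Dict[str, Any]]:
    """Fall back to the defaults for anything not given per task (assumed precedence)."""
    cluster = cluster_name or default_cluster_name
    # Build a new dict so the shared defaults are never mutated.
    resources = {**default_compute_resources, **(compute_resources or {})}
    return cluster, resources


defaults = {"queue": "all.q", "project": "proj_scicomp"}
print(resolve_resources("slurm", defaults))
print(resolve_resources("slurm", defaults, compute_resources={"cores": 2}))
```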
```python
from typing import List

from jobmon.core.task_generator import task_generator


@task_generator(
    yaml_file="/tmp/task_generator.yaml",
    serializers={},
    tool_name="test_tool",
    module_source_path=full_script_path,
    max_attempts=1,
    naming_args=["foo"],
)
def simple_function(foo: int, bar: List[str] = []) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")
    print(f"bar: {bar}")
```
The above code creates a task generator with a yaml file that contains the default cluster name and compute resources,