client.status_commands

Commands to check for workflow and task status (from CLI).

Attributes

logger

Functions

update_config_value(→ str)

Update a configuration value in the defaults.yaml file using dot notation.

workflow_status(→ pandas.DataFrame)

Get metadata about workflow progress.

workflow_tasks(→ pandas.DataFrame)

Get metadata about task state for a given workflow.

task_template_resources(→ Optional[Dict])

Get aggregate resource usage data for a given TaskTemplateVersion.

task_status(→ Union[dict, pandas.DataFrame])

Get metadata about a task and its task instances.

concurrency_limit(→ str)

Update a workflow's max_concurrently_running field in the database.

_chunk_ids(→ List[List[int]])

Chunk the ids into a list of 100 ids list.

update_task_status(→ Any)

Set the specified task IDs to the new status, pending validation.

validate_username(→ None)

Validate that the user is approved to make these changes.

validate_workflow(→ jobmon.core.constants.WorkflowStatus)

Validate workflow.

get_sub_task_tree(→ dict)

Get the sub_tree from tasks to ensure that they end up in the right states.

get_task_dependencies(→ dict)

Get the upstream and down stream of a task.

workflow_reset(→ str)

Workflow reset.

_get_yaml_data(→ Dict)

_create_yaml(→ str)

create_resource_yaml(→ str)

The method to create resource yaml.

get_filepaths(→ dict)

Allows users to get the stdout/stderr paths of their tasks.

resume_workflow_from_id(→ None)

Given a workflow ID, resume the workflow.

Module Contents

client.status_commands.logger[source]
client.status_commands.update_config_value(key: str, value: str, config_file: str | None = None) str[source]

Update a configuration value in the defaults.yaml file using dot notation.

Parameters:
  • key – Dot-notated key (e.g., ‘http.retries_attempts’, ‘distributor.poll_interval’)

  • value – New value to set

  • config_file – Optional path to specific config file to update

Returns:

Success message indicating what was updated

Raises:

ValueError – If the key doesn’t exist in the current configuration

client.status_commands.workflow_status(workflow_id: List[int] | None = None, user: List[str] | None = None, json: bool = False, limit: int | None = 5, requester: jobmon.core.requester.Requester | None = None) pandas.DataFrame[source]

Get metadata about workflow progress.

Parameters:
  • workflow_id – workflow_id/s to retrieve info for. If not specified will pull all workflows by user

  • user – user/s to retrieve info for. If not specified will return for current user.

  • limit – return # of records order by wf id desc. Return 5 if not provided; return all if [], [<0].

  • json – Flag to return data as JSON

  • requester – object to communicate with the flask services

Returns:

dataframe of all workflows and their status

client.status_commands.workflow_tasks(workflow_id: int, status: List[str] | None = None, json: bool = False, limit: int = 5, requester: jobmon.core.requester.Requester | None = None) pandas.DataFrame[source]

Get metadata about task state for a given workflow.

Parameters:
  • workflow_id – workflow_id/s to retrieve info for

  • status – limit task state to one of [PENDING, RUNNING, DONE, FATAL] tasks

  • json – Flag to return data as JSON

  • limit – return # of records order by wf id desc. Return 5 if not provided

  • requester – object to communicate with the flask services

Returns:

Dataframe of tasks for a given workflow

client.status_commands.task_template_resources(task_template_version: int, workflows: list | None = None, node_args: Dict | None = None, ci: float | None = None, requester: jobmon.core.requester.Requester | None = None) Dict | None[source]

Get aggregate resource usage data for a given TaskTemplateVersion.

Parameters:
  • task_template_version – The task template version ID the user wants to find the resource usage of.

  • workflows – list of workflows a user wants query by.

  • node_args – dictionary of node arguments a user wants to query by.

  • ci – confidence interval. Not calculate if None.

  • requester – object to communicate with the flask services

Returns:

Dataframe of TaskTemplate resource usage

client.status_commands.task_status(task_ids: List[int], status: List[str] | None = None, json: bool = False, requester: jobmon.core.requester.Requester | None = None) dict | pandas.DataFrame[source]

Get metadata about a task and its task instances.

Parameters:
  • task_ids – a list of task_ids to retrieve task_instance metadata for.

  • status – a list of statuses to check for.

  • json – Flag to return data as JSON.

  • requester – object to communicate with the flask services

Returns:

Task status and task_instance metadata

client.status_commands.concurrency_limit(workflow_id: int, max_tasks: int, requester: jobmon.core.requester.Requester | None = None) str[source]

Update a workflow’s max_concurrently_running field in the database.

Used to dynamically adjust the allowed number of jobs concurrently running.

Parameters:
  • workflow_id (int) – ID of the running workflow whose max_running value needs to be reset

  • max_tasks (int) – new allowed value of parallel tasks

  • requester – object to communicate with the flask services

Returns: string displaying success or failure of the update.

client.status_commands._chunk_ids(ids: List[int], chunk_size: int = 100) List[List[int]][source]

Chunk the ids into a list of 100 ids list.

Parameters:
  • ids – list of ids

  • chunk_size – the size of each chunk; default to 100

Returns: a list of list

client.status_commands.update_task_status(task_ids: List[int], workflow_id: int, new_status: str, force: bool = False, recursive: bool = False, requester: jobmon.core.requester.Requester | None = None) Any[source]

Set the specified task IDs to the new status, pending validation.

Parameters:
  • task_ids – List of task IDs to reset in the database

  • workflow_id – The workflow to which each task belongs. Users can only self-service 1 workflow at a time for the moment.

  • new_status – the status to set tasks to

  • force – if true, allow all source statuses and all workflow statuses.

  • recursive – if true and force, apply recursive update_status downstream or upstream depending on new_status (upstream if new_status == ‘D’; downstream if new_status == ‘G’).

  • requester – object to communicate with the flask services

client.status_commands.validate_username(workflow_id: int, username: str, requester: jobmon.core.requester.Requester) None[source]

Validate that the user is approved to make these changes.

client.status_commands.validate_workflow(task_ids: List[int], requester: jobmon.core.requester.Requester) jobmon.core.constants.WorkflowStatus[source]

Validate workflow.

The task_ids provided belong to the expected workflow, and the workflow status is in expected status unless we want to force it through.

client.status_commands.get_sub_task_tree(task_ids: list, task_status: list | None = None, requester: jobmon.core.requester.Requester | None = None) dict[source]

Get the sub_tree from tasks to ensure that they end up in the right states.

client.status_commands.get_task_dependencies(task_id: int, requester: jobmon.core.requester.Requester | None = None) dict[source]

Get the upstream and down stream of a task.

client.status_commands.workflow_reset(workflow_id: int, partial_reset: bool = False, requester: jobmon.core.requester.Requester | None = None) str[source]

Workflow reset.

Returns:

A string to indicate the workflow_reset result.

Parameters:
  • workflow_id – the workflow id to be reset.

  • partial_reset – if False, tasks in D state will be reset as well

  • requester – http server interface.

client.status_commands._get_yaml_data(wfid: int, tid: int, v_mem: str, v_core: str, v_runtime: str, requester: jobmon.core.requester.Requester) Dict[source]
client.status_commands._create_yaml(data: Dict | None = None, clusters: List | None = None) str[source]
client.status_commands.create_resource_yaml(wfid: int, tid: int, v_mem: str, v_core: str, v_runtime: str, clusters: List, requester: jobmon.core.requester.Requester | None = None) str[source]

The method to create resource yaml.

client.status_commands.get_filepaths(workflow_id: int, array_name: str = '', job_name: str = '', limit: int = 5, requester: jobmon.core.requester.Requester | None = None) dict[source]

Allows users to get the stdout/stderr paths of their tasks.

client.status_commands.resume_workflow_from_id(workflow_id: int, cluster_name: str, reset_if_running: bool = True, log: bool = True, timeout: int = 180, seconds_until_timeout: int = 36000) None[source]

Given a workflow ID, resume the workflow.

Raise an error if the workflow is not completed successfully on resume.