server.web.repositories.task_repository

Repository for Task operations.

Attributes

logger

_task_instance_label_mapping

_reversed_task_instance_label_mapping

Classes

TaskRepository

Module Contents

server.web.repositories.task_repository.logger[source]
server.web.repositories.task_repository._task_instance_label_mapping[source]
server.web.repositories.task_repository._reversed_task_instance_label_mapping[source]
class server.web.repositories.task_repository.TaskRepository(session: sqlalchemy.orm.Session)[source]
session[source]
update_task_statuses(workflow_id: str, recursive: bool, workflow_status: str | None, task_ids: List[int] | str, new_status: str) None[source]

Update the status of tasks with business logic.

Description:
  • When task_ids=’all’, it updates all tasks in the workflow with

recursive=False. This improves performance. - When recursive=True, it updates the tasks and it’s dependencies all the way up or down the DAG. - When recursive=False, it updates only the tasks in the task_ids list. - When workflow_status is None, it gets the workflow status from the db. - After updating the tasks, it checks the workflow status and updates it.

_get_all_task_ids(workflow_id: str) List[int][source]

Get all task IDs for a workflow.

_get_recursive_task_ids(task_ids: List[int], new_status: str) List[int][source]

Get task IDs including dependencies based on status direction.

_update_task_statuses_in_db(task_ids: List[int], new_status: str) None[source]

Update task statuses in the database.

_get_workflow_run(workflow_id: str) jobmon.server.web.models.workflow_run.WorkflowRun | None[source]

Get the latest workflow run for a workflow.

_kill_active_task_instances(task_ids: List[int], workflow_run_id: int) None[source]

Kill active task instances for the given tasks.

_handle_registering_status(workflow_id: str, task_ids: List[int], workflow_status: str | None) None[source]

Handle special logic for REGISTERING status.

_handle_done_status(workflow_id: str, new_status: str) None[source]

Handle special logic for DONE status.

_get_tasks_recursive(task_ids: Set[int], direction: jobmon.core.constants.Direction) Set[int][source]

Get all task IDs connected in the specified direction iteratively.

Starting with the given task_ids, the function traverses the dependency graph and returns all tasks found, including the input set. It also verifies that all tasks belong to the same workflow.

Parameters:
  • task_ids (Set[int]) – Initial set of task IDs.

  • direction (Direction) – Either Direction.UP or Direction.DOWN.

Returns:

The complete set of task IDs connected in the specified direction.

Return type:

Set[int]

_get_node_dependencies(nodes: set, dag_id: int, direction: jobmon.core.constants.Direction) Set[int][source]

Get all upstream or downstream nodes of a node.

Parameters:
  • nodes (set) – set of nodes

  • dag_id (int) – ID of DAG

  • direction (Direction) – either up or down

_get_tasks_from_nodes(workflow_id: int, nodes: List, task_status: List) dict[source]

Get task ids of the given node ids.

Parameters:
  • workflow_id (int) – ID of the workflow

  • nodes (list) – list of nodes

  • task_status (list) – list of task statuses

get_task_status(task_ids: int | List[int] | None, status: str | List[str] | None) jobmon.server.web.schemas.task.TaskStatusResponse[source]

Get the status of tasks with filtering.

get_task_subdag(task_ids: List[int], task_status: List[str]) jobmon.server.web.schemas.task.TaskSubdagResponse[source]

Get the sub DAG of given tasks.

_get_subdag(node_ids: List[int], dag_id: int) List[int][source]

Get all descendants of given nodes.

Parameters:
  • node_ids (list) – list of node IDs

  • dag_id (int) – ID of DAG

get_task_dependencies(task_id: int) jobmon.server.web.schemas.task.TaskDependenciesResponse[source]

Get task’s downstream and upstream tasks and their status.

_get_dag_and_wf_id(task_id: int) tuple[source]

Get DAG ID, workflow ID, and node ID for a task.

get_task_resource_usage(task_id: int) jobmon.server.web.schemas.task.TaskResourceUsageResponse[source]

Return the resource usage for a given Task ID.

get_downstream_tasks(task_ids: List[int], dag_id: int, client_version: str | None = None) jobmon.server.web.schemas.task.DownstreamTasksResponse[source]

Get only the direct downstreams of a task.

get_task_instance_details(task_id: int) jobmon.server.web.schemas.task.TaskInstanceDetailsResponse[source]

Get information about TaskInstances associated with specific Task ID.

get_task_details_viz(task_id: int) jobmon.server.web.schemas.task.TaskDetailsResponse[source]

Get status of Task from Task ID.