server.web.repositories.task_repository
Repository for Task operations.
Attributes
Classes
Module Contents
- class server.web.repositories.task_repository.TaskRepository(session: sqlalchemy.orm.Session)[source]
-
- update_task_statuses(workflow_id: str, recursive: bool, workflow_status: str | None, task_ids: List[int] | str, new_status: str) None[source]
Update the status of tasks with business logic.
- Description:
When task_ids=’all’, it updates all tasks in the workflow with
recursive=False. This improves performance. - When recursive=True, it updates the tasks and it’s dependencies all the way up or down the DAG. - When recursive=False, it updates only the tasks in the task_ids list. - When workflow_status is None, it gets the workflow status from the db. - After updating the tasks, it checks the workflow status and updates it.
- _get_recursive_task_ids(task_ids: List[int], new_status: str) List[int][source]
Get task IDs including dependencies based on status direction.
- _update_task_statuses_in_db(task_ids: List[int], new_status: str) None[source]
Update task statuses in the database.
- _get_workflow_run(workflow_id: str) jobmon.server.web.models.workflow_run.WorkflowRun | None[source]
Get the latest workflow run for a workflow.
- _kill_active_task_instances(task_ids: List[int], workflow_run_id: int) None[source]
Kill active task instances for the given tasks.
- _handle_registering_status(workflow_id: str, task_ids: List[int], workflow_status: str | None) None[source]
Handle special logic for REGISTERING status.
- _handle_done_status(workflow_id: str, new_status: str) None[source]
Handle special logic for DONE status.
- _get_tasks_recursive(task_ids: Set[int], direction: jobmon.core.constants.Direction) Set[int][source]
Get all task IDs connected in the specified direction iteratively.
Starting with the given task_ids, the function traverses the dependency graph and returns all tasks found, including the input set. It also verifies that all tasks belong to the same workflow.
- _get_node_dependencies(nodes: set, dag_id: int, direction: jobmon.core.constants.Direction) Set[int][source]
Get all upstream or downstream nodes of a node.
- _get_tasks_from_nodes(workflow_id: int, nodes: List, task_status: List) dict[source]
Get task ids of the given node ids.
- get_task_status(task_ids: int | List[int] | None, status: str | List[str] | None) jobmon.server.web.schemas.task.TaskStatusResponse[source]
Get the status of tasks with filtering.
- get_task_subdag(task_ids: List[int], task_status: List[str]) jobmon.server.web.schemas.task.TaskSubdagResponse[source]
Get the sub DAG of given tasks.
- _get_subdag(node_ids: List[int], dag_id: int) List[int][source]
Get all descendants of given nodes.
- get_task_dependencies(task_id: int) jobmon.server.web.schemas.task.TaskDependenciesResponse[source]
Get task’s downstream and upstream tasks and their status.
- get_task_resource_usage(task_id: int) jobmon.server.web.schemas.task.TaskResourceUsageResponse[source]
Return the resource usage for a given Task ID.
- get_downstream_tasks(task_ids: List[int], dag_id: int, client_version: str | None = None) jobmon.server.web.schemas.task.DownstreamTasksResponse[source]
Get only the direct downstreams of a task.