server.web.repositories.task_repository ======================================= .. py:module:: server.web.repositories.task_repository .. autoapi-nested-parse:: Repository for Task operations. Attributes ---------- .. autoapisummary:: server.web.repositories.task_repository.logger server.web.repositories.task_repository._task_instance_label_mapping server.web.repositories.task_repository._reversed_task_instance_label_mapping Classes ------- .. autoapisummary:: server.web.repositories.task_repository.TaskRepository Module Contents --------------- .. py:data:: logger .. py:data:: _task_instance_label_mapping .. py:data:: _reversed_task_instance_label_mapping .. py:class:: TaskRepository(session: sqlalchemy.orm.Session) .. py:attribute:: session .. py:method:: update_task_statuses(workflow_id: str, recursive: bool, workflow_status: Optional[str], task_ids: Union[List[int], str], new_status: str) -> None Update the status of tasks with business logic. Description: - When task_ids='all', it updates all tasks in the workflow with recursive=False. This improves performance. - When recursive=True, it updates the tasks and it's dependencies all the way up or down the DAG. - When recursive=False, it updates only the tasks in the task_ids list. - When workflow_status is None, it gets the workflow status from the db. - After updating the tasks, it checks the workflow status and updates it. .. py:method:: _get_all_task_ids(workflow_id: str) -> List[int] Get all task IDs for a workflow. .. py:method:: _get_recursive_task_ids(task_ids: List[int], new_status: str) -> List[int] Get task IDs including dependencies based on status direction. .. py:method:: _update_task_statuses_in_db(task_ids: List[int], new_status: str) -> None Update task statuses in the database. .. py:method:: _get_workflow_run(workflow_id: str) -> jobmon.server.web.models.workflow_run.WorkflowRun | None Get the latest workflow run for a workflow. .. py:method:: _kill_active_task_instances(task_ids: List[int], workflow_run_id: int) -> None Kill active task instances for the given tasks. .. py:method:: _handle_registering_status(workflow_id: str, task_ids: List[int], workflow_status: Optional[str]) -> None Handle special logic for REGISTERING status. .. py:method:: _handle_done_status(workflow_id: str, new_status: str) -> None Handle special logic for DONE status. .. py:method:: _get_tasks_recursive(task_ids: Set[int], direction: jobmon.core.constants.Direction) -> Set[int] Get all task IDs connected in the specified direction iteratively. Starting with the given task_ids, the function traverses the dependency graph and returns all tasks found, including the input set. It also verifies that all tasks belong to the same workflow. :param task_ids: Initial set of task IDs. :type task_ids: Set[int] :param direction: Either Direction.UP or Direction.DOWN. :type direction: Direction :returns: The complete set of task IDs connected in the specified direction. :rtype: Set[int] .. py:method:: _get_node_dependencies(nodes: set, dag_id: int, direction: jobmon.core.constants.Direction) -> Set[int] Get all upstream or downstream nodes of a node. :param nodes: set of nodes :type nodes: set :param dag_id: ID of DAG :type dag_id: int :param direction: either up or down :type direction: Direction .. py:method:: _get_tasks_from_nodes(workflow_id: int, nodes: List, task_status: List) -> dict Get task ids of the given node ids. :param workflow_id: ID of the workflow :type workflow_id: int :param nodes: list of nodes :type nodes: list :param task_status: list of task statuses :type task_status: list .. py:method:: get_task_status(task_ids: Optional[Union[int, List[int]]], status: Optional[Union[str, List[str]]]) -> jobmon.server.web.schemas.task.TaskStatusResponse Get the status of tasks with filtering. .. py:method:: get_task_subdag(task_ids: List[int], task_status: List[str]) -> jobmon.server.web.schemas.task.TaskSubdagResponse Get the sub DAG of given tasks. .. py:method:: _get_subdag(node_ids: List[int], dag_id: int) -> List[int] Get all descendants of given nodes. :param node_ids: list of node IDs :type node_ids: list :param dag_id: ID of DAG :type dag_id: int .. py:method:: get_task_dependencies(task_id: int) -> jobmon.server.web.schemas.task.TaskDependenciesResponse Get task's downstream and upstream tasks and their status. .. py:method:: _get_dag_and_wf_id(task_id: int) -> tuple Get DAG ID, workflow ID, and node ID for a task. .. py:method:: get_task_resource_usage(task_id: int) -> jobmon.server.web.schemas.task.TaskResourceUsageResponse Return the resource usage for a given Task ID. .. py:method:: get_downstream_tasks(task_ids: List[int], dag_id: int, client_version: Optional[str] = None) -> jobmon.server.web.schemas.task.DownstreamTasksResponse Get only the direct downstreams of a task. .. py:method:: get_task_instance_details(task_id: int) -> jobmon.server.web.schemas.task.TaskInstanceDetailsResponse Get information about TaskInstances associated with specific Task ID. .. py:method:: get_task_details_viz(task_id: int) -> jobmon.server.web.schemas.task.TaskDetailsResponse Get status of Task from Task ID.