server.web.repositories.workflow_repository =========================================== .. py:module:: server.web.repositories.workflow_repository Attributes ---------- .. autoapisummary:: server.web.repositories.workflow_repository.logger server.web.repositories.workflow_repository._cli_label_mapping server.web.repositories.workflow_repository._reversed_cli_label_mapping server.web.repositories.workflow_repository._cli_order Classes ------- .. autoapisummary:: server.web.repositories.workflow_repository.WorkflowRepository Module Contents --------------- .. py:data:: logger .. py:data:: _cli_label_mapping .. py:data:: _reversed_cli_label_mapping .. py:data:: _cli_order :value: ['PENDING', 'SCHEDULED', 'RUNNING', 'DONE', 'FATAL'] .. py:class:: WorkflowRepository(session: sqlalchemy.orm.Session) .. py:attribute:: session .. py:method:: get_workflow_validation_status(task_ids: List[int]) -> jobmon.server.web.schemas.workflow.WorkflowValidationResponse Check if workflow is valid. .. py:method:: get_workflow_tasks(workflow_id: int, limit: int, status: Optional[List[str]] = None) -> jobmon.server.web.schemas.workflow.WorkflowTasksResponse Get the tasks for a given workflow. .. py:method:: get_workflow_user_validation(workflow_id: int, username: str) -> jobmon.server.web.schemas.workflow.WorkflowUserValidationResponse Return all usernames associated with a given workflow_id's workflow runs. .. py:method:: get_workflow_run_for_reset(workflow_id: int, username: str) -> jobmon.server.web.schemas.workflow.WorkflowRunForResetResponse Get last workflow_run_id for workflow reset validation. .. py:method:: reset_workflow(workflow_id: int, partial_reset: bool = False) -> None Update the workflow's status, all its tasks' statuses to 'G'. .. py:method:: get_workflow_status(workflow_id: Optional[Union[int, str, List[Union[int, str]]]] = None, limit: Optional[int] = None, user: Optional[List[str]] = None) -> jobmon.server.web.schemas.workflow.WorkflowStatusResponse Get the status of the workflow. .. py:method:: get_workflow_status_viz(workflow_ids: List[int]) -> Dict[int, Any] Get the status of the workflows for GUI. .. py:method:: _add_multi_value_filter(value: Optional[str], column: str, param_name: str, where_clauses: list, substitution_dict: dict) -> None Add a filter that supports comma-separated values with OR logic. .. py:method:: get_workflow_overview(user: Optional[str] = None, tool: Optional[str] = None, wf_name: Optional[str] = None, wf_args: Optional[str] = None, wf_attribute_value: Optional[str] = None, wf_attribute_key: Optional[str] = None, wf_id: Optional[str] = None, date_submitted: Optional[str] = None, date_submitted_end: Optional[str] = None, status: Optional[str] = None) -> jobmon.server.web.schemas.workflow.WorkflowOverviewResponse Fetch associated workflows and workflow runs by username. .. py:method:: get_task_details_by_workflow_id(workflow_id: int, tt_name: str) -> jobmon.server.web.schemas.workflow.TaskTableResponse Fetch Task details associated with Workflow ID and TaskTemplate name. .. py:method:: get_workflow_details_by_id(workflow_id: int) -> List[jobmon.server.web.schemas.workflow.WorkflowDetailsItem] Fetch name, args, dates, tool for a Workflow provided WF ID.