server.web.repositories.workflow_repository

Attributes

logger

Classes

WorkflowRepository

Initialize the workflow repository.

Module Contents

server.web.repositories.workflow_repository.logger
class server.web.repositories.workflow_repository.WorkflowRepository(session: sqlalchemy.orm.Session)

Initialize the workflow repository.

session
get_workflow_validation_status(task_ids: List[int]) jobmon.server.web.schemas.workflow.WorkflowValidationResponse

Check if workflow is valid.

get_workflow_tasks(workflow_id: int, limit: int, status: List[str] | None = None) jobmon.server.web.schemas.workflow.WorkflowTasksResponse

Get the tasks for a given workflow.

get_workflow_user_validation(workflow_id: int, username: str) jobmon.server.web.schemas.workflow.WorkflowUserValidationResponse

Return all usernames associated with a given workflow_id’s workflow runs.

get_workflow_run_for_reset(workflow_id: int, username: str) jobmon.server.web.schemas.workflow.WorkflowRunForResetResponse

Get last workflow_run_id for workflow reset validation.

reset_workflow(workflow_id: int, partial_reset: bool = False) None

Update the workflow’s status, all its tasks’ statuses to ‘G’.

get_workflow_status(workflow_id: int | str | List[int | str] | None = None, limit: int | None = None, user: List[str] | None = None) jobmon.server.web.schemas.workflow.WorkflowStatusResponse

Get the status of the workflow.

get_workflow_status_viz(workflow_ids: List[int]) Dict[int, Any]

Get the status of the workflows for GUI.

get_workflow_overview(user: str | None = None, tool: str | None = None, wf_name: str | None = None, wf_args: str | None = None, wf_attribute_value: str | None = None, wf_attribute_key: str | None = None, wf_id: str | None = None, date_submitted: str | None = None, date_submitted_end: str | None = None, status: str | None = None, user_exclude: str | None = None, tool_exclude: str | None = None, status_exclude: str | None = None) jobmon.server.web.schemas.workflow.WorkflowOverviewResponse

Fetch associated workflows and workflow runs by username.

get_task_details_by_workflow_id(workflow_id: int, tt_name: str) jobmon.server.web.schemas.workflow.TaskTableResponse

Fetch Task details associated with Workflow ID and TaskTemplate name.

get_workflow_details_by_id(workflow_id: int) List[jobmon.server.web.schemas.workflow.WorkflowDetailsItem]

Fetch name, args, dates, tool for a Workflow provided WF ID.