client.status_commands
======================

.. py:module:: client.status_commands

.. autoapi-nested-parse::

   Commands to check for workflow and task status (from CLI).


Attributes
----------

.. autoapisummary::

   client.status_commands.logger


Functions
---------

.. autoapisummary::

   client.status_commands.update_config_value
   client.status_commands.workflow_status
   client.status_commands.workflow_tasks
   client.status_commands.task_template_resources
   client.status_commands.task_status
   client.status_commands.concurrency_limit
   client.status_commands._chunk_ids
   client.status_commands.update_task_status
   client.status_commands.validate_username
   client.status_commands.validate_workflow
   client.status_commands.get_sub_task_tree
   client.status_commands.get_task_dependencies
   client.status_commands.workflow_reset
   client.status_commands._get_yaml_data
   client.status_commands._create_yaml
   client.status_commands.create_resource_yaml
   client.status_commands.get_filepaths
   client.status_commands.resume_workflow_from_id


Module Contents
---------------

.. py:data:: logger

.. py:function:: update_config_value(key: str, value: str, config_file: Optional[str] = None) -> str

   Update a configuration value in the defaults.yaml file using dot notation.

   :param key: Dot-notated key (e.g., 'http.retries_attempts', 'distributor.poll_interval')
   :param value: New value to set
   :param config_file: Optional path to a specific config file to update

   :returns: Success message indicating what was updated

   :raises ValueError: If the key doesn't exist in the current configuration


.. py:function:: workflow_status(workflow_id: Optional[List[int]] = None, user: Optional[List[str]] = None, json: bool = False, limit: Optional[int] = 5, requester: Optional[jobmon.core.requester.Requester] = None) -> pandas.DataFrame

   Get metadata about workflow progress.

   :param workflow_id: workflow_id/s to retrieve info for. If not specified,
                       all workflows for the user are returned.
   :param user: user/s to retrieve info for. If not specified, results are
                returned for the current user.
   :param limit: number of records to return, ordered by workflow ID
                 descending. Returns 5 if not provided; returns all if [], [<0].
   :param json: flag to return data as JSON
   :param requester: object to communicate with the flask services

   :returns: DataFrame of all workflows and their status


.. py:function:: workflow_tasks(workflow_id: int, status: Optional[List[str]] = None, json: bool = False, limit: int = 5, requester: Optional[jobmon.core.requester.Requester] = None) -> pandas.DataFrame

   Get metadata about task state for a given workflow.

   :param workflow_id: workflow_id to retrieve info for
   :param status: restrict results to tasks in one of the states
                  [PENDING, RUNNING, DONE, FATAL]
   :param json: flag to return data as JSON
   :param limit: number of records to return, ordered by wf id descending.
                 Returns 5 if not provided.
   :param requester: object to communicate with the flask services

   :returns: DataFrame of tasks for a given workflow


.. py:function:: task_template_resources(task_template_version: int, workflows: Optional[list] = None, node_args: Optional[Dict] = None, ci: Optional[float] = None, requester: Optional[jobmon.core.requester.Requester] = None) -> Optional[Dict]

   Get aggregate resource usage data for a given TaskTemplateVersion.

   :param task_template_version: ID of the task template version whose
                                 resource usage is requested
   :param workflows: list of workflows to query by
   :param node_args: dictionary of node arguments to query by
   :param ci: confidence interval. Not calculated if None.
   :param requester: object to communicate with the flask services

   :returns: Dictionary of TaskTemplate resource usage, or ``None``


.. py:function:: task_status(task_ids: List[int], status: Optional[List[str]] = None, json: bool = False, requester: Optional[jobmon.core.requester.Requester] = None) -> Union[dict, pandas.DataFrame]

   Get metadata about a task and its task instances.

   :param task_ids: a list of task_ids to retrieve task_instance metadata for
   :param status: a list of statuses to check for
   :param json: flag to return data as JSON
   :param requester: object to communicate with the flask services

   :returns: Task status and task_instance metadata


.. py:function:: concurrency_limit(workflow_id: int, max_tasks: int, requester: Optional[jobmon.core.requester.Requester] = None) -> str

   Update a workflow's max_concurrently_running field in the database.

   Used to dynamically adjust the number of jobs allowed to run concurrently.

   :param workflow_id: ID of the running workflow whose max_running value needs to be reset
   :type workflow_id: int
   :param max_tasks: new allowed number of parallel tasks
   :type max_tasks: int
   :param requester: object to communicate with the flask services

   :returns: String displaying success or failure of the update


.. py:function:: _chunk_ids(ids: List[int], chunk_size: int = 100) -> List[List[int]]

   Chunk the ids into lists of ``chunk_size`` ids (100 by default).

   :param ids: list of ids
   :param chunk_size: the size of each chunk; defaults to 100

   :returns: A list of lists of ids


.. py:function:: update_task_status(task_ids: List[int], workflow_id: int, new_status: str, force: bool = False, recursive: bool = False, requester: Optional[jobmon.core.requester.Requester] = None) -> Any

   Set the specified task IDs to the new status, pending validation.

   :param task_ids: list of task IDs to reset in the database
   :param workflow_id: the workflow to which each task belongs. Users can
                       currently self-service only one workflow at a time.
   :param new_status: the status to set tasks to
   :param force: if true, allow all source statuses and all workflow statuses
   :param recursive: if true and ``force`` is set, apply the status update
                     recursively, upstream if new_status == 'D' and
                     downstream if new_status == 'G'
   :param requester: object to communicate with the flask services


.. py:function:: validate_username(workflow_id: int, username: str, requester: jobmon.core.requester.Requester) -> None

   Validate that the user is approved to make these changes.


.. py:function:: validate_workflow(task_ids: List[int], requester: jobmon.core.requester.Requester) -> jobmon.core.constants.WorkflowStatus

   Validate workflow.

   Checks that the task_ids provided belong to the expected workflow, and
   that the workflow is in an expected status unless we want to force it
   through.


.. py:function:: get_sub_task_tree(task_ids: list, task_status: Optional[list] = None, requester: Optional[jobmon.core.requester.Requester] = None) -> dict

   Get the sub_tree from tasks to ensure that they end up in the right states.


.. py:function:: get_task_dependencies(task_id: int, requester: Optional[jobmon.core.requester.Requester] = None) -> dict

   Get the upstream and downstream tasks of a task.


.. py:function:: workflow_reset(workflow_id: int, partial_reset: bool = False, requester: Optional[jobmon.core.requester.Requester] = None) -> str

   Workflow reset.

   :param workflow_id: the workflow id to be reset
   :param partial_reset: if False, tasks in D state will be reset as well
   :param requester: http server interface

   :returns: A string indicating the workflow_reset result


.. py:function:: _get_yaml_data(wfid: int, tid: int, v_mem: str, v_core: str, v_runtime: str, requester: jobmon.core.requester.Requester) -> Dict

.. py:function:: _create_yaml(data: Optional[Dict] = None, clusters: Optional[List] = None) -> str

.. py:function:: create_resource_yaml(wfid: int, tid: int, v_mem: str, v_core: str, v_runtime: str, clusters: List, requester: Optional[jobmon.core.requester.Requester] = None) -> str

   The method to create resource yaml.


.. py:function:: get_filepaths(workflow_id: int, array_name: str = '', job_name: str = '', limit: int = 5, requester: Optional[jobmon.core.requester.Requester] = None) -> dict

   Allows users to get the stdout/stderr paths of their tasks.


.. py:function:: resume_workflow_from_id(workflow_id: int, cluster_name: str, reset_if_running: bool = True, log: bool = True, timeout: int = 180, seconds_until_timeout: int = 36000) -> None

   Given a workflow ID, resume the workflow.

   Raises an error if the workflow does not complete successfully on resume.
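
The chunking described for ``_chunk_ids`` above can be illustrated with a minimal sketch. The helper ``chunk_ids_sketch`` is a hypothetical stand-in written for this example, not the module's actual implementation. It assumes that chunks are consecutive slices of the input, that the final chunk may be shorter than ``chunk_size``, and that a non-positive ``chunk_size`` should be rejected.

```python
from typing import List


def chunk_ids_sketch(ids: List[int], chunk_size: int = 100) -> List[List[int]]:
    """Split ids into consecutive lists of at most chunk_size items.

    Hypothetical illustration only; not the jobmon implementation.
    """
    if chunk_size < 1:
        # Assumption: the sketch rejects sizes that cannot form a chunk.
        raise ValueError("chunk_size must be a positive integer")
    # Step through the list in strides of chunk_size and slice out each chunk.
    return [ids[i:i + chunk_size] for i in range(0, len(ids), chunk_size)]


if __name__ == "__main__":
    chunks = chunk_ids_sketch(list(range(250)))
    print([len(chunk) for chunk in chunks])  # [100, 100, 50]
```

Splitting a long ID list this way is a common means of keeping each individual request small; whether the module uses its chunks for that purpose is not stated here.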