core.cluster_protocol
=====================

.. py:module:: core.cluster_protocol


Classes
-------

.. autoapisummary::

   core.cluster_protocol.ClusterQueue
   core.cluster_protocol.ClusterDistributor
   core.cluster_protocol.ClusterWorkerNode


Module Contents
---------------

.. py:class:: ClusterQueue(queue_id: int, queue_name: str, parameters: Dict)

   Bases: :py:obj:`Protocol`


   The protocol class for queues on a cluster.

   Initialization of ClusterQueue.


   .. py:method:: validate_resources(strict: bool = False, **kwargs: Union[str, int, float]) -> Tuple[bool, str]
      :abstractmethod:


      Ensures that requested resources aren't greater than what's available.



   .. py:method:: coerce_resources(**kwargs: Union[str, int, float]) -> Dict
      :abstractmethod:


      Coerces the requested resources so that they aren't greater than what's available.



   .. py:property:: parameters
      :type: Dict

      :abstractmethod:


      Returns the dictionary of parameters.



   .. py:property:: queue_name
      :type: str

      :abstractmethod:


      Returns the name of the queue.



   .. py:property:: queue_id
      :type: int

      :abstractmethod:


      Returns the ID of the queue.



   .. py:property:: required_resources
      :type: List

      :abstractmethod:


      Returns the list of resources that are required.



.. py:class:: ClusterDistributor(cluster_name: str, *args: str, **kwargs: str)

   Bases: :py:obj:`Protocol`


   The protocol class for cluster distributors.

   Initialization of ClusterDistributor.


   .. py:property:: worker_node_entry_point
      :type: str

      :abstractmethod:


      Path to jobmon worker_node_entry_point.



   .. py:property:: cluster_name
      :type: str

      :abstractmethod:


      Return the name of the cluster type.



   .. py:method:: start() -> None
      :abstractmethod:


      Start the distributor.



   .. py:method:: stop() -> None
      :abstractmethod:


      Stop the distributor.



   .. py:method:: get_queueing_errors(distributor_ids: List[str]) -> Dict[str, str]
      :abstractmethod:


      Get the task instances that have errored out.



   .. py:method:: get_submitted_or_running(distributor_ids: Optional[List[str]] = None) -> Set[str]
      :abstractmethod:


      Check which task instances are active.

      :returns: a set of strings



   .. py:method:: terminate_task_instances(distributor_ids: List[str]) -> None
      :abstractmethod:


      Terminate task instances.

      If implemented, return a list of (task_instance_id, hostname) tuples for any
      task_instances that are terminated.



   .. py:method:: get_remote_exit_info(distributor_id: str) -> Union[jobmon.core.exit_info.RemoteExitInfo, Tuple[str, str]]
      :abstractmethod:


      Get the exit info about the task instance once it is done running.

      :returns: RemoteExitInfo with error_state, error_message, and finalized flag.
                Legacy plugins may return a (error_state, error_message) tuple,
                which is treated as finalized=True.



   .. py:method:: submit_to_batch_distributor(command: str, name: str, requested_resources: Dict[str, Any]) -> str
      :abstractmethod:


      Submit the command on the cluster technology and return a distributor_id.

      The distributor_id can be used to identify the associated TaskInstance, terminate
      it, monitor for missingness, or collect usage statistics. If an exception is raised
      by this method, the task instance will move to "W" state and the exception will be
      logged in the database under the task_instance_error_log table.

      :param command: command to be run
      :param name: name of task
      :param requested_resources: resource requests sent to distributor API

      :returns: The distributor_id of the submitted task instance, as a string.



   .. py:method:: submit_array_to_batch_distributor(command: str, name: str, requested_resources: Dict[str, Any], array_length: int) -> Dict[int, str]
      :abstractmethod:


      Submit an array task to the underlying distributor and return its distributor IDs.

      The distributor ID represents the ID of the overall array job; sub-tasks will have
      their own associated IDs.

      :param command: the array worker node command to run
      :param name: name of the array
      :param requested_resources: resources with which to run the array
      :param array_length: how many tasks are associated with the array

      :returns: a mapping of array_step_id to distributor_id.



   .. py:method:: build_worker_node_command(task_instance_id: Optional[int] = None, array_id: Optional[int] = None, batch_number: Optional[int] = None) -> str

      Build a command that can be executed by the worker_node.

      :param task_instance_id: id for the given instance of this task
      :param array_id: id for the array if using an array strategy
      :param batch_number: if array strategy is used, the submission counter index to use

      :returns: (str) unwrappable command



.. py:class:: ClusterWorkerNode

   Bases: :py:obj:`Protocol`


   Base class defining interface for gathering executor info in the execution_wrapper.

   While not required, implementing ``get_usage_stats()`` will allow collection of
   CPU/memory utilization stats for each job. ``get_exit_info()`` is used to determine
   the error type if the task hits a system error.


   .. py:property:: distributor_id
      :type: Optional[str]

      :abstractmethod:


      Executor-specific id assigned to a task instance.



   .. py:method:: get_usage_stats() -> Dict
      :abstractmethod:


      Return resource usage with server-ready keys.

      Expected keys (all stringified):

      * ``maxrss`` – peak resident set size in bytes
      * ``cpu`` – total CPU seconds (user + system)
      * ``usage_str`` – JSON blob of the full raw stats



   .. py:method:: get_exit_info(exit_code: int, error_msg: str) -> Tuple[str, str]
      :abstractmethod:


      Error and exit code info from the executor.



   .. py:method:: initialize_logfile(log_type: str, log_dir: str, name: str) -> str
      :abstractmethod:


      Initialize the log file of the given ``log_type`` for ``name`` in ``log_dir``.



   .. py:property:: array_step_id
      :type: Optional[int]

      :abstractmethod:


      The step id in each batch.

      For each array task instance, array_id, array_batch_num, and array_step_id should
      uniquely identify a subtask_id. Whether the subtask_id can be generated from
      array_step_id depends on the plugin.
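
As an illustration of the ``ClusterQueue`` interface, the following is a minimal sketch of how a plugin might implement ``validate_resources`` and ``coerce_resources``. The class name ``ExampleQueue``, the ``max_cores`` entry in ``parameters``, and the ``cores`` resource name are assumptions made for this example and are not taken from jobmon. The sketch accepts ``strict`` but ignores it, because its semantics are not described here.

```python
from typing import Dict, List, Tuple, Union


class ExampleQueue:
    """Hypothetical queue whose only limit is parameters["max_cores"]."""

    def __init__(self, queue_id: int, queue_name: str, parameters: Dict) -> None:
        self._queue_id = queue_id
        self._queue_name = queue_name
        self._parameters = parameters

    @property
    def queue_id(self) -> int:
        return self._queue_id

    @property
    def queue_name(self) -> str:
        return self._queue_name

    @property
    def parameters(self) -> Dict:
        return self._parameters

    @property
    def required_resources(self) -> List:
        # Assumption: this example queue only requires a core count.
        return ["cores"]

    def validate_resources(
        self, strict: bool = False, **kwargs: Union[str, int, float]
    ) -> Tuple[bool, str]:
        # Report (is_valid, message) without modifying the request.
        missing = [r for r in self.required_resources if r not in kwargs]
        if missing:
            return False, f"missing required resources: {missing}"
        limit = self.parameters["max_cores"]
        if int(kwargs["cores"]) > limit:
            return False, f"cores={kwargs['cores']} exceeds queue limit {limit}"
        return True, ""

    def coerce_resources(self, **kwargs: Union[str, int, float]) -> Dict:
        # Return a copy of the request with cores clamped to the queue limit.
        coerced = dict(kwargs)
        limit = self.parameters["max_cores"]
        coerced["cores"] = min(int(kwargs.get("cores", 1)), limit)
        return coerced
```

With ``ExampleQueue(1, "all.q", {"max_cores": 8})``, a request for 16 cores fails validation, and ``coerce_resources(cores=16)`` returns a request for 8 cores.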
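
The two return shapes of ``ClusterDistributor.get_remote_exit_info`` can be handled uniformly by the caller. The sketch below shows one way to do that, assuming a stand-in dataclass ``ExampleExitInfo`` with the three fields named in the docstring; the real ``jobmon.core.exit_info.RemoteExitInfo`` may be defined differently, and ``normalize_exit_info`` is a hypothetical helper. The state strings in the usage note are placeholders.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass
class ExampleExitInfo:
    """Hypothetical stand-in for jobmon.core.exit_info.RemoteExitInfo."""

    error_state: str
    error_message: str
    finalized: bool = True


def normalize_exit_info(
    result: Union[ExampleExitInfo, Tuple[str, str]]
) -> ExampleExitInfo:
    """Convert a legacy (error_state, error_message) tuple to the richer type."""
    if isinstance(result, tuple):
        error_state, error_message = result
        # Legacy plugins cannot signal "not yet final", so treat as finalized.
        return ExampleExitInfo(error_state, error_message, finalized=True)
    return result
```

For example, ``normalize_exit_info(("SOME_STATE", "killed"))`` yields an ``ExampleExitInfo`` with ``finalized=True``, while an ``ExampleExitInfo`` created with ``finalized=False`` passes through unchanged.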
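
The key layout expected from ``ClusterWorkerNode.get_usage_stats`` can be sketched as a small formatting helper. The function name ``format_usage_stats`` and the raw input keys (``user_seconds``, ``system_seconds``, ``maxrss_bytes``) are assumptions for illustration; a real plugin would gather these values from its own scheduler or operating system.

```python
import json
from typing import Dict


def format_usage_stats(raw: Dict[str, float]) -> Dict[str, str]:
    """Build the stringified maxrss / cpu / usage_str mapping from raw stats."""
    # Total CPU time is user time plus system time, in seconds.
    cpu_seconds = raw["user_seconds"] + raw["system_seconds"]
    return {
        "maxrss": str(int(raw["maxrss_bytes"])),
        "cpu": str(cpu_seconds),
        # Keep the full raw stats as a JSON blob.
        "usage_str": json.dumps(raw),
    }
```

Every value in the returned dictionary is a string, matching the "all stringified" requirement in the docstring.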