worker_node.worker_node_task_instance ===================================== .. py:module:: worker_node.worker_node_task_instance .. autoapi-nested-parse:: The Task Instance Object once it has been submitted to run on a worker node. Attributes ---------- .. autoapisummary:: worker_node.worker_node_task_instance.logger Classes ------- .. autoapisummary:: worker_node.worker_node_task_instance.WorkerNodeTaskInstance Module Contents --------------- .. py:data:: logger .. py:class:: WorkerNodeTaskInstance(cluster_interface: jobmon.core.cluster_protocol.ClusterWorkerNode, task_instance_id: int, task_instance_heartbeat_interval: Optional[int] = None, heartbeat_report_by_buffer: Optional[float] = None, command_interrupt_timeout: Optional[int] = None, requester: Optional[jobmon.core.requester.Requester] = None) The Task Instance object once it has been submitted to run on a worker node. A mechanism whereby a running task_instance can communicate back to the JSM. Logs its status, errors, usage details, etc. :param cluster_interface: interface that gathers executor info in the execution_wrapper. :param task_instance_id: the id of the TaskInstance that is reporting back. :param task_instance_heartbeat_interval: how ofter to log a report by with the db :param heartbeat_report_by_buffer: multiplier for report by date in case we miss a few. :param command_interrupt_timeout: the amount of time to wait for the child process to terminate. :param requester: communicate with the flask services. .. py:attribute:: requester :value: None .. py:attribute:: cluster_interface .. py:property:: task_instance_id :type: int Returns a task instance ID if it's been bound. .. py:property:: distributor_id :type: Optional[str] Executor id given from the executor it is being run on. .. py:property:: nodename :type: Optional[str] Node it is being run on. .. py:property:: process_group_id :type: Optional[int] Process group to track parent and child processes. .. py:property:: status :type: str Returns the last known status of the task instance. .. py:property:: stdout :type: str .. py:property:: stderr :type: str .. py:property:: command :type: str Returns the command this task instance will run. .. py:property:: command_add_env :type: Dict[str, str] Returns the command this task instance will run. .. py:property:: command_returncode :type: int Returns the exit code of the command that was run. .. py:property:: command_stdout :type: str Returns the last 10k characters of the commands stdout. .. py:property:: command_stderr :type: str Returns the last 10k characters of the commands stderr. .. py:method:: log_done() -> None Tell the JobStateManager that this task_instance is done. .. py:method:: log_error(error_state: str, description: str) -> None Tell the JobStateManager that this task_instance has errored. .. py:method:: log_running() -> None Tell the JobStateManager that this task_instance is running. Update the report_by_date to be further in the future in case it gets reconciled immediately. .. py:method:: log_report_by(session: aiohttp.ClientSession) -> None :async: Log the heartbeat to show that the task instance is still alive. Uses the async requester with an aiohttp ClientSession so heartbeats can run concurrently with other asyncio work (subprocess pipe drain, lazy log-directory creation in ``_communicate``). Running on the event loop instead of the default thread-pool avoids the executor pressure that ``asyncio.to_thread`` would add when multiple slow I/O paths are in flight at the same time. .. py:method:: run() -> None This script executes on the target node and wraps the target application. Could be in any language, anything that can execute on linux. Similar to a stub or a container set ENV variables in case tasks need to access them. .. py:method:: set_command_output(returncode: int, stdout: str, stderr: str) -> None