"""Start up distributing process."""
from typing import Optional
from jobmon.core.cluster import Cluster
from jobmon.core.requester import Requester
from jobmon.worker_node.worker_node_task_instance import WorkerNodeTaskInstance
[docs]
class WorkerNodeFactory:
def __init__(
self, cluster_name: str, requester: Optional[Requester] = None
) -> None:
"""Initialization of the WorkerNode Factory."""
[docs]
self._cluster_name = cluster_name
cluster = Cluster.get_cluster(cluster_name)
[docs]
self._worker_node_interface = cluster.get_worker_node()
[docs]
def get_job_task_instance(
self,
task_instance_id: int,
) -> WorkerNodeTaskInstance:
"""Set up and return WorkerNodeTaskInstance object."""
worker_node_task_instance = WorkerNodeTaskInstance(
cluster_interface=self._worker_node_interface,
task_instance_id=task_instance_id,
)
return worker_node_task_instance
[docs]
def get_array_task_instance(
self, array_id: int, batch_number: int
) -> WorkerNodeTaskInstance:
"""Set up and return WorkerNodeTaskInstance object."""
requester = Requester.from_defaults()
# Always assumed to be a value in the range [1, len(array)]
array_step_id = self._worker_node_interface.array_step_id
# Fetch from the database
app_route = (
f"/get_array_task_instance_id/{array_id}/{batch_number}/{array_step_id}"
)
_, resp = requester.send_request(
app_route=app_route, message={}, request_type="get"
)
task_instance_id = resp["task_instance_id"]
worker_node_task_instance = WorkerNodeTaskInstance(
cluster_interface=self._worker_node_interface,
task_instance_id=task_instance_id,
)
return worker_node_task_instance