Source code for distributor.cli

"""Command line interface for Execution."""

import argparse
from typing import Optional

import structlog

from jobmon.core.cli import CLI
from jobmon.core.cluster import Cluster
from jobmon.core.logging import set_jobmon_context
from jobmon.distributor.api import DistributorService

[docs] logger = structlog.get_logger(__name__)
[docs] class DistributorCLI(CLI): """Command line interface for Distributor with automatic logging.""" def __init__(self) -> None: """Initialization of distributor CLI with automatic component logging.""" # Enable automatic logging for distributor component super().__init__(component_name="distributor")
[docs] self.parser = argparse.ArgumentParser()
[docs] self._subparsers = self.parser.add_subparsers( dest="sub_command", parser_class=argparse.ArgumentParser )
self._add_distributor_parser() @staticmethod
[docs] def run_distributor(args: argparse.Namespace) -> None: """Start the distributor service for a workflow run.""" # Bind global context for this distributor instance set_jobmon_context(workflow_run_id=args.workflow_run_id) logger.info("Distributor starting") cluster = Cluster.get_cluster(args.cluster_name) cluster_interface = cluster.get_distributor() distributor_service = DistributorService(cluster_interface) distributor_service.set_workflow_run(args.workflow_run_id) distributor_service.run()
[docs] def _add_distributor_parser(self) -> None: distributor_parser = self._subparsers.add_parser("start") distributor_parser.set_defaults(func=self.run_distributor) distributor_parser.add_argument( "--cluster_name", type=str, help="cluster_name to distribute jobs onto.", required=True, ) distributor_parser.add_argument( "--workflow_run_id", type=int, help="workflow_run_id to distribute jobs for.", required=True, )
[docs] def main(argstr: Optional[str] = None) -> None: """Entrypoint to create Executor CLI.""" cli = DistributorCLI() cli.main(argstr)
if __name__ == "__main__": main()