# Source code for server.cli

"""Set up server specific CLI config."""

import argparse
import logging
import sys
from typing import Optional

from jobmon.core.cli import CLI

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class ServerCLI(CLI):
    """CLI for Server with automatic logging support.

    Builds a ``jobmon server`` argument parser and registers four
    sub-commands on it: ``workflow_reaper``, ``init_db``, ``terminate_db``
    and ``upgrade_db``. Each sub-parser stores its handler method as the
    ``func`` default of the parsed namespace.
    """

    def __init__(self) -> None:
        """Initialize ServerCLI for administrative operations."""
        # ServerCLI uses component logging for admin commands
        # (workflow-reaper, init-db, etc.)
        super().__init__(component_name="server")

        self.parser = argparse.ArgumentParser("jobmon server")
        self._subparsers = self.parser.add_subparsers(dest="sub_command")

        # now add specific sub parsers
        self._add_workflow_reaper_subparser()
        self._add_init_db_subparser()
        self._add_terminate_db_subparser()
        self._add_upgrade_db_subparser()

    # ------------------------------------------------------------------
    # Sub-command entrypoints
    # ------------------------------------------------------------------
    def workflow_reaper(self, args: argparse.Namespace) -> None:
        """Workflow reaper entrypoint logic.

        Args:
            args: Parsed arguments of the ``workflow_reaper`` sub-command.

        Raises:
            ValueError: If ``args.command`` is anything other than ``start``.
        """
        # Imported lazily so the reaper package is only loaded on demand.
        from jobmon.server.workflow_reaper.api import start_workflow_reaper

        logging.basicConfig(stream=sys.stdout, level=logging.INFO)

        if args.command != "start":
            raise ValueError(
                "Invalid command choice. Options are (start), got "
                f"({args.command})"
            )

        start_workflow_reaper(
            service_url=args.service_url,
            slack_api_url=args.slack_api_url,
            slack_token=args.slack_token,
            slack_channel_default=args.slack_channel_default,
            poll_interval_minutes=args.poll_interval_minutes,
        )

    def init_db(self, args: argparse.Namespace) -> None:
        """Entrypoint to initialize new Jobmon database.

        Args:
            args: Parsed arguments of the ``init_db`` sub-command.
        """
        from jobmon.server.web.db import init_db

        init_db(self._resolve_database_uri(args))

    def terminate_db(self, args: argparse.Namespace) -> None:
        """Entrypoint to terminate a Jobmon database.

        Args:
            args: Parsed arguments of the ``terminate_db`` sub-command.
        """
        from jobmon.server.web.db import terminate_db

        terminate_db(self._resolve_database_uri(args))

    def upgrade_db(self, args: argparse.Namespace) -> None:
        """Entrypoint to upgrade a Jobmon database.

        Args:
            args: Parsed arguments of the ``upgrade_db`` sub-command; the
                target revision is read from ``args.revision``.
        """
        from jobmon.server.web.db import apply_migrations

        apply_migrations(self._resolve_database_uri(args), args.revision)

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _resolve_database_uri(args: argparse.Namespace) -> str:
        """Return the database URI the DB sub-commands should operate on.

        When ``args.sqlalchemy_database_uri`` is empty, the URI is read from
        the ``db`` / ``sqlalchemy_database_uri`` entry of a fresh
        ``JobmonConfig``. Otherwise the CLI-provided URI is returned and a
        ``JobmonConfig`` built from it is handed to ``get_jobmon_config``.

        Args:
            args: Parsed arguments carrying ``sqlalchemy_database_uri``.

        Returns:
            The database URI to use.
        """
        from jobmon.core.configuration import JobmonConfig
        from jobmon.server.web.config import get_jobmon_config

        database_uri = args.sqlalchemy_database_uri
        if not database_uri:
            return JobmonConfig().get("db", "sqlalchemy_database_uri")

        # Override the config singleton with CLI-provided database URI
        override = JobmonConfig(
            dict_config={"db": {"sqlalchemy_database_uri": database_uri}}
        )
        # Set the singleton to use our overridden config
        get_jobmon_config(override)
        return database_uri

    @staticmethod
    def _add_database_uri_argument(parser: argparse.ArgumentParser) -> None:
        """Add the optional ``--sqlalchemy_database_uri`` flag to ``parser``.

        The flag defaults to an empty string, which the DB entrypoints treat
        as "not provided".
        """
        parser.add_argument(
            "--sqlalchemy_database_uri",
            type=str,
            help="The connection string for sqlalchemy to use when running the server.",
            required=False,
            default="",
        )

    # ------------------------------------------------------------------
    # Sub-parser registration
    # ------------------------------------------------------------------
    def _add_workflow_reaper_subparser(self) -> None:
        """Register the ``workflow_reaper`` sub-command and its options."""
        reaper_parser = self._subparsers.add_parser("workflow_reaper")
        reaper_parser.set_defaults(func=self.workflow_reaper)
        reaper_parser.add_argument(
            "command",
            type=str,
            choices=["start"],
            help=(
                "The workflow_reaper sub-command to run: (start). Start command runs "
                "workflow_reaper.monitor_forever() method."
            ),
        )
        reaper_parser.add_argument(
            "--service_url",
            type=str,
            help="Jobmon web service URL",
            required=False,
            default="",
        )
        reaper_parser.add_argument(
            "--slack_api_url",
            type=str,
            help="URL to post notifications",
            required=False,
            default="",
        )
        reaper_parser.add_argument(
            "--slack_token",
            type=str,
            help="Authentication token for posting updates to slack",
            required=False,
            default="",
        )
        reaper_parser.add_argument(
            "--slack_channel_default",
            type=str,
            help="Default channel to post updates to",
            required=False,
            default="",
        )
        reaper_parser.add_argument(
            "--poll_interval_minutes",
            type=int,
            help="Duration in minutes to sleep between reaper loops",
            required=False,
            default=None,
        )

    def _add_init_db_subparser(self) -> None:
        """Register the ``init_db`` sub-command."""
        init_db_parser = self._subparsers.add_parser("init_db")
        init_db_parser.set_defaults(func=self.init_db)
        self._add_database_uri_argument(init_db_parser)

    def _add_terminate_db_subparser(self) -> None:
        """Register the ``terminate_db`` sub-command."""
        terminate_db_parser = self._subparsers.add_parser("terminate_db")
        terminate_db_parser.set_defaults(func=self.terminate_db)
        self._add_database_uri_argument(terminate_db_parser)

    def _add_upgrade_db_subparser(self) -> None:
        """Register the ``upgrade_db`` sub-command and its ``--revision`` flag."""
        upgrade_db_parser = self._subparsers.add_parser("upgrade_db")
        upgrade_db_parser.set_defaults(func=self.upgrade_db)
        self._add_database_uri_argument(upgrade_db_parser)
        upgrade_db_parser.add_argument(
            "--revision",
            type=str,
            help="The revision to upgrade the database to.",
            required=False,
            default="head",
        )
def main(argstr: Optional[str] = None) -> None:
    """Instantiate ``ServerCLI`` and invoke its ``main`` with ``argstr``.

    Args:
        argstr: Optional argument string forwarded unchanged to
            ``ServerCLI.main``; defaults to ``None``.
    """
    ServerCLI().main(argstr)