Source code for server.workflow_reaper.notifiers

"""Places to notify upon certain events (ex. slack to notify of unhealthy workflow)."""

import logging
from typing import Optional

import requests

from jobmon.core.configuration import JobmonConfig

[docs] logger = logging.getLogger(__name__)
[docs] class SlackNotifier(object): """Send notifications via slack.""" def __init__( self, api_url: str = "", token: str = "", channel_default: str = "" ) -> None: """Container for connection with Slack. Args: api_url (str): url to Slack. token (str): token gotten from your app in api.slack.com. channel_default (str): name of channel to which you want to post. """
[docs] config = JobmonConfig()
if not api_url: api_url = config.get("reaper", "slack_api_url") if not token: token = config.get("reaper", "slack_token") if not channel_default: channel_default = config.get("reaper", "slack_channel_default")
[docs] self.api_url = api_url
[docs] self._token = token
[docs] self.channel_default = channel_default
[docs] def send(self, msg: str, channel: Optional[str] = None) -> None: """Send message to Slack using requests.post.""" if channel is None: channel = self.channel_default resp = requests.post( self.api_url, headers={"Authorization": "Bearer {}".format(self._token)}, json={"channel": channel, "text": msg}, ) logger.debug(resp) if resp.status_code != requests.codes.OK: error = "Could not send Slack message. {!r}".format(resp.content) # To raise an exception here causes the docker container stop, and # becomes hard to restart. # Log the error instead. So we can enter the container to fix # issues when necessary. # Log the status code so that it's easier to identify the cause. logger.error(resp.status_code) logger.error(error)