"""The client Task Resources with the resources initiation and binding to Task ID."""
from __future__ import annotations
import hashlib
import json
import numbers
import re
from http import HTTPStatus as StatusCodes
from math import ceil
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
import structlog
from jobmon.client.units import MemUnit, TimeUnit
from jobmon.core.cluster_protocol import ClusterQueue
from jobmon.core.exceptions import InvalidResponse
from jobmon.core.requester import Requester
[docs]
logger = structlog.get_logger(__name__)
[docs]
class TaskResources:
"""An object representing the resources for a specific task."""
def __init__(
self,
requested_resources: Dict[str, Any],
queue: ClusterQueue,
requester: Optional[Requester] = None,
) -> None:
"""Initialize the task resource object."""
for resource, value in requested_resources.items():
if resource == "memory":
requested_resources[resource] = self.convert_memory_to_gib(value)
if resource == "runtime":
requested_resources[resource] = self.convert_runtime_to_s(value)
[docs]
self.requested_resources = requested_resources
if requester is None:
requester = Requester.from_defaults()
[docs]
self.requester = requester
@property
[docs]
def is_bound(self) -> bool:
"""If the TaskResources has been bound to the database."""
return hasattr(self, "_id")
@property
[docs]
def id(self) -> int:
"""If the task resources has been bound to the database."""
if not self.is_bound:
raise AttributeError(
"Cannot access id until TaskResources is bound to database"
)
return self._id
[docs]
def bind(self) -> None:
"""Bind TaskResources to the database."""
# Check if it's already been bound
if self.is_bound:
logger.debug(
"This task resource has already been bound, and assigned"
f"task_resources_id {self.id}"
)
return
app_route = "/task/bind_resources"
msg = {
"queue_id": self.queue.queue_id,
"task_resources_type_id": "O",
"requested_resources": self.requested_resources,
}
return_code, response = self.requester.send_request(
app_route=app_route, message=msg, request_type="post"
)
if return_code != StatusCodes.OK:
raise InvalidResponse(
f"Unexpected status code {return_code} from POST "
f"request through route {app_route}. Expected "
f"code 200. Response content: {response}"
)
self._id = response
[docs]
def validate_resources(
self: TaskResources, strict: bool = False
) -> Tuple[bool, str]:
is_valid, msg = self.queue.validate_resources(
strict, **self.requested_resources
)
return is_valid, msg
[docs]
def coerce_resources(self: TaskResources) -> TaskResources:
"""Coerce TaskResources to fit on queue. If resources change return a new object."""
valid_resources = self.queue.coerce_resources(**self.requested_resources)
coerced_task_resources = self.__class__(valid_resources, self.queue)
if coerced_task_resources != self:
return coerced_task_resources
else:
return self
[docs]
def adjust_resources(
self: TaskResources,
resource_scales: Dict[
str, Union[numbers.Number, Callable, Iterator[numbers.Number]]
],
fallback_queues: Optional[List[ClusterQueue]] = None,
) -> TaskResources:
"""Adjust TaskResources after a resource error, returning a new object if it changed.
Args:
resource_scales: Specifies how much to scale the failed Task's resources by.
Scale factor can be a numeric value, a Callable that will be applied
to the existing resources, or an Iterator. Any Callable should take
a single numeric value as its sole argument. Any Iterator should only yield
numeric values. Any Iterable can be easily converted to an Iterator by using
the iter() built-in (e.g. iter([80, 160, 190])).
fallback_queues: list of queues that users specify. If their jobs exceed the
resources of a given queue, Jobmon will try to run their jobs on the fallback
queues.
"""
if fallback_queues is None:
fallback_queues = []
existing_resources = self.requested_resources.copy()
resource_updates: Dict[str, Any] = {}
# Only cores, memory, and runtime get scaled
for resource, scaler in resource_scales.items():
if resource in existing_resources.keys():
if isinstance(scaler, numbers.Number):
new_resource_value = self.scale_val(
existing_resources[resource], scaler # type: ignore
)
elif callable(scaler):
new_resource_value = scaler(existing_resources[resource])
elif isinstance(scaler, Iterator):
try:
new_resource_value = next(scaler) # type: ignore
except StopIteration:
logger.warning(
"Not enough elements left in Iterator, re-using previous value "
f"for {resource}: {existing_resources[resource]}"
)
new_resource_value = existing_resources[resource]
else:
raise ValueError(
"Keys in the resource_scales dictionary must be either numeric "
f"values, Iterators, or Python Callables; found {scaler}, type "
f"{type(scaler)} instead."
)
if not isinstance(new_resource_value, numbers.Number):
raise ValueError(
"Attemping to update resource to a non-numeric value, "
f"{new_resource_value}. If passing an Iterator, elements must be "
"numeric values. If passing a Callable, return value must be a "
"numeric value."
)
resource_updates[resource] = new_resource_value
scaled_resources = dict(existing_resources, **resource_updates)
# If it fails, try the fallback queues.
queues = [self.queue] + fallback_queues
while queues:
next_queue = queues.pop(0)
is_valid, _ = next_queue.validate_resources(strict=True, **scaled_resources)
if is_valid:
valid_resources = scaled_resources
break
else: # no break
# We've run out of queues so use the final queue and coerce
valid_resources = next_queue.coerce_resources(**scaled_resources)
adjust_resources = self.__class__(valid_resources, next_queue)
if adjust_resources != self:
return adjust_resources
else:
return self
@staticmethod
[docs]
def convert_memory_to_gib(memory_str: str) -> int:
"""Given a memory request with a unit suffix, convert to GiB."""
try:
# User could pass in a raw value for memory, assume to be in GiB.
# This is also the path taken by adjust
return int(memory_str)
except ValueError:
return MemUnit.convert(memory_str, to="G")
@staticmethod
[docs]
def convert_runtime_to_s(time_str: Union[str, float, int]) -> int:
"""Given a runtime request, coerce to seconds for recording in the DB."""
try:
# If a numeric is provided, assumed to be in seconds
return int(time_str)
except ValueError:
time_str = str(time_str).lower()
# convert to seconds if its datetime with a supported format
try:
hours, minutes, seconds = time_str.split(":")
time_seconds = (
TimeUnit.hour_to_sec(int(hours))
+ TimeUnit.min_to_sec(int(minutes))
+ int(seconds)
)
return time_seconds
except ValueError:
try:
raw_value, unit = re.findall(r"[A-Za-z]+|\d+", time_str)
except ValueError:
# Raised if there are not exactly 2 values to unpack from above regex
raise ValueError(
"The provided runtime request must be in a format of numbers "
"followed by one or two characters indicating the unit. "
"E.g. 1h, 60m, 3600s."
)
if "h" in unit:
# Hours provided
return TimeUnit.hour_to_sec(int(raw_value))
elif "m" in unit:
# Minutes provided
return TimeUnit.min_to_sec(int(raw_value))
elif "s" in unit:
return int(raw_value)
else:
raise ValueError("Expected one of h, m, s as the suffixed unit.")
@staticmethod
[docs]
def scale_val(val: int, scaling_factor: float) -> float:
"""Used ceil instead of round or floor, to handle case when resources is 1.
For example, if runtime was 1, resource scales was 0.2. Then the resource would adjust
to 1.2, which would be truncated to 1 again if using floor/round.
"""
return int(ceil(val * (1 + scaling_factor)))
[docs]
def __hash__(self) -> int:
"""Determine the hash of a task resources object."""
# Note: this algorithm assumes all keys and values in the resources dict are
# JSON-serializable. Since that's a requirement for logging in the database,
# this assumption should be safe.
# Uniqueness is determined by queue name and the resources parameter.
if not hasattr(self, "_hash_val"):
hashval = hashlib.sha256()
hashval.update(bytes(str(hash(self.queue.queue_name)).encode("utf-8")))
resources_str = str(
hash(json.dumps(self.requested_resources, sort_keys=True))
)
hashval.update(bytes(resources_str.encode("utf-8")))
self._hash_val = int(hashval.hexdigest(), 16)
return self._hash_val
[docs]
def __eq__(self, other: object) -> bool:
"""Check equality of task resources objects."""
if not isinstance(other, TaskResources):
return False
return hash(self) == hash(other)
[docs]
def __repr__(self) -> str:
"""A representation string for a TaskResources instance."""
repr_string = (
f"TaskResources(queue={self.queue.queue_name}, "
f"requested_resources={self.requested_resources}"
)
try:
repr_string += f", id={self.id})"
except AttributeError:
repr_string += ")"
return repr_string