# Source code for ray.serve.api

import inspect
import os
from functools import wraps
from multiprocessing import cpu_count
from tempfile import mkstemp

import ray
from ray.serve.constants import (DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT,
                                 SERVE_MASTER_NAME)
from ray.serve.master import ServeMaster
from ray.serve.handle import RayServeHandle
from ray.serve.kv_store_service import SQLiteKVStore
from ray.serve.task_runner import RayServeMixin, TaskRunnerActor
from ray.serve.utils import block_until_http_ready
from ray.serve.exceptions import RayServeException, batch_annotation_not_found
from ray.serve.backend_config import BackendConfig
from ray.serve.policy import RoutePolicy
from ray.serve.queues import Query
from ray.serve.request_params import RequestMetadata

# Handle to the serve master actor used by every API function in this module.
# ``init`` assigns it (either by looking up an existing actor named
# SERVE_MASTER_NAME or by creating a new detached ServeMaster); it stays None
# until then. Other modules should read it via ``_get_master_actor()``.
master_actor = None


def _get_master_actor():
    """Return the current serve master actor handle, or None if unset.

    Internal accessor. Importing the ``master_actor`` name directly
    (``from ... import master_actor``) binds whatever value it had at import
    time (None) and never observes the later reassignment done by ``init``;
    calling this function always reads the live module-level value.
    """
    current = master_actor
    return current


def _ensure_connected(f):
    """Decorator that refuses to run ``f`` before serve is connected.

    The returned wrapper checks ``_get_master_actor()`` on every call and,
    when no master actor handle has been set yet, raises
    ``RayServeException`` instead of calling ``f``. Otherwise it forwards
    all arguments to ``f`` and returns its result unchanged.
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        connected = _get_master_actor() is not None
        if not connected:
            raise RayServeException("Please run serve.init to initialize or "
                                    "connect to existing ray serve cluster.")
        return f(*args, **kwargs)

    return wrapper


def accept_batch(f):
    """Mark a serving function (or a class's ``__call__``) as batch-aware.

    A backend marked this way expects every argument to arrive as a list
    (one entry per query in the batch) rather than as a single value.
    ``create_backend`` refuses a backend whose ``BackendConfig`` sets
    ``max_batch_size`` unless it carries this marker.

    Example:

    >>> @serve.accept_batch
    ... def serving_func(flask_request):
    ...     assert isinstance(flask_request, list)
    ...     ...

    >>> class ServingActor:
    ...     @serve.accept_batch
    ...     def __call__(self, *, python_arg=None):
    ...         assert isinstance(python_arg, list)

    Returns:
        ``f`` itself, with the ``serve_accept_batch`` attribute set to True.
    """
    setattr(f, "serve_accept_batch", True)
    return f
def init(
        kv_store_connector=None,
        kv_store_path=None,
        blocking=False,
        start_server=True,
        http_host=DEFAULT_HTTP_HOST,
        http_port=DEFAULT_HTTP_PORT,
        ray_init_kwargs=None,
        gc_window_seconds=3600,
        queueing_policy=RoutePolicy.Random,
        policy_kwargs=None,
):
    """Initialize a serve cluster.

    If serve cluster has already initialized, this function will just return.

    Calling `ray.init` before `serve.init` is optional. When there is not a ray
    cluster initialized, serve will call `ray.init` with `ray_init_kwargs`.

    Args:
        kv_store_connector (callable): Function of (namespace) => TableObject.
            If None (the default), a SQLite connector that stores to
            ``kv_store_path`` is used.
        kv_store_path (str, path): Path to the SQLite table used by the
            default connector. If None, a fresh temporary file is created
            with ``tempfile.mkstemp``. Unused when ``kv_store_connector`` is
            given.
        blocking (bool): If true, the function will wait for the HTTP server
            to be healthy before returning. Only has an effect when
            ``start_server`` is true.
        start_server (bool): If true, `serve.init` starts http server.
            (Default: True)
        http_host (str): Host for HTTP server. Default to "0.0.0.0".
        http_port (int): Port for HTTP server. Default to 8000.
        ray_init_kwargs (dict): Argument passed to ray.init, if there is no ray
            connection. If None, defaults to
            {"object_store_memory": int(1e8), "num_cpus": max(cpu_count(), 8)}
            for performance stability reasons.
        gc_window_seconds(int): How long will we keep the metric data in
            memory. Data older than the gc_window will be deleted. The default
            is 3600 seconds, which is 1 hour.
        queueing_policy(RoutePolicy): Define the queueing policy for selecting
            the backend for a service. (Default: RoutePolicy.Random)
        policy_kwargs (dict): Arguments required to instantiate a queueing
            policy. Defaults to an empty dict.
    """
    global master_actor
    if master_actor is not None:
        return

    # None sentinels instead of mutable default dicts shared across calls.
    if ray_init_kwargs is None:
        ray_init_kwargs = {
            "object_store_memory": int(1e8),
            "num_cpus": max(cpu_count(), 8),
        }
    if policy_kwargs is None:
        policy_kwargs = {}

    # Initialize ray if needed.
    if not ray.is_initialized():
        ray.init(**ray_init_kwargs)

    # Register serialization contexts once, before deciding whether to
    # connect to an existing master or start a new one: this process needs
    # both serializers in either case.
    ray.register_custom_serializer(Query, Query.ray_serialize,
                                   Query.ray_deserialize)
    ray.register_custom_serializer(RequestMetadata,
                                   RequestMetadata.ray_serialize,
                                   RequestMetadata.ray_deserialize)

    # Try to get serve master actor if it exists
    try:
        master_actor = ray.util.get_actor(SERVE_MASTER_NAME)
        return
    except ValueError:
        pass

    # Serve has not been initialized, perform init sequence.
    # Honor a caller-supplied connector; only build the SQLite default when
    # none was given.
    if kv_store_connector is None:
        if kv_store_path is None:
            # mkstemp returns an already-open OS-level fd; only the path is
            # needed, so close the fd instead of leaking it.
            fd, kv_store_path = mkstemp()
            os.close(fd)

        # TODO move the db to session_dir.
        #    ray.worker._global_node.address_info["session_dir"]
        def kv_store_connector(namespace):
            return SQLiteKVStore(namespace, db_path=kv_store_path)

    master_actor = ServeMaster.options(
        detached=True, name=SERVE_MASTER_NAME).remote(kv_store_connector)

    ray.get(
        master_actor.start_router.remote(queueing_policy.value, policy_kwargs))

    ray.get(master_actor.start_metric_monitor.remote(gc_window_seconds))

    if start_server:
        ray.get(master_actor.start_http_proxy.remote(http_host, http_port))

        if blocking:
            block_until_http_ready("http://{}:{}/-/routes".format(
                http_host, http_port))
@_ensure_connected
def create_endpoint(endpoint_name, route=None, methods=("GET", )):
    """Create a service endpoint given route_expression.

    Waits for the master's ``create_endpoint`` call to complete.

    Args:
        endpoint_name (str): A name to associate to the endpoint. It will be
            used as key to set traffic policy.
        route (str): A string beginning with "/". HTTP server will use
            the string to match the path.
        methods (str or iterable of str): HTTP methods accepted on the route,
            case-insensitive (they are upper-cased before registration). A
            single method may be passed as a plain string. Defaults to
            ("GET",).
    """
    if isinstance(methods, str):
        # A bare string would otherwise be iterated character by character
        # (e.g. "GET" -> ["G", "E", "T"]).
        methods = [methods]
    ray.get(
        master_actor.create_endpoint.remote(route, endpoint_name,
                                            [m.upper() for m in methods]))
@_ensure_connected
def set_backend_config(backend_tag, backend_config):
    """Replace the configuration of a registered backend.

    Waits for the master's ``set_backend_config`` call to complete before
    returning.

    Args:
        backend_tag (str): Tag of a registered backend.
        backend_config (BackendConfig): The desired backend configuration.
    """
    pending = master_actor.set_backend_config.remote(backend_tag,
                                                     backend_config)
    ray.get(pending)
@_ensure_connected
def get_backend_config(backend_tag):
    """Fetch the configuration stored for a backend tag.

    Args:
        backend_tag (str): Tag of a registered backend.

    Returns:
        The value produced by the master's ``get_backend_config`` call for
        ``backend_tag`` (presumably a BackendConfig — confirm in master).
    """
    config_ref = master_actor.get_backend_config.remote(backend_tag)
    return ray.get(config_ref)
def _backend_accept_batch(func_or_class):
    """Return True if ``func_or_class`` was marked with ``@accept_batch``.

    For a plain function the ``serve_accept_batch`` marker is looked up on
    the function itself; for a class it is looked up on the class's
    ``__call__`` attribute. Any other kind of object returns False.
    """
    if inspect.isfunction(func_or_class):
        target = func_or_class
    elif inspect.isclass(func_or_class):
        target = func_or_class.__call__
    else:
        # Explicit False keeps the return type a bool for every input.
        return False
    return hasattr(target, "serve_accept_batch")
@_ensure_connected
def create_backend(func_or_class,
                   backend_tag,
                   *actor_init_args,
                   backend_config=None):
    """Create a backend using func_or_class and assign backend_tag.

    Args:
        func_or_class (callable, class): a function or a class implementing
            the __call__ protocol.
        backend_tag (str): a unique tag assigned to this backend. It will be
            used to associate services in traffic policy.
        *actor_init_args (optional): the arguments to pass to the class
            initialization method. Not used for function backends.
        backend_config (BackendConfig): An object defining backend properties
            for starting a backend. Defaults to ``BackendConfig()``. Its
            ``has_accept_batch_annotation`` attribute is set in place when the
            backend is marked with ``@serve.accept_batch``.

    Raises:
        batch_annotation_not_found: if ``backend_config.max_batch_size`` is
            set but the backend is not marked with ``@serve.accept_batch``.
        TypeError: if func_or_class is neither a function nor a class.
    """
    # Configure backend_config
    if backend_config is None:
        backend_config = BackendConfig()
    assert isinstance(backend_config,
                      BackendConfig), ("backend_config must be"
                                       " of instance BackendConfig")

    # Batching must be opted into explicitly with @serve.accept_batch.
    accepts_batch = _backend_accept_batch(func_or_class)
    should_accept_batch = backend_config.max_batch_size is not None
    if should_accept_batch and not accepts_batch:
        raise batch_annotation_not_found
    if accepts_batch:
        backend_config.has_accept_batch_annotation = True

    if inspect.isfunction(func_or_class):
        # arg list for a fn is function itself
        arg_list = [func_or_class]
        # ignore lint on lambda expression
        creator = lambda kwrgs: TaskRunnerActor._remote(**kwrgs)  # noqa: E731
    elif inspect.isclass(func_or_class):
        # Python's method resolution order searches bases left-to-right, so
        # listing RayServeMixin first makes its methods take precedence over
        # any same-named methods defined on the user's class.
        @ray.remote
        class CustomActor(RayServeMixin, func_or_class):
            @wraps(func_or_class.__init__)
            def __init__(self, *args, **kwargs):
                # Initialize serve so it can be used in backends.
                init()
                super().__init__(*args, **kwargs)

        arg_list = actor_init_args
        # ignore lint on lambda expression
        creator = lambda kwargs: CustomActor._remote(**kwargs)  # noqa: E731
    else:
        raise TypeError(
            "Backend must be a function or class, it is {}.".format(
                type(func_or_class)))

    ray.get(
        master_actor.create_backend.remote(backend_tag, creator,
                                           backend_config, arg_list))
@_ensure_connected
def split(endpoint_name, traffic_policy_dictionary):
    """Set the traffic policy of a service endpoint.

    Waits for the master's ``split_traffic`` call to complete.

    Example:

    >>> serve.split("service-name", {
    ...     "backend:v1": 0.5,
    ...     "backend:v2": 0.5
    ... })

    Args:
        endpoint_name (str): A registered service endpoint.
        traffic_policy_dictionary (dict): Maps backend tags to their traffic
            weights. The weights must sum to 1.
    """
    pending = master_actor.split_traffic.remote(endpoint_name,
                                                traffic_policy_dictionary)
    ray.get(pending)
@_ensure_connected
def get_handle(endpoint_name,
               relative_slo_ms=None,
               absolute_slo_ms=None,
               missing_ok=False):
    """Retrieve RayServeHandle for service endpoint to invoke it from Python.

    Args:
        endpoint_name (str): A registered service endpoint.
        relative_slo_ms(float): Specify relative deadline in milliseconds for
            queries fired using this handle. (Default: None)
        absolute_slo_ms(float): Specify absolute deadline in milliseconds for
            queries fired using this handle. (Default: None)
        missing_ok (bool): If true, skip the check for the endpoint existence.
            It can be useful when the endpoint has not been registered.

    Returns:
        RayServeHandle

    Raises:
        AssertionError: if ``missing_ok`` is false and ``endpoint_name`` is
            not among the endpoints registered with the master.
    """
    if not missing_ok:
        registered = ray.get(master_actor.get_all_endpoints.remote())
        if endpoint_name not in registered:
            # Raised explicitly rather than via ``assert`` so the check still
            # runs under ``python -O``; AssertionError is kept because that is
            # the exception type callers have always received here.
            raise AssertionError(
                "Endpoint {!r} is not registered. Pass missing_ok=True to "
                "skip this check.".format(endpoint_name))

    return RayServeHandle(
        ray.get(master_actor.get_router.remote())[0],
        endpoint_name,
        relative_slo_ms,
        absolute_slo_ms,
    )
@_ensure_connected
def stat(percentiles=None, agg_windows_seconds=None):
    """Retrieve metric statistics about ray serve system.

    Args:
        percentiles(List[int]): The percentiles for aggregation operations.
            Defaults to [50, 90, 95] (50th, 90th, 95th percentile).
        agg_windows_seconds(List[int]): The aggregation windows in seconds.
            The longest aggregation window must be shorter or equal to the
            gc_window_seconds. Defaults to [10, 60, 300, 600, 3600].

    Returns:
        Whatever the metric monitor's ``collect`` call returns for the given
        percentiles and windows.
    """
    # None sentinels so each call gets fresh lists rather than shared
    # mutable defaults.
    if percentiles is None:
        percentiles = [50, 90, 95]
    if agg_windows_seconds is None:
        agg_windows_seconds = [10, 60, 300, 600, 3600]

    # The master reports a list that is expected to hold exactly one monitor.
    [monitor] = ray.get(master_actor.get_metric_monitor.remote())
    return ray.get(monitor.collect.remote(percentiles, agg_windows_seconds))
class route:
    """Convenience decorator to create a backend and link it to a service.

    When called, the following will happen:

    - A backend tagged ``"<name>:v0"`` is created from the function or class.
    - An endpoint is created with the same name as the function or class.
    - All of the endpoint's traffic is routed to that backend.
    - The handle is returned, so the decorated name is rebound to it.

    .. code-block:: python

        @serve.route("/path")
        def my_handler(flask_request):
            ...
    """

    def __init__(self, url_route):
        self.route = url_route

    def __call__(self, func_or_class):
        name = func_or_class.__name__
        backend_tag = "{}:v0".format(name)
        create_backend(func_or_class, backend_tag)
        create_endpoint(name, self.route)
        # Send 100% of the endpoint's traffic to the newly created backend.
        split(name, {backend_tag: 1.0})
        return get_handle(name)