Skip to content

Commit

Permalink
[serve] Move num_queued_queries logic out of ReplicaSet and into …
Browse files Browse the repository at this point in the history
…`Router` (ray-project#35436)

Centralizes all of the metrics definitions in one place, makes the `ReplicaSet` simpler. I plan to implement a new `ReplicaSet` for streaming and don't want to duplicate the metrics handling.

Also fixes a metrics bug: the `route` is dynamic and we are setting it as a tag for a global gauge. Removed that tag.
  • Loading branch information
edoakes authored May 23, 2023
1 parent 26cae68 commit 89e16dd
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 44 deletions.
77 changes: 37 additions & 40 deletions python/ray/serve/_private/router.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import asyncio
from collections import defaultdict
from dataclasses import dataclass
import itertools
import logging
import pickle
import random
import sys
from typing import Any, Dict, List, Optional
from collections import defaultdict

import ray
from ray.actor import ActorHandle
Expand Down Expand Up @@ -77,13 +77,9 @@ async def resolve_async_tasks(self):
class ReplicaSet:
"""Data structure representing a set of replica actor handles"""

def __init__(
self,
deployment_name,
event_loop: asyncio.AbstractEventLoop,
):
self.deployment_name = deployment_name
def __init__(self, event_loop: asyncio.AbstractEventLoop):
self.in_flight_queries: Dict[RunningReplicaInfo, set] = dict()

# The iterator used for load balancing among replicas. Using itertools
# cycle, we implements a round-robin policy, skipping overloaded
# replicas.
Expand All @@ -103,19 +99,6 @@ def __init__(
else:
self.config_updated_event = asyncio.Event(loop=event_loop)

self.num_queued_queries = 0
self.num_queued_queries_gauge = metrics.Gauge(
"serve_deployment_queued_queries",
description=(
"The current number of queries to this deployment waiting"
" to be assigned to a replica."
),
tag_keys=("deployment", "route", "application"),
)
self.num_queued_queries_gauge.set_default_tags(
{"deployment": self.deployment_name}
)

# A map from multiplexed model id to a list of replicas that have the
# model loaded.
self.multiplexed_replicas_table: Dict[
Expand Down Expand Up @@ -301,14 +284,6 @@ async def assign_replica(self, query: Query) -> ray.ObjectRef:
and only send a query to available replicas (determined by the
max_concurrent_quries value.)
"""
self.num_queued_queries += 1
self.num_queued_queries_gauge.set(
self.num_queued_queries,
tags={
"route": query.metadata.route,
"application": query.metadata.app_name,
},
)
await query.resolve_async_tasks()
assigned_ref = self._try_assign_replica(query)
while assigned_ref is None: # Can't assign a replica right now.
Expand All @@ -331,14 +306,7 @@ async def assign_replica(self, query: Query) -> ray.ObjectRef:
# We are pretty sure a free replica is ready now, let's recurse and
# assign this query a replica.
assigned_ref = self._try_assign_replica(query)
self.num_queued_queries -= 1
self.num_queued_queries_gauge.set(
self.num_queued_queries,
tags={
"route": query.metadata.route,
"application": query.metadata.app_name,
},
)

return assigned_ref


Expand All @@ -355,7 +323,7 @@ def __init__(
controller_handle: The controller handle.
"""
self._event_loop = event_loop
self._replica_set = ReplicaSet(deployment_name, event_loop)
self._replica_set = ReplicaSet(event_loop)

# -- Metrics Registration -- #
self.num_router_requests = metrics.Counter(
Expand All @@ -365,6 +333,17 @@ def __init__(
)
self.num_router_requests.set_default_tags({"deployment": deployment_name})

self.num_queued_queries = 0
self.num_queued_queries_gauge = metrics.Gauge(
"serve_deployment_queued_queries",
description=(
"The current number of queries to this deployment waiting"
" to be assigned to a replica."
),
tag_keys=("deployment", "application"),
)
self.num_queued_queries_gauge.set_default_tags({"deployment": deployment_name})

self.long_poll_client = LongPollClient(
controller_handle,
{
Expand Down Expand Up @@ -394,23 +373,41 @@ def _collect_handle_queue_metrics(self) -> Dict[str, int]:
return {self.deployment_name: self.get_num_queued_queries()}

def get_num_queued_queries(self):
return self._replica_set.num_queued_queries
return self.num_queued_queries

async def assign_request(
self,
request_meta: RequestMetadata,
*request_args,
**request_kwargs,
):
) -> ray.ObjectRef:
"""Assign a query and returns an object ref represent the result"""

self.num_router_requests.inc(
tags={"route": request_meta.route, "application": request_meta.app_name}
)
return await self._replica_set.assign_replica(
self.num_queued_queries += 1
self.num_queued_queries_gauge.set(
self.num_queued_queries,
tags={
"application": request_meta.app_name,
},
)

result: ray.ObjectRef = await self._replica_set.assign_replica(
Query(
args=list(request_args),
kwargs=request_kwargs,
metadata=request_meta,
)
)

self.num_queued_queries -= 1
self.num_queued_queries_gauge.set(
self.num_queued_queries,
tags={
"application": request_meta.app_name,
},
)

return result
5 changes: 1 addition & 4 deletions python/ray/serve/tests/test_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,7 @@ async def num_queries(self):
return self._num_queries

# We will test a scenario with two replicas in the replica set.
rs = ReplicaSet(
"my_deployment",
get_or_create_event_loop(),
)
rs = ReplicaSet(get_or_create_event_loop())
replicas = [
RunningReplicaInfo(
deployment_name="my_deployment",
Expand Down

0 comments on commit 89e16dd

Please sign in to comment.