Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serve] Json logging format #35118

Merged
merged 8 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions python/ray/serve/_private/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,7 @@ class HTTPProxyStatus(str, Enum):
STARTING = "STARTING"
HEALTHY = "HEALTHY"
UNHEALTHY = "UNHEALTHY"


class ServeComponentType(str, Enum):
    """Enumeration of Serve component types.

    The class mixes in ``str``, so each member is also a plain string equal
    to its value.
    """

    # Component type whose string value is "deployment".
    DEPLOYMENT = "deployment"
25 changes: 25 additions & 0 deletions python/ray/serve/_private/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,28 @@ class ServeHandleType(str, Enum):
"Please see the documentation for ServeDeploySchema for more details on multi-app "
"config files."
)

# Opt-in switch: JSON log output is enabled only when the environment
# variable RAY_SERVE_ENABLE_JSON_LOGGING is set to exactly "1".
RAY_SERVE_ENABLE_JSON_LOGGING = os.getenv("RAY_SERVE_ENABLE_JSON_LOGGING") == "1"

# Names of the attributes that may appear in a Serve log record.
SERVE_LOG_REQUEST_ID = "request_id"
SERVE_LOG_ROUTE = "route"
SERVE_LOG_APPLICATION = "application"
SERVE_LOG_DEPLOYMENT = "deployment"
SERVE_LOG_REPLICA = "replica"
SERVE_LOG_COMPONENT = "component_name"
SERVE_LOG_COMPONENT_ID = "component_id"
SERVE_LOG_MESSAGE = "message"
# These names are attributes reserved by the Python logging module
# and should not be changed.
SERVE_LOG_LEVEL_NAME = "levelname"
SERVE_LOG_TIME = "asctime"

# Maps each record key to the %-style format string used to render it.
# Every entry starts out as the plain "%(<attribute>)s" placeholder.
SERVE_LOG_RECORD_FORMAT = {
    attr: f"%({attr})s"
    for attr in (
        SERVE_LOG_REQUEST_ID,
        SERVE_LOG_ROUTE,
        SERVE_LOG_APPLICATION,
        SERVE_LOG_MESSAGE,
        SERVE_LOG_LEVEL_NAME,
        SERVE_LOG_TIME,
    )
}
# The message entry additionally carries the source file name and line number.
# Re-assigning an existing key keeps its position in the dict.
SERVE_LOG_RECORD_FORMAT[SERVE_LOG_MESSAGE] = "%(filename)s:%(lineno)d - %(message)s"
144 changes: 120 additions & 24 deletions python/ray/serve/_private/logging_utils.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,84 @@
import logging
import os
from typing import Optional
import json
import copy

import ray
from ray.serve._private.constants import DEBUG_LOG_ENV_VAR, SERVE_LOGGER_NAME
from ray.serve._private.constants import (
DEBUG_LOG_ENV_VAR,
SERVE_LOGGER_NAME,
RAY_SERVE_ENABLE_JSON_LOGGING,
SERVE_LOG_RECORD_FORMAT,
SERVE_LOG_REQUEST_ID,
SERVE_LOG_ROUTE,
SERVE_LOG_APPLICATION,
SERVE_LOG_MESSAGE,
SERVE_LOG_DEPLOYMENT,
SERVE_LOG_COMPONENT,
SERVE_LOG_COMPONENT_ID,
SERVE_LOG_TIME,
SERVE_LOG_LEVEL_NAME,
SERVE_LOG_REPLICA,
)
from ray.serve._private.common import ServeComponentType


LOG_FILE_FMT = "{component_name}_{component_id}.log"
COMPONENT_LOG_FMT = (
"%(levelname)s %(asctime)s {component_name} {component_id} " # noqa:E501
)
MESSAGE_FMT = "%(filename)s:%(lineno)d - %(message)s"
REQUEST_ID_FMT = "%(request_id)s "
ROUTE_FMT = "%(route)s "


class ServeJSONFormatter(logging.Formatter):
    """Serve logging JSON formatter.

    Renders each log record as a single JSON object. The set of keys is
    decided per record: the level name, timestamp and component identity are
    always present, while the request id, route and application are added only
    when the record carries those attributes.

    The JSON document is produced by serialising the *resolved values* with
    ``json.dumps``. Serialising a %-style template first and interpolating the
    record afterwards would emit invalid JSON whenever a value contains a
    quote, backslash or newline, and would misinterpret a ``%`` in a component
    name as a format directive.
    """

    def __init__(
        self,
        component_name: str,
        component_id: str,
        component_type: Optional[ServeComponentType] = None,
    ):
        """Create a formatter bound to one Serve component.

        Args:
            component_name: Name of the component. Logged under the
                deployment key for DEPLOYMENT components, otherwise under the
                component-name key.
            component_id: Identifier of the component. Logged under the
                replica key for DEPLOYMENT components, otherwise under the
                component-id key.
            component_type: Type of the component, or None if unspecified.
        """
        # Initialise the base Formatter so that inherited helpers such as
        # formatTime() and the datefmt attribute are available.
        super().__init__()

        # Fixed part of every record. The level-name and time entries hold
        # their format templates here; format() replaces them with the values
        # of the record being formatted.
        self.component_log_fmt = {
            SERVE_LOG_LEVEL_NAME: SERVE_LOG_RECORD_FORMAT[SERVE_LOG_LEVEL_NAME],
            SERVE_LOG_TIME: SERVE_LOG_RECORD_FORMAT[SERVE_LOG_TIME],
        }
        if component_type == ServeComponentType.DEPLOYMENT:
            self.component_log_fmt[SERVE_LOG_DEPLOYMENT] = component_name
            self.component_log_fmt[SERVE_LOG_REPLICA] = component_id
        else:
            self.component_log_fmt[SERVE_LOG_COMPONENT] = component_name
            self.component_log_fmt[SERVE_LOG_COMPONENT_ID] = component_id

        # Renders the message text. Built once instead of on every record.
        self._message_formatter = logging.Formatter(
            SERVE_LOG_RECORD_FORMAT[SERVE_LOG_MESSAGE]
        )

    def format(self, record: logging.LogRecord) -> str:
        """Format the log record into json format.

        Args:
            record: The log record to be formatted.

        Returns:
            The formatted log record in json format.
        """
        # Shallow copy is sufficient: all values are immutable strings.
        # Overwriting an existing key keeps its position, so key order is
        # level name, time, component identity, optional request fields,
        # message.
        fields = dict(self.component_log_fmt)
        fields[SERVE_LOG_LEVEL_NAME] = record.levelname
        fields[SERVE_LOG_TIME] = self.formatTime(record, self.datefmt)

        # Request-scoped attributes exist only on records created while a
        # request context was set; rendered as strings, like "%s" would.
        for key in (SERVE_LOG_REQUEST_ID, SERVE_LOG_ROUTE, SERVE_LOG_APPLICATION):
            if key in record.__dict__:
                fields[key] = str(record.__dict__[key])

        # logging.Formatter.format() appends any exception traceback / stack
        # info to the text, so it ends up inside the JSON "message" value
        # rather than trailing after the closing brace.
        fields[SERVE_LOG_MESSAGE] = self._message_formatter.format(record)

        # Serialise last so every value is escaped correctly.
        return json.dumps(fields)


class ServeFormatter(logging.Formatter):
Expand All @@ -21,22 +87,39 @@ class ServeFormatter(logging.Formatter):
The formatter will generate the log format on the fly based on the field of record.
"""

def __init__(self, component_name: str, component_id: str):
self.component_log_fmt = COMPONENT_LOG_FMT.format(
COMPONENT_LOG_FMT = f"%({SERVE_LOG_LEVEL_NAME})s %({SERVE_LOG_TIME})s {{{SERVE_LOG_COMPONENT}}} {{{SERVE_LOG_COMPONENT_ID}}} " # noqa:E501

def __init__(
self,
component_name: str,
component_id: str,
):
self.component_log_fmt = ServeFormatter.COMPONENT_LOG_FMT.format(
component_name=component_name, component_id=component_id
)

def format(self, record):
# generate a format string based on the record field.
cur_format = self.component_log_fmt
if "request_id" in record.__dict__:
cur_format += REQUEST_ID_FMT
if "route" in record.__dict__:
cur_format += ROUTE_FMT
cur_format += MESSAGE_FMT
def format(self, record: logging.LogRecord) -> str:
"""Format the log record into the format string.

Args:
record: The log record to be formatted.

Returns:
The formatted log record in string format.
"""
record_format = self.component_log_fmt
record_formats_attrs = []
if SERVE_LOG_REQUEST_ID in record.__dict__:
record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_REQUEST_ID])
if SERVE_LOG_ROUTE in record.__dict__:
record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_ROUTE])
if SERVE_LOG_APPLICATION in record.__dict__:
record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_APPLICATION])
record_formats_attrs.append(SERVE_LOG_RECORD_FORMAT[SERVE_LOG_MESSAGE])
record_format += " ".join(record_formats_attrs)

# create a formatter using the format string
formatter = logging.Formatter(cur_format)
formatter = logging.Formatter(record_format)

# format the log record using the formatter
return formatter.format(record)
Expand Down Expand Up @@ -75,7 +158,7 @@ def configure_component_logger(
*,
component_name: str,
component_id: str,
component_type: Optional[str] = None,
component_type: Optional[ServeComponentType] = None,
log_level: int = logging.INFO,
max_bytes: Optional[int] = None,
backup_count: Optional[int] = None,
Expand All @@ -99,9 +182,11 @@ def record_factory(*args, **kwargs):
request_context = ray.serve.context._serve_request_context.get()
record = factory(*args, **kwargs)
if request_context.route:
record.route = request_context.route
setattr(record, SERVE_LOG_ROUTE, request_context.route)
if request_context.request_id:
record.request_id = request_context.request_id
setattr(record, SERVE_LOG_REQUEST_ID, request_context.request_id)
if request_context.app_name:
setattr(record, SERVE_LOG_APPLICATION, request_context.app_name)
return record

logging.setLogRecordFactory(record_factory)
Expand All @@ -119,17 +204,28 @@ def record_factory(*args, **kwargs):
max_bytes = ray._private.worker._global_node.max_bytes
if backup_count is None:
backup_count = ray._private.worker._global_node.backup_count

# For DEPLOYMENT component type, we want to log the deployment name
# instead of adding the component type to the component name.
component_log_file_name = component_name
if component_type is not None:
component_name = f"{component_type}_{component_name}"
component_log_file_name = f"{component_type}_{component_name}"
if component_type != ServeComponentType.DEPLOYMENT:
component_name = f"{component_type}_{component_name}"
log_file_name = LOG_FILE_FMT.format(
component_name=component_name, component_id=component_id
component_name=component_log_file_name, component_id=component_id
)
file_handler = logging.handlers.RotatingFileHandler(
os.path.join(logs_dir, log_file_name),
maxBytes=max_bytes,
backupCount=backup_count,
)
file_handler.setFormatter(ServeFormatter(component_name, component_id))
if RAY_SERVE_ENABLE_JSON_LOGGING:
file_handler.setFormatter(
ServeJSONFormatter(component_name, component_id, component_type)
)
else:
file_handler.setFormatter(ServeFormatter(component_name, component_id))
logger.addHandler(file_handler)


Expand Down
10 changes: 7 additions & 3 deletions python/ray/serve/_private/replica.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
from ray._private.async_compat import sync_to_async

from ray.serve._private.autoscaling_metrics import start_metrics_pusher
from ray.serve._private.common import HEALTH_CHECK_CONCURRENCY_GROUP, ReplicaTag
from ray.serve._private.common import (
HEALTH_CHECK_CONCURRENCY_GROUP,
ReplicaTag,
ServeComponentType,
)
from ray.serve.config import DeploymentConfig
from ray.serve._private.constants import (
HEALTH_CHECK_METHOD,
Expand Down Expand Up @@ -76,7 +80,7 @@ async def __init__(
app_name: str = None,
):
configure_component_logger(
component_type="deployment",
component_type=ServeComponentType.DEPLOYMENT,
component_name=deployment_name,
component_id=replica_tag,
)
Expand Down Expand Up @@ -521,7 +525,7 @@ async def handle_request(self, request: Query) -> asyncio.Future:
# handle can pass the correct request context to subsequent replicas.
ray.serve.context._serve_request_context.set(
ray.serve.context.RequestContext(
request.metadata.route, request.metadata.request_id
request.metadata.route, request.metadata.request_id, self.app_name
)
)

Expand Down
Loading