Skip to content

Commit

Permalink
Apply code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
LinZhihao-723 committed Feb 7, 2025
1 parent e854fa1 commit b310993
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 337 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ readme = "README.md"
requires-python = ">=3.7"
dependencies = [
"backports.zoneinfo >= 0.2.1; python_version < '3.9'",
"clp-ffi-py == 0.1.0b1",
"clp-ffi-py == 0.0.14",
"typing-extensions >= 3.7.4",
"tzlocal == 5.1; python_version < '3.8'",
"tzlocal >= 5.2; python_version >= '3.8'",
Expand Down
54 changes: 15 additions & 39 deletions src/clp_logging/auto_generated_kv_pairs_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
from typing import Any, Dict, Optional
from typing import Any, Dict

ZONED_TIMESTAMP_KEY: str = "zoned_timestamp"
ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY: str = "utc_epoch_ms"
ZONED_TIMESTAMP_TZ_KEY: str = "timezone"
from clp_logging.utils import Timestamp

TIMESTAMP_KEY: str = "timestamp"
TIMESTAMP_UNIX_TS_MS: str = "unix_ts_ms"
TIMESTAMP_UTC_OFFSET_SEC: str = "utc_offset_sec"

LEVEL_KEY: str = "level"
LEVEL_NO_KEY: str = "no"
Expand All @@ -13,23 +15,19 @@
SOURCE_CONTEXT_PATH_KEY: str = "path"
SOURCE_CONTEXT_LINE_KEY: str = "line"

LOGLIB_GENERATED_MSG_KEY: str = "loglib_generated_msg"


class AutoGeneratedKeyValuePairsBuffer:
"""
A reusable buffer for auto-generated key-value pairs.
This buffer maintains a predefined dictionary for common metadata fields,
to enable efficient reuse without creating new dictionaries for each log
event.
This buffer maintains a predefined dictionary for common metadata fields, to
enable efficient reuse without creating new dictionaries for each log event.
"""

def __init__(self) -> None:
self._buf: Dict[str, Any] = {
ZONED_TIMESTAMP_KEY: {
ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY: None,
ZONED_TIMESTAMP_TZ_KEY: None,
TIMESTAMP_KEY: {
TIMESTAMP_UNIX_TS_MS: None,
TIMESTAMP_UTC_OFFSET_SEC: None,
},
LEVEL_KEY: {
LEVEL_NO_KEY: None,
Expand All @@ -41,22 +39,19 @@ def __init__(self) -> None:
},
}

def generate(
self, timestamp: int, timezone: Optional[str], record: logging.LogRecord
) -> Dict[str, Any]:
def generate(self, ts: Timestamp, record: logging.LogRecord) -> Dict[str, Any]:
"""
Generates the auto-generated key-value pairs by populating the
underlying buffer with the given log event metadata.
:param timestamp: The log event's millisecond Unix timestamp.
:param timezone: The timezone of the log event, or `None` if UTC.
:param ts: The timestamp assigned to the log event.
:param record: The LogRecord containing metadata for the log event.
:return: The populated underlying buffer as the auto-generated key-value
pairs.
"""

self._buf[ZONED_TIMESTAMP_KEY][ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY] = timestamp
self._buf[ZONED_TIMESTAMP_KEY][ZONED_TIMESTAMP_TZ_KEY] = timezone
self._buf[TIMESTAMP_KEY][TIMESTAMP_UNIX_TS_MS] = ts.get_unix_ts()
self._buf[TIMESTAMP_KEY][TIMESTAMP_UTC_OFFSET_SEC] = ts.get_utc_offset()

# NOTE: We don't add all the metadata contained in `record`. Instead, we only add the
# following fields:
Expand All @@ -70,22 +65,3 @@ def generate(
self._buf[SOURCE_CONTEXT_KEY][SOURCE_CONTEXT_LINE_KEY] = record.lineno

return self._buf


def create_loglib_generated_log_event_as_auto_generated_kv_pairs(
timestamp: int, timezone: Optional[str], msg: str
) -> Dict[str, Any]:
"""
:param timestamp: The log event's millisecond Unix timestamp.
:param timezone: The timezone of the log event, or `None` if UTC.
:param msg: The log message generated by the logging library.
:return: The auto-generated key-value pairs that represent a log event generated by the logging
library itself.
"""
return {
ZONED_TIMESTAMP_KEY: {
ZONED_TIMESTAMP_UTC_EPOCH_MS_KEY: timestamp,
ZONED_TIMESTAMP_TZ_KEY: timezone,
},
LOGLIB_GENERATED_MSG_KEY: msg,
}
105 changes: 52 additions & 53 deletions src/clp_logging/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import time
import warnings
from abc import ABCMeta, abstractmethod
from contextlib import nullcontext
from math import floor
from pathlib import Path
from queue import Empty, Queue
Expand All @@ -18,10 +19,7 @@
from clp_ffi_py.utils import serialize_dict_to_msgpack
from zstandard import FLUSH_FRAME, ZstdCompressionWriter, ZstdCompressor

from clp_logging.auto_generated_kv_pairs_utils import (
AutoGeneratedKeyValuePairsBuffer,
create_loglib_generated_log_event_as_auto_generated_kv_pairs,
)
from clp_logging.auto_generated_kv_pairs_utils import AutoGeneratedKeyValuePairsBuffer
from clp_logging.protocol import (
BYTE_ORDER,
EOF_CHAR,
Expand All @@ -31,6 +29,7 @@
UINT_MAX,
ULONG_MAX,
)
from clp_logging.utils import Timestamp

# TODO: lock writes to zstream if GIL ever goes away
# Note: no need to quote "Queue[Tuple[int, bytes]]" in python 3.9
Expand Down Expand Up @@ -228,7 +227,7 @@ def __init__(
self.hard_timeout_thread: Optional[Timer] = None
self.soft_timeout_thread: Optional[Timer] = None

def set_ostream(self, ostream: Union[ZstdCompressionWriter, IO[bytes], Serializer]) -> None:
def set_ostream(self, ostream: Union[ZstdCompressionWriter, IO[bytes]]) -> None:
self.ostream = ostream

def timeout(self) -> None:
Expand Down Expand Up @@ -829,26 +828,21 @@ class ClpKeyValuePairStreamHandler(logging.Handler):
:param stream: A writable byte output stream to which the handler will write the serialized IR
byte sequences.
:param enable_compression: Whether to compress the serialized IR byte sequences using zstd.
:param loglevel_timeout: Customized timeout configuration.
"""

def __init__(
self,
stream: IO[bytes],
enable_compression: bool = True,
timezone: Optional[str] = None,
loglevel_timeout: Optional[CLPLogLevelTimeout] = None,
) -> None:
super().__init__()

self._enable_compression: bool = enable_compression
self._tz: Optional[str] = timezone
self._loglevel_timeout: Optional[CLPLogLevelTimeout] = loglevel_timeout
self._serializer: Optional[Serializer] = None
self._formatter: Optional[logging.Formatter] = None
self._ostream: IO[bytes] = stream

self._auto_generated_kv_pairs_buf: AutoGeneratedKeyValuePairsBuffer = (
self._auto_gen_kv_pairs_buf: AutoGeneratedKeyValuePairsBuffer = (
AutoGeneratedKeyValuePairsBuffer()
)

Expand Down Expand Up @@ -878,90 +872,95 @@ def emit(self, record: logging.LogRecord) -> None:
except Exception:
self.handleError(record)

# Added to `logging.StreamHandler` in Python 3.7
# override
def setStream(self, stream: IO[bytes]) -> Optional[IO[bytes]]:
"""
Sets the instance’s stream to the specified value, if it is different. The old stream is
flushed before the new stream is set.
NOTE: The old stream will be closed by calling this method.
:param stream: A writable byte output stream to which the handler will write the serialized
IR byte sequences.
:return: The old stream if the stream was changed, or `None` if it wasn't.
"""

# NOTE: This function is implemented by mirroring CPython's implementation.

if stream is self._ostream:
return None

old_stream: IO[bytes] = self._ostream
self._ostream = stream
# TODO: The following call will close the old stream. However, `logging.StreamHandler`'s
# implementation will only flush the stream but leave it opened. To support this behaviour,
# we need `clp_ffi_py.ir.Serializer` to allow closing the serializer without closing the
# underlying output stream.
self._init_new_serializer(stream)
with self.lock if self.lock else nullcontext():
# TODO: The following call will close the old stream. However, `logging.StreamHandler`'s
# implementation will only flush the stream but leave it opened. To support this
# behaviour, we need `clp_ffi_py.ir.Serializer` to allow closing the serializer without
# closing the underlying output stream.
self._init_new_serializer(stream)
self._ostream = stream
return old_stream

# override
def close(self) -> None:
if self._serializer is None:
if self._is_closed():
return
if self._loglevel_timeout:
self._loglevel_timeout.timeout()
# NOTE: Closing the serializer will ensure that any buffered results are flushed and the
# underlying output stream is properly closed.
self._serializer.close()
self._serializer = None
self._close_serializer()
super().close()

def _is_closed(self) -> bool:
return self._serializer is None

def _close_serializer(self) -> None:
"""
Closes the current serializer if it has been set.
NOTE: The underlying output stream will also be closed.
"""
if self._is_closed():
return
assert self._serializer is not None
self._serializer.close()
self._serializer = None

def _init_new_serializer(self, stream: IO[bytes]) -> None:
"""
Initializes a new serializer that writes to the given stream.
:param stream: The stream that the underlying serializer will write to.
"""
cctx: ZstdCompressor = ZstdCompressor()
self._close_serializer()
self._serializer = Serializer(
cctx.stream_writer(stream) if self._enable_compression else stream
ZstdCompressor().stream_writer(stream) if self._enable_compression else stream
)

def _write(self, record: logging.LogRecord) -> None:
"""
Writes the log event into the underlying serializer.
:param record: The log event to serialize.
:raise IOError: If the handler has been already closed.
:raise RuntimeError: If the handler has been already closed.
:raise TypeError: If `record.msg` is not a Python dictionary.
"""
if self._is_closed():
raise IOError("Stream already closed.")
raise RuntimeError("Stream already closed.")

if not isinstance(record.msg, dict):
raise TypeError("The log msg must be a valid Python dictionary.")

curr_ts: int = floor(time.time() * 1000)

if self._loglevel_timeout is not None:
self._loglevel_timeout.update(record.levelno, curr_ts, self._log_level_timeout_callback)

self._serialize_kv_pair_log_event(
self._auto_generated_kv_pairs_buf.generate(curr_ts, self._tz, record), record.msg
self._auto_gen_kv_pairs_buf.generate(Timestamp.now(), record), record.msg
)

def _serialize_kv_pair_log_event(
self, auto_generated_kv_pairs: Dict[str, Any], user_generated_kv_pairs: Dict[str, Any]
self, auto_gen_kv_pairs: Dict[str, Any], user_gen_kv_pairs: Dict[str, Any]
) -> None:
"""
:param auto_generated_kv_pairs: A dict of auto generated kv pairs.
:param user_generated_kv_pairs: A dict of user generated kv pairs.
:param auto_gen_kv_pairs: A dict of auto generated kv pairs.
:param user_gen_kv_pairs: A dict of user generated kv pairs.
"""
log_event: Dict[str, Any] = {
AUTO_GENERATED_KV_PAIRS_KEY: auto_generated_kv_pairs,
USER_GENERATED_KV_PAIRS_KEY: user_generated_kv_pairs,
}
if self._is_closed():
raise RuntimeError("Stream already closed.")
assert self._serializer is not None
self._serializer.serialize_log_event_from_msgpack_map(serialize_dict_to_msgpack(log_event))

def _log_level_timeout_callback(self, msg: str) -> None:
"""
Callback for `CLPLogLevelTimeout` to log internal errors.
:param msg: The message sent from `CLPLogLevelTimeout`.`
"""
curr_ts: int = floor(time.time() * 1000)
self._serialize_kv_pair_log_event(
create_loglib_generated_log_event_as_auto_generated_kv_pairs(curr_ts, self._tz, msg), {}
self._serializer.serialize_log_event_from_msgpack_map(
serialize_dict_to_msgpack(auto_gen_kv_pairs),
serialize_dict_to_msgpack(user_gen_kv_pairs),
)
45 changes: 45 additions & 0 deletions src/clp_logging/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from __future__ import annotations

import time
from math import floor


class Timestamp:
"""
This class represents a Unix timestamp along with the timezone offset from
UTC.
"""

@staticmethod
def now() -> Timestamp:
"""
:return: A `Timestamp` instance representing the current time.
"""
ts: float = time.time()
return Timestamp(
unix_ts=floor(ts * 1000),
utc_offset=time.localtime(ts).tm_isdst,
)

def __init__(self, unix_ts: int, utc_offset: int):
"""
Initializes a `Timestamp` instance with the current time.
:param unix_ts: Unix timestamp in milliseconds.
:param utc_offset: The number of seconds the timezone is ahead of
(positive) or behind (negative) UTC.
"""
self._utc_offset: int = utc_offset
self._unix_ts: int = unix_ts

def get_unix_ts(self) -> int:
"""
:return: The Unix timestamp (milliseconds since the Unix epoch).
"""
return self._unix_ts

def get_utc_offset(self) -> int:
"""
:return: The number of seconds the timezone is ahead of (positive) or behind (negative) UTC.
"""
return self._utc_offset
Loading

0 comments on commit b310993

Please sign in to comment.