Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overhaul logging background thread transport #3407

Merged
merged 6 commits into from
May 12, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion logging/google/cloud/logging/handlers/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

DEFAULT_LOGGER_NAME = 'python'

EXCLUDED_LOGGER_DEFAULTS = ('google.cloud', 'oauth2client')
EXCLUDED_LOGGER_DEFAULTS = (

This comment was marked as spam.

This comment was marked as spam.

'google.cloud',
'google.auth',
'google_auth_httplib2',
)


class CloudLoggingHandler(logging.StreamHandler):
Expand Down
271 changes: 179 additions & 92 deletions logging/google/cloud/logging/handlers/transports/background_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,137 +19,224 @@

import atexit

This comment was marked as spam.

This comment was marked as spam.

import copy
import logging
import threading

from six.moves import queue

from google.cloud.logging.handlers.transports.base import Transport

_WORKER_THREAD_NAME = 'google.cloud.logging.handlers.transport.Worker'
_DEFAULT_GRACE_PERIOD = 5.0 # Seconds
_DEFAULT_MAX_BATCH_SIZE = 10
_WORKER_THREAD_NAME = 'google.cloud.logging.Worker'
_WORKER_TERMINATOR = object()

This comment was marked as spam.

_LOGGER = logging.getLogger(__name__)


class _Worker(object):
"""A threaded worker that writes batches of log entries
def _get_many(q, max_items=None):

This comment was marked as spam.

This comment was marked as spam.

"""Get multiple items from a Queue.

Gets at least one (blocking) and at most :param:`max_items` items

This comment was marked as spam.

This comment was marked as spam.

(non-blocking) from a given Queue. Does not mark the items as done.

Writes entries to the logger API.
:type q: ~queue.Queue

This comment was marked as spam.

This comment was marked as spam.

:param q: The Queue to get items from.

This class reuses a single :class:`Batch` method to write successive
entries.
:type max_items: int
:param max_items: The maximum number of items to get. If ``None``, then all
available items in the queue are returned.

Currently, the only public methods are constructing it (which also starts
it) and enqueuing :class:`Logger` (record, message) pairs.
:rtype: Sequence
:returns: A sequence of items retrieved from the queue.
"""
# Always return at least one item.
items = [q.get()]

This comment was marked as spam.

This comment was marked as spam.

while max_items is None or len(items) < max_items:

This comment was marked as spam.

This comment was marked as spam.

try:
items.append(q.get_nowait())
except queue.Empty:
break
return items

def __init__(self, logger):
self.started = False
self.stopping = False
self.stopped = False

# _entries_condition is used to signal from the main thread whether
# there are any waiting queued logger entries to be written
self._entries_condition = threading.Condition()
class _Worker(object):
"""A background thread that writes batches of log entries."""

# _stop_condition is used to signal from the worker thread to the
# main thread that it's finished its last entries
self._stop_condition = threading.Condition()
def __init__(self, cloud_logger, grace_period=_DEFAULT_GRACE_PERIOD,
max_batch_size=_DEFAULT_MAX_BATCH_SIZE):
"""
The background thread is started automatically.

This comment was marked as spam.

This comment was marked as spam.


# This object continually reuses the same :class:`Batch` object to
# write multiple entries at the same time.
self.logger = logger
self.batch = self.logger.batch()
:type cloud_logger: ~google.cloud.logging.logger.Logger
:param cloud_logger: The logger to send entries to.

self._thread = None
:type grace_period: float
:param grace_period: The amount of time to wait for pending logs to
be submitted when the process is shutting down.

# Number in seconds of how long to wait for worker to send remaining
self._stop_timeout = 5
:type max_batch_size: int
:param max_batch_size: The maximum number of items to send at a time
in the background thread.
"""
self._cloud_logger = cloud_logger
self._grace_period = grace_period
self._max_batch_size = max_batch_size
self._queue = queue.Queue(-1)

This comment was marked as spam.

This comment was marked as spam.

self._operational_lock = threading.Lock()
self._thread = None
self.start()

This comment was marked as spam.

This comment was marked as spam.


self._start()
@property
def is_alive(self):
"""Returns True is the background thread is running."""
return self._thread and self._thread.is_alive()

This comment was marked as spam.

This comment was marked as spam.


def _run(self):
def _thread_main(self):

This comment was marked as spam.

This comment was marked as spam.

"""The entry point for the worker thread.

Loops until ``stopping`` is set to :data:`True`, and commits batch
entries written during :meth:`enqueue`.
Pulls pending log entries off the queue and writes them in batches to
the Cloud Logger.
"""
try:
self._entries_condition.acquire()
self.started = True
while not self.stopping:
if len(self.batch.entries) == 0:
# branch coverage of this code extremely flaky
self._entries_condition.wait() # pragma: NO COVER

if len(self.batch.entries) > 0:
self.batch.commit()
finally:
self._entries_condition.release()

# main thread may be waiting for worker thread to finish writing its
# final entries. here we signal that it's done.
self._stop_condition.acquire()
self._stop_condition.notify()
self._stop_condition.release()

def _start(self):
"""Called by this class's constructor

This method is responsible for starting the thread and registering
the exit handlers.
_LOGGER.debug('Background thread started.')

quit = False

This comment was marked as spam.

This comment was marked as spam.

while True:
batch = self._cloud_logger.batch()
items = _get_many(self._queue, max_items=self._max_batch_size)

for item in items:
if item is _WORKER_TERMINATOR:
quit = True
# Continue, don't break, try to process all items we got
# back before quitting.
continue

This comment was marked as spam.

This comment was marked as spam.


batch.log_struct(**item)

total_logs = len(batch.entries)

try:

This comment was marked as spam.

This comment was marked as spam.

if total_logs:

This comment was marked as spam.

This comment was marked as spam.

batch.commit()
_LOGGER.debug('Submitted %d logs', total_logs)
except Exception:

This comment was marked as spam.

This comment was marked as spam.

_LOGGER.error(
'Failed to submit %d logs.', total_logs, exc_info=True)
finally:
# Mark all collected tasks as done.
for n in range(len(items)):

This comment was marked as spam.

This comment was marked as spam.

self._queue.task_done()

if quit:
break

_LOGGER.debug('Background thread exited gracefully.')

This comment was marked as spam.

This comment was marked as spam.


def start(self):
"""Starts the background thread.

Additionally, this registers a handler for process exit to attempt
to send any pending log entries before shutdown.
"""
try:
self._entries_condition.acquire()
with self._operational_lock:
if self.is_alive:
return

self._thread = threading.Thread(
target=self._run, name=_WORKER_THREAD_NAME)
target=self._thread_main,
name=_WORKER_THREAD_NAME)
self._thread.setDaemon(True)

This comment was marked as spam.

This comment was marked as spam.

self._thread.start()
finally:
self._entries_condition.release()
atexit.register(self._stop)
atexit.register(self._main_thread_terminated)

def stop(self, grace_period=None):
"""Signals the background thread to stop.

def _stop(self):
"""Signals the worker thread to shut down
This does not terminate the background thread. It simply queues the
stop signal. If the main process exits before the background thread
processes the stop signal, it will be terminated without finishing
work. The :param:`grace_period` parameter will give the background
thread some time to finish processing before this function returns.

Also waits for ``stop_timeout`` seconds for the worker to finish.
:type grace_period: float
:param grace_period: If specified, this method will block up to this
many seconds to allow the background thread to finish work before
returning.

This method is called by the ``atexit`` handler registered by
:meth:`start`.
:rtype: bool
:returns: True if the thread terminated. False if the thread is still
running.
"""
if not self.started or self.stopping:
return
if not self.is_alive:
return True

with self._operational_lock:
self._queue.put_nowait(_WORKER_TERMINATOR)

if grace_period is not None:
print('Waiting up to %d seconds.' % grace_period)

This comment was marked as spam.

This comment was marked as spam.


self._thread.join(timeout=grace_period)

# lock the stop condition first so that the worker
# thread can't notify it's finished before we wait
self._stop_condition.acquire()
success = not self.is_alive

This comment was marked as spam.

This comment was marked as spam.

self._thread = None

# now notify the worker thread to shutdown
self._entries_condition.acquire()
self.stopping = True
self._entries_condition.notify()
self._entries_condition.release()
return success

# now wait for it to signal it's finished
self._stop_condition.wait(self._stop_timeout)
self._stop_condition.release()
self.stopped = True
def _main_thread_terminated(self):
"""Callback that attempts to send pending logs before termination."""
if not self.is_alive:
return

if not self._queue.empty():
print(
'Program shutting down, attempting to send %d queued log '
'entries to Stackdriver Logging...' % self._queue.qsize())

This comment was marked as spam.

This comment was marked as spam.


if self.stop(self._grace_period):
print('Sent all pending logs.')
else:
print('Failed to send %d pending logs.' % self._queue.qsize())

This comment was marked as spam.

This comment was marked as spam.


def enqueue(self, record, message):
"""Queues up a log entry to be written by the background thread."""
try:
self._entries_condition.acquire()
if self.stopping:
return
info = {'message': message, 'python_logger': record.name}
self.batch.log_struct(info, severity=record.levelname)
self._entries_condition.notify()
finally:
self._entries_condition.release()
"""Queues a log entry to be written by the background thread.

:type record: :class:`logging.LogRecord`
:param record: Python log record that the handler was called with.

:type message: str
:param message: The message from the ``LogRecord`` after being
formatted by the associated log formatters.
"""
self._queue.put_nowait({

This comment was marked as spam.

This comment was marked as spam.

'info': {
'message': message,
'python_logger': record.name

This comment was marked as spam.

This comment was marked as spam.

},
'severity': record.levelname

This comment was marked as spam.

This comment was marked as spam.

})


class BackgroundThreadTransport(Transport):
"""Aysnchronous transport that uses a background thread.
"""Asynchronous transport that uses a background thread."""

This comment was marked as spam.


Writes logging entries as a batch process.
"""
def __init__(self, client, name, grace_period=_DEFAULT_GRACE_PERIOD,
batch_size=_DEFAULT_MAX_BATCH_SIZE):
"""

This comment was marked as spam.

This comment was marked as spam.

:type client: ~google.cloud.logging.client.Client

This comment was marked as spam.

This comment was marked as spam.

:param client: The Logging client.

:type name: str
:param name: the name of the logger.

def __init__(self, client, name):
:type grace_period: float
:param grace_period: The amount of time to wait for pending logs to
be submitted when the process is shutting down.

:type batch_size: int
:param batch_size: The maximum number of items to send at a time in the
background thread.

This comment was marked as spam.

This comment was marked as spam.

"""
http = copy.deepcopy(client._http)
self.client = client.__class__(
client.project, client._credentials, http)
Expand Down
4 changes: 2 additions & 2 deletions logging/nox.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def unit_tests(session, python_version):
'py.test', '--quiet',
'--cov=google.cloud.logging', '--cov=tests.unit', '--cov-append',
'--cov-config=.coveragerc', '--cov-report=', '--cov-fail-under=97',
'tests/unit',
'tests/unit', *session.posargs
)


Expand All @@ -63,7 +63,7 @@ def system_tests(session, python_version):
session.install('.')

# Run py.test against the system tests.
session.run('py.test', '-vvv', 'tests/system.py')
session.run('py.test', '-vvv', 'tests/system.py', *session.posargs)


@nox.session
Expand Down
Loading