Skip to content

Commit

Permalink
Moving lock factory used in publisher client to the Batch implementat…
Browse files Browse the repository at this point in the history
…ion. (#4628)
  • Loading branch information
dhermes authored Dec 19, 2017
1 parent 9a18400 commit 56864fc
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 5 deletions.
10 changes: 10 additions & 0 deletions pubsub/google/cloud/pubsub_v1/publisher/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ def __len__(self):
"""Return the number of messages currently in the batch."""
return len(self.messages)

@staticmethod
@abc.abstractmethod
def make_lock():
"""Return a lock in the chosen concurrency model.
Returns:
ContextManager: A newly created lock.
"""
raise NotImplementedError

@property
@abc.abstractmethod
def messages(self):
Expand Down
9 changes: 9 additions & 0 deletions pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ def __init__(self, client, topic, settings, autocommit=True):
)
self._thread.start()

@staticmethod
def make_lock():
"""Return a threading lock.
Returns:
_thread.Lock: A newly created lock.
"""
return threading.Lock()

@property
def client(self):
"""~.pubsub_v1.client.PublisherClient: A publisher client."""
Expand Down
12 changes: 8 additions & 4 deletions pubsub/google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import copy
import os
import pkg_resources
import threading

import grpc
import six
Expand All @@ -44,16 +43,21 @@ class Client(object):
Args:
batch_settings (~google.cloud.pubsub_v1.types.BatchSettings): The
settings for batch publishing.
batch_class (class): A class that describes how to handle
batch_class (Optional[type]): A class that describes how to handle
batches. You may subclass the
:class:`.pubsub_v1.publisher.batch.base.BaseBatch` class in
order to define your own batcher. This is primarily provided to
allow use of different concurrency models; the default
is based on :class:`threading.Thread`.
is based on :class:`threading.Thread`. This class should also have
a class method (or static method) that takes no arguments and
produces a lock that can be used as a context manager.
kwargs (dict): Any additional arguments provided are sent as keyword
arguments to the underlying
:class:`~.gapic.pubsub.v1.publisher_client.PublisherClient`.
Generally, you should not need to set additional keyword arguments.
Before being passed along to the GAPIC constructor, a channel may
be added if ``credentials`` are passed explicitly or if the
Pub / Sub emulator is detected as running.
"""
def __init__(self, batch_settings=(), batch_class=thread.Batch, **kwargs):
# Sanity check: Is our goal to use the emulator?
Expand Down Expand Up @@ -86,7 +90,7 @@ def __init__(self, batch_settings=(), batch_class=thread.Batch, **kwargs):
# The batches on the publisher client are responsible for holding
# messages. One batch exists for each topic.
self._batch_class = batch_class
self._batch_lock = threading.Lock()
self._batch_lock = batch_class.make_lock()
self._batches = {}

@property
Expand Down
7 changes: 7 additions & 0 deletions pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ def test_init_infinite_latency():
assert batch._thread is None


@mock.patch.object(threading, 'Lock')
def test_make_lock(Lock):
lock = Batch.make_lock()
assert lock is Lock.return_value
Lock.assert_called_once_with()


def test_client():
client = create_client()
settings = types.BatchSettings()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import os

from google.auth import credentials
import mock

import mock
import pytest

from google.cloud.pubsub_v1.gapic import publisher_client
Expand Down

0 comments on commit 56864fc

Please sign in to comment.