Skip to content

Commit

Permalink
broaden third party library exclude, revert changes to libs
Browse files Browse the repository at this point in the history
  • Loading branch information
jmeixensperger committed Apr 4, 2019
1 parent 4cf6e42 commit 374c0c8
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 87 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ each_dict_entry_on_separate_line = false

[isort]
# isort/yapf solutions to below files are not compatible
skip = splunk_eventgen/lib/concurrent/futures,splunk_eventgen/lib/requests_futures
skip = splunk_eventgen/lib/concurrent,splunk_eventgen/lib/requests_futures
14 changes: 10 additions & 4 deletions splunk_eventgen/lib/concurrent/futures/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
# -*- coding: utf-8 -*-

# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Execute computations asynchronously using threads or processes."""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

from concurrent.futures._base import (ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION, CancelledError, Executor, Future,
TimeoutError, as_completed, wait)
from concurrent.futures._base import (FIRST_COMPLETED,
FIRST_EXCEPTION,
ALL_COMPLETED,
CancelledError,
TimeoutError,
Future,
Executor,
wait,
as_completed)
from concurrent.futures.thread import ThreadPoolExecutor

try:
Expand Down
75 changes: 42 additions & 33 deletions splunk_eventgen/lib/concurrent/futures/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
# Licensed to PSF under a Contributor Agreement.

import collections
import itertools
import logging
import threading
import itertools
import time

__author__ = 'Brian Quinlan (brian@sweetapp.com)'
Expand All @@ -23,34 +23,39 @@
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'

_FUTURE_STATES = [PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
_FUTURE_STATES = [
PENDING,
RUNNING,
CANCELLED,
CANCELLED_AND_NOTIFIED,
FINISHED
]

_STATE_TO_DESCRIPTION_MAP = {
PENDING: "pending", RUNNING: "running", CANCELLED: "cancelled", CANCELLED_AND_NOTIFIED: "cancelled", FINISHED:
"finished"}
PENDING: "pending",
RUNNING: "running",
CANCELLED: "cancelled",
CANCELLED_AND_NOTIFIED: "cancelled",
FINISHED: "finished"
}

# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")


class Error(Exception):
"""Base class for all future-related exceptions."""
pass


class CancelledError(Error):
"""The Future was cancelled."""
pass


class TimeoutError(Error):
"""The operation exceeded the given deadline."""
pass


class _Waiter(object):
"""Provides the event that wait() and as_completed() block on."""

def __init__(self):
self.event = threading.Event()
self.finished_futures = []
Expand All @@ -64,7 +69,6 @@ def add_exception(self, future):
def add_cancelled(self, future):
self.finished_futures.append(future)


class _AsCompletedWaiter(_Waiter):
"""Used by as_completed()."""

Expand All @@ -87,7 +91,6 @@ def add_cancelled(self, future):
super(_AsCompletedWaiter, self).add_cancelled(future)
self.event.set()


class _FirstCompletedWaiter(_Waiter):
"""Used by wait(return_when=FIRST_COMPLETED)."""

Expand All @@ -103,7 +106,6 @@ def add_cancelled(self, future):
super(_FirstCompletedWaiter, self).add_cancelled(future)
self.event.set()


class _AllCompletedWaiter(_Waiter):
"""Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""

Expand Down Expand Up @@ -134,7 +136,6 @@ def add_cancelled(self, future):
super(_AllCompletedWaiter, self).add_cancelled(future)
self._decrement_pending_calls()


class _AcquireFutures(object):
"""A context manager that does an ordered acquire of Future conditions."""

Expand All @@ -149,14 +150,14 @@ def __exit__(self, *args):
for future in self.futures:
future._condition.release()


def _create_and_install_waiters(fs, return_when):
if return_when == _AS_COMPLETED:
waiter = _AsCompletedWaiter()
elif return_when == FIRST_COMPLETED:
waiter = _FirstCompletedWaiter()
else:
pending_count = sum(f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
pending_count = sum(
f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)

if return_when == FIRST_EXCEPTION:
waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
Expand All @@ -170,7 +171,6 @@ def _create_and_install_waiters(fs, return_when):

return waiter


def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.
Expand All @@ -194,7 +194,9 @@ def as_completed(fs, timeout=None):

fs = set(fs)
with _AcquireFutures(fs):
finished = set(f for f in fs if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)

Expand All @@ -208,7 +210,9 @@ def as_completed(fs, timeout=None):
else:
wait_timeout = end_time - time.time()
if wait_timeout < 0:
raise TimeoutError('%d (of %d) futures unfinished' % (len(pending), len(fs)))
raise TimeoutError(
'%d (of %d) futures unfinished' % (
len(pending), len(fs)))

waiter.event.wait(wait_timeout)

Expand All @@ -226,10 +230,8 @@ def as_completed(fs, timeout=None):
with f._condition:
f._waiters.remove(waiter)


DoneAndNotDoneFutures = collections.namedtuple('DoneAndNotDoneFutures', 'done not_done')


DoneAndNotDoneFutures = collections.namedtuple(
'DoneAndNotDoneFutures', 'done not_done')
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the futures in the given sequence to complete.
Expand All @@ -255,13 +257,15 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
futures.
"""
with _AcquireFutures(fs):
done = set(f for f in fs if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
done = set(f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
not_done = set(fs) - done

if (return_when == FIRST_COMPLETED) and done:
return DoneAndNotDoneFutures(done, not_done)
elif (return_when == FIRST_EXCEPTION) and done:
if any(f for f in done if not f.cancelled() and f.exception() is not None):
if any(f for f in done
if not f.cancelled() and f.exception() is not None):
return DoneAndNotDoneFutures(done, not_done)

if len(done) == len(fs):
Expand All @@ -277,7 +281,6 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
done.update(waiter.finished_futures)
return DoneAndNotDoneFutures(done, set(fs) - done)


class Future(object):
"""Represents the result of an asynchronous computation."""

Expand All @@ -302,12 +305,18 @@ def __repr__(self):
with self._condition:
if self._state == FINISHED:
if self._exception:
return '<Future at %s state=%s raised %s>' % (hex(id(self)), _STATE_TO_DESCRIPTION_MAP[self._state],
self._exception.__class__.__name__)
return '<Future at %s state=%s raised %s>' % (
hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._exception.__class__.__name__)
else:
return '<Future at %s state=%s returned %s>' % (hex(
id(self)), _STATE_TO_DESCRIPTION_MAP[self._state], self._result.__class__.__name__)
return '<Future at %s state=%s>' % (hex(id(self)), _STATE_TO_DESCRIPTION_MAP[self._state])
return '<Future at %s state=%s returned %s>' % (
hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._result.__class__.__name__)
return '<Future at %s state=%s>' % (
hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state])

def cancel(self):
"""Cancel the future if possible.
Expand Down Expand Up @@ -485,7 +494,9 @@ def set_running_or_notify_cancel(self):
self._state = RUNNING
return True
else:
LOGGER.critical('Future %s in unexpected state: %s', id(self), self._state)
LOGGER.critical('Future %s in unexpected state: %s',
id(self),
self._state)
raise RuntimeError('Future in unexpected state')

def set_result(self, result):
Expand Down Expand Up @@ -523,7 +534,6 @@ def set_exception(self, exception):
"""
self.set_exception_info(exception, None)


class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""

Expand Down Expand Up @@ -574,7 +584,6 @@ def result_iterator():
finally:
for future in fs:
future.cancel()

return result_iterator()

def shutdown(self, wait=True):
Expand Down
Loading

0 comments on commit 374c0c8

Please sign in to comment.