Commit

Merge develop to master for release 6.4.0 (#231)
* Fix jinja template bug under SA-Eventgen app

* Feature timeMultiple (#141)

* Adjusting interval with timeMultiple

* Update issue templates

add the bug report and feature request issue templates

* changing the stanzas to produce data

* Windbag generator/count + end=0 edge cases (#145)

* Generator handling for count = -1, end = 0 working properly

* Update docs (#146)

* Updated docs, added release notes to changelog

* Bump version

* add python code style lint and format

* [Docs] update contribute docs (#148)

* [Docs] update contribute docs

* [Docs] update the contribute docs

* Fix make docs bug and summary anchor link error

* init unittest

* [Build] add ut for timeparser functions

* Add more UT for config module

* Add more unit tests for config module

* Pep8 (#151)

* Format to code standards, addressed linter errors/warnings

* Update docs

* Post-PEP8 Fixes (#157)

* skip sort, gitignore 3rd party libs

* fixed yapf ignore, ran format across repo

* Issue 160 (#163)

* Fixed timer and token

* Added a conditional for end=-1

* Update eventgentimer.py

* Fixed timer and token (#162)

* add extendIndexes feature (#154)

* add extendIndexes feature

* set extendIndexes as a list value

* correct log level

* update doc, change num to weight

* Test fix (#168)

* Add sample functional test for replay mode

* Add token replacement functional tests

* skip failed case

* Added a timeout

* created a results dir

* Update version.json

* Fix previous pep8 format regression bug (#171)

* Fix merge conflict bug

* Delete tool orca related part (#178)

* add test for jinja template (#177)

* clean up the logic around the sampleDir and jinja_template_dir settings
Add functional test for jinja template generator
Fixes #174
Fixes #167

* revert the change about token, sample mode test cases

* merge the change from develop branch

* merge test conf from develop branch

* use urllib3 1.24.2

* format with pep8

* fix bug in template dir resolution

* update test case to address multiline event

* fix an issue where, when setting up eventgen with 200+ indexers, any exception would terminate setup and rewrite the default config files

* Fix bug #179 urlparams is not set (#181)

* fix issue-183

* add modinput ft (#185)

* Issue 159 (#186)

* Fixed timer and token

* Added a conditional for end=-1

* Added timeMultiple logic

* Time Multiple Fix

* Add default pull request reviewers (#190)

* Default to -1 (#189)

* Changed verbose -> verbosity (#191)

* Update README.md (#195)

Like other Splunk products - Splunk Enterprise Security, Splunk Business Workflow ... - Splunk Event Generator does not need the definite article "The" before the product name.

* update doc for friendlier reading and add backward compatibility section (#193)

* Update index.md (#194)

Update index.md

* change the sampleDir setting in test cases (#196)

* return status=2 when all tasks finished. (#182)

* return status=2 when all tasks finished.

* Change release version to 6.3.6 (#200)

* Change release version to 6.3.6

* Fix flaky tests (#204)

* Fix flaky tests

* Rename modinput to mod_input to avoid functional test failures

* when hitting an exception, make sure eventgen logs it to files

* Fix circleci status badges (#208)

* Clean up and interval error (#211)

* Fix generatorWorkers not working issue (#207)

* Fix navigation error when installed with Splunk Stream (#214)

* add metric_httpevent plugin

* update log content in metric_httpevent

* add doc for metric_httpevent

* Add 3rd party libs in app (#210)

* Add 3rd party libs in app

* Bugfix/197 multiprocess not working (#218)

* Fix issue 197 multiprocess not working

* fix issue 219 (#220)

* define httpevent_core and fix the eventgen-httpeventout log handler

* restore samples file

* restore unit test file

* Add license credits (#222)

* Feature/multi indexes (#224)

* Fix jinja template bug under SA-Eventgen app

* Update docs (#146)

* Updated docs, added release notes to changelog

* Bump version

* add python code style lint and format

* Add more unit tests for config module

* Pep8 (#151)

* Format to code standards, addressed linter errors/warnings

* add extendIndexes feature

* set extendIndexes as a list value

* update doc, change num to weight

* calculate generation rate and use sequential index replacement

* update dockerfile

* randomize index replacement in each batch

* fix no len dict out of range

* clean duplicate code

* Revert "Metrics output plugin" (#226)

* [issue 217]disable logging queue in multiprocess mode (#223)

* disable logging queue in multiprocess mode

* Fixed fileName (#229)

* Issue 201 (#221)

* Removing unavailable servers

* Removed nonresponding httpserver

* Added a counter

* fixed a bug

* Added docs and spec

* Addressing unused var

* fixing url reference

* fixed currentreadsize

* Httpevent str formatting

* fix #166 (#192)
li-wu authored Jun 5, 2019
1 parent 6868a8a commit 13a1747
Showing 25 changed files with 349 additions and 190 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -22,6 +22,7 @@ eventgen_wsgi.conf
.cache
*.xml
!tests/large/splunk/input.xml
!splunk_eventgen/splunk_app/default/data/ui/nav/default.xml
dist
_book
*.result
56 changes: 55 additions & 1 deletion LICENSE
@@ -187,7 +187,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.

Copyright [yyyy] [name of copyright owner]
Copyright 2019 Splunk, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -200,3 +200,57 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

=======================================================================
Splunk Event Generator

The Splunk Event Generator project contains subcomponents with separate
copyright notices and license terms. Your use of the source code for these
subcomponents is subject to the terms and conditions of the following
licenses.

========================================================================
Apache License 2.0
========================================================================
The following components are provided under the Apache License 2.0. See project link for details.

(Apache License 2.0) boto3 (https://github.com/boto/boto3/blob/master/LICENSE)
(Apache License 2.0) requests (https://github.com/kennethreitz/requests/blob/master/LICENSE)
(Apache License 2.0) pyOpenSSL (https://github.com/pyca/pyopenssl/blob/master/LICENSE)
(Apache License 2.0) docker (https://github.com/docker/docker-py/blob/master/LICENSE)
(Apache License 2.0) requests-futures (https://github.com/ross/requests-futures/blob/master/LICENSE)
(Apache License 2.0) nameko (https://github.com/nameko/nameko/blob/master/LICENSE.txt)

========================================================================
MIT licenses
========================================================================
The following components are provided under the MIT License. See project link for details.

(MIT License) pyyaml (https://github.com/yaml/pyyaml/blob/master/LICENSE)
(MIT License) httplib2 (https://github.com/httplib2/httplib2/blob/master/LICENSE)
(MIT License) urllib3 (https://github.com/urllib3/urllib3/blob/master/LICENSE.txt)
(MIT License) isort (https://github.com/timothycrosley/isort/blob/develop/LICENSE)
(MIT License) flake8 (https://gitlab.com/pycqa/flake8/blob/master/LICENSE)
(MIT License) pytest (https://github.com/pytest-dev/pytest/blob/master/LICENSE)
(MIT License) pytest-mock (https://github.com/pytest-dev/pytest-mock/blob/master/LICENSE)
(MIT License) pytest-xdist (https://github.com/pytest-dev/pytest-xdist/blob/master/LICENSE)
(MIT License) pytest-cov (https://github.com/pytest-dev/pytest-cov/blob/master/LICENSE)

========================================================================
BSD-style licenses
========================================================================

The following components are provided under a BSD-style license. See project link for details.

(BSD 2-Clause "Simplified" License) mock (https://github.com/testing-cabal/mock/blob/master/LICENSE.txt)
(BSD 3-Clause) pyrabbit (https://github.com/bkjones/pyrabbit/blob/master/LICENSE)
(BSD 3-Clause) logutils (https://opensource.org/licenses/BSD-3-Clause)
(BSD 3-Clause) jinja2 (https://github.com/pallets/jinja/blob/master/LICENSE)
(BSD 3-Clause) ujson (https://github.com/esnme/ultrajson/blob/master/LICENSE.txt)

========================================================================
PSF licenses
========================================================================

The following components are provided under a PSF license. See project link for details.

(PSF License) futures (https://github.com/agronholm/pythonfutures/blob/master/LICENSE)
2 changes: 1 addition & 1 deletion README.md
@@ -1,7 +1,7 @@
# Splunk Event Generator (Eventgen)

### Status
[![CircleCI](https://circleci.com/gh/splunk/eventgen/tree/develop.svg?style=svg)](https://circleci.com/gh/splunk/eventgen/tree/develop)
[![CircleCI](https://circleci.com/gh/splunk/eventgen/tree/develop.svg?style=svg&circle-token=15e952a75e368102d8cebc6d9445af87e6c7d57e)](https://circleci.com/gh/splunk/eventgen/tree/develop)

### Introduction

4 changes: 2 additions & 2 deletions dockerfiles/Dockerfile
@@ -22,13 +22,13 @@ RUN apk --no-cache upgrade && \
erlang-asn1 \
erlang-inets \
erlang-os-mon \
erlang-xmerl \
erlang-xmerl \
erlang-eldap \
erlang-syntax-tools \
pwgen \
xz \
curl \
bash && \
bash && \
rm -rf /var/cache/apk/* && \
curl -sL https://www.rabbitmq.com/releases/rabbitmq-server/v${RABBITMQ_VERSION}/rabbitmq-server-generic-unix-${RABBITMQ_VERSION}.tar.xz | tar -xJ -C /usr/local && \
ln -s /usr/local/rabbitmq_server-${RABBITMQ_VERSION}/sbin/rabbitmq-server /usr/sbin/rabbitmq-server && \
3 changes: 3 additions & 0 deletions docs/CONFIGURE.md
@@ -459,6 +459,9 @@ specifically be supported by all plugins. Plugins that write to files like spool
httpeventWaitResponse = <bool>
* wait for all responses on a generator output before returning the outputter.
* Defaults to true.
---
httpeventAllowFailureCount = <int>
* Number of transmission failures allowed for a given httpserver before that server is removed from the pool. For example, 100 means a specific httpserver is no longer used after 100 failures. If a transmission to the server succeeds before the limit is reached, the count is reset and transmission continues.
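A minimal sketch of the failure-count behavior described above, assuming a hypothetical ServerPool class (not the plugin's actual implementation): each server carries a failure counter, a server is dropped from the pool once its counter reaches httpeventAllowFailureCount, and any success resets the counter.

    class ServerPool:
        def __init__(self, servers, allow_failure_count=100):
            self.servers = list(servers)
            self.allow_failure_count = allow_failure_count
            # Per-server count of failures since the last success.
            self.failures = {server: 0 for server in self.servers}

        def record_success(self, server):
            # Any success resets the counter and keeps the server in rotation.
            self.failures[server] = 0

        def record_failure(self, server):
            self.failures[server] += 1
            if self.failures[server] >= self.allow_failure_count and server in self.servers:
                # Too many failures without a success: stop sending to this server.
                self.servers.remove(server)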

###### spool
spoolDir = <spool directory>
9 changes: 9 additions & 0 deletions docs/REFERENCE.md
@@ -59,6 +59,7 @@ maxQueueLength = 0
autotimestamps = [ <jsonlist> ]
autotimestamp = false
outputCounter = false
disableLoggingQueue = true
[<sample file name>]
@@ -105,6 +106,11 @@ useOutputQueue = true | false
for instance if you're outputting to a file or to stdout/modular input.
* Default value depends on the output plugin being used.
disableLoggingQueue = true | false
* Disable the logging queue in multiprocess mode.
* In multiprocess mode, logs from each worker process are collected through a central logging queue.
* Defaults to true, which disables the logging queue.
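For context, a minimal, self-contained sketch of the queue-based logging this setting controls (illustrative only; eventgen's own wiring appears in the eventgen_core.py hunks below): worker processes ship log records to the parent through a shared queue, and a thread in the parent drains and re-emits them. With the queue disabled, each process logs through its own StreamHandler instead.

    import logging
    import multiprocessing
    from threading import Thread

    import logutils.queue  # same library eventgen uses for its QueueHandler

    def drain_logs(logging_queue):
        # Parent-side thread: re-emit records produced by worker processes.
        while True:
            record = logging_queue.get()
            if record is None:  # sentinel: stop draining
                break
            logging.getLogger(record.name).handle(record)

    def worker(logging_queue):
        root = logging.getLogger()
        root.setLevel(logging.DEBUG)
        if logging_queue is not None:
            # disableLoggingQueue = false: ship records to the parent process.
            root.addHandler(logutils.queue.QueueHandler(logging_queue))
        else:
            # disableLoggingQueue = true: each process logs on its own.
            root.addHandler(logging.StreamHandler())
        root.info("hello from a worker process")

    if __name__ == "__main__":
        logging.basicConfig(level=logging.DEBUG)
        manager = multiprocessing.Manager()
        queue = manager.Queue()
        Thread(target=drain_logs, args=(queue,), daemon=True).start()
        proc = multiprocessing.Process(target=worker, args=(queue,))
        proc.start()
        proc.join()
        queue.put(None)  # unblock the drain thread before exiting

Disabling the queue sidesteps the crash tracked in issue 217 (referenced in the TODO comments below), at the cost of each process writing to its own handler.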
#############################
## OUTPUT RELATED SETTINGS ##
#############################
@@ -161,6 +167,9 @@ httpeventWaitResponse = <bool>
* wait for all responses on a generator output before returning the outputter.
* Defaults to true.
httpeventAllowFailureCount = <int>
* Number of transmission failures allowed for a given httpserver before that server is removed from the pool. For example, 100 means a specific httpserver is no longer used after 100 failures. If a transmission to the server succeeds before the limit is reached, the count is reset and transmission continues.
spoolDir = <spool directory>
* Spool directory is the generated files destination directory.
* Only valid in spool outputMode.
2 changes: 1 addition & 1 deletion setup.cfg
@@ -19,4 +19,4 @@ each_dict_entry_on_separate_line = false

[isort]
# isort/yapf solutions to below files are not compatible
skip = splunk_eventgen/lib/concurrent,splunk_eventgen/lib/requests_futures
skip = splunk_eventgen/lib/concurrent,splunk_eventgen/lib/requests_futures
13 changes: 13 additions & 0 deletions splunk_eventgen/__main__.py
@@ -53,6 +53,9 @@ def parse_args():
help="Use multiprocesing instead of threading")
generate_subparser.add_argument("--profiler", action="store_true", help="Turn on cProfiler")
generate_subparser.add_argument("--log-path", type=str, default="{0}/logs".format(FILE_LOCATION))
generate_subparser.add_argument(
"--generator-queue-size", type=int, default=500, help="the max queue size for the "
"generator queue, timer object puts all the generator tasks into this queue, default max size is 500")
# Build subparser
build_subparser = subparsers.add_parser('build', help="Will build different forms of sa-eventgen")
build_subparser.add_argument("--mode", type=str, default="splunk-app",
@@ -308,6 +311,16 @@ def build_splunk_app(dest, source=os.getcwd(), remove=True):
directory_default_dir = os.path.join(directory, 'default', 'eventgen.conf')
eventgen_conf = os.path.join(module_path, 'default', 'eventgen.conf')
shutil.copyfile(eventgen_conf, directory_default_dir)

# install 3rd lib dependencies
install_target = os.path.join(directory, 'lib')
install_cmd = "pip install --requirement splunk_eventgen/lib/requirements.txt --upgrade --no-compile " + \
"--no-binary :all: --target " + install_target
return_code = os.system(install_cmd)
if return_code != 0:
print("Failed to install dependencies via pip. Please check whether pip is installed.")
os.system("rm -rf " + os.path.join(install_target, "*.egg-info"))

make_tarfile(target_file, directory)
shutil.rmtree(splunk_app_samples)
if remove:
1 change: 1 addition & 0 deletions splunk_eventgen/default/eventgen.conf
@@ -48,3 +48,4 @@ useOutputQueue = false
autotimestamps = [["\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}", "%Y-%m-%d %H:%M:%S"], ["\\d{1,2}\\/\\w{3}\\/\\d{4}\\s\\d{2}:\\d{2}:\\d{2}:\\d{1,3}", "%d/%b/%Y %H:%M:%S:%f"], ["\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}", "%Y-%m-%dT%H:%M:%S.%f"], ["\\d{1,2}/\\w{3}/\\d{4}\\s\\d{2}:\\d{2}:\\d{2}:\\d{1,3}", "%d/%b/%Y %H:%M:%S:%f"], ["\\d{1,2}/\\d{2}/\\d{2}\\s\\d{1,2}:\\d{2}:\\d{2}", "%m/%d/%y %H:%M:%S"], ["\\d{2}-\\d{2}-\\d{4} \\d{2}:\\d{2}:\\d{2}", "%m-%d-%Y %H:%M:%S"], ["\\w{3} \\w{3} +\\d{1,2} \\d{2}:\\d{2}:\\d{2}", "%a %b %d %H:%M:%S"], ["\\w{3} \\w{3} \\d{2} \\d{4} \\d{2}:\\d{2}:\\d{2}", "%a %b %d %Y %H:%M:%S"], ["^(\\w{3}\\s+\\d{1,2}\\s\\d{2}:\\d{2}:\\d{2})", "%b %d %H:%M:%S"], ["(\\w{3}\\s+\\d{1,2}\\s\\d{1,2}:\\d{1,2}:\\d{1,2})", "%b %d %H:%M:%S"], ["(\\w{3}\\s\\d{1,2}\\s\\d{1,4}\\s\\d{1,2}:\\d{1,2}:\\d{1,2})", "%b %d %Y %H:%M:%S"], ["\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}", "%Y-%m-%d %H:%M:%S.%f"], ["\\,\\d{2}\\/\\d{2}\\/\\d{2,4}\\s+\\d{2}:\\d{2}:\\d{2}\\s+[AaPp][Mm]\\,", ",%m/%d/%Y %I:%M:%S %p,"], ["^\\w{3}\\s+\\d{2}\\s+\\d{2}:\\d{2}:\\d{2}", "%b %d %H:%M:%S"], ["\\d{2}/\\d{2}/\\d{4} \\d{2}:\\d{2}:\\d{2}", "%m/%d/%Y %H:%M:%S"], ["^\\d{2}\\/\\d{2}\\/\\d{2,4}\\s+\\d{2}:\\d{2}:\\d{2}\\s+[AaPp][Mm]", "%m/%d/%Y %I:%M:%S %p"], ["\\d{2}\\/\\d{2}\\/\\d{4}\\s\\d{2}:\\d{2}:\\d{2}", "%m-%d-%Y %H:%M:%S"], ["\\\"timestamp\\\":\\s\\\"(\\d+)", "%s"], ["\\d{2}\\/\\w+\\/\\d{4}\\s\\d{2}:\\d{2}:\\d{2}:\\d{3}", "%d-%b-%Y %H:%M:%S:%f"], ["\\\"created\\\":\\s(\\d+)", "%s"], ["\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}", "%Y-%m-%dT%H:%M:%S"], ["\\d{1,2}/\\w{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2}:\\d{1,3}", "%d/%b/%Y:%H:%M:%S:%f"], ["\\d{1,2}/\\w{3}/\\d{4}:\\d{2}:\\d{2}:\\d{2}", "%d/%b/%Y:%H:%M:%S"]]
autotimestamp = false
httpeventWaitResponse = true
disableLoggingQueue = true
72 changes: 38 additions & 34 deletions splunk_eventgen/eventgen_core.py
@@ -75,9 +75,15 @@ def __init__(self, args=None):
# attach to the logging queue
self.logger.info("Logging Setup Complete.")

self._generator_queue_size = getattr(self.args, 'generator_queue_size', 500)
if self._generator_queue_size < 0:
self._generator_queue_size = 0
self.logger.info("set generator queue size to %d", self._generator_queue_size)

if self.args and 'configfile' in self.args and self.args.configfile:
self._load_config(self.args.configfile, args=args)


def _load_config(self, configfile, **kwargs):
'''
This method will use a configfile and set self.config as a processed config object,
@@ -115,8 +121,13 @@ def _load_config(self, configfile, **kwargs):
self.config = Config(configfile, **new_args)
self.config.parse()
self._reload_plugins()
if "args" in kwargs and getattr(kwargs["args"], "generators"):
generator_worker_count = kwargs["args"].generators
else:
generator_worker_count = self.config.generatorWorkers

# TODO: Probably should destroy pools better so processes are cleaned.
self._setup_pools()
self._setup_pools(generator_worker_count)

def _reload_plugins(self):
# Initialize plugins
@@ -133,7 +144,7 @@ def _reload_plugins(self):
os.path.join(file_path, 'lib', 'plugins', 'rater'), self.config.plugins, 'rater')
self.config._complexSettings['rater'] = plugins
except Exception as e:
self.logger.exception(e)
self.logger.exception(str(e))

def _load_custom_plugins(self, PluginNotLoadedException):
plugintype = PluginNotLoadedException.type
@@ -147,7 +158,7 @@ def _load_custom_plugins(self, PluginNotLoadedException):
# APPPERF-263: be greedy when scanning plugin dir (eat all the pys)
self._initializePlugins(plugindir, pluginsdict, plugintype)

def _setup_pools(self):
def _setup_pools(self, generator_worker_count):
'''
This method is an internal method called on init to generate pools needed for processing.
@@ -157,7 +168,7 @@ def _setup_pools(self):
self._create_generator_pool()
self._create_timer_threadpool()
self._create_output_threadpool()
self._create_generator_workers()
self._create_generator_workers(generator_worker_count)

def _create_timer_threadpool(self, threadcount=100):
'''
@@ -212,15 +223,19 @@ def _create_generator_pool(self, workercount=20):
if self.args.multiprocess:
import multiprocessing
self.manager = multiprocessing.Manager()
self.loggingQueue = self.manager.Queue()
self.logging_pool = Thread(target=self.logger_thread, args=(self.loggingQueue, ), name="LoggerThread")
self.logging_pool.start()
if self.config.disableLoggingQueue:
self.loggingQueue = None
else:
# TODO crash caused by logging Thread https://github.com/splunk/eventgen/issues/217
self.loggingQueue = self.manager.Queue()
self.logging_pool = Thread(target=self.logger_thread, args=(self.loggingQueue, ), name="LoggerThread")
self.logging_pool.start()
# since we're now in multiprocess, we need to use better queues.
self.workerQueue = multiprocessing.JoinableQueue(maxsize=500)
self.workerQueue = multiprocessing.JoinableQueue(maxsize=self._generator_queue_size)
self.genconfig = self.manager.dict()
self.genconfig["stopping"] = False
else:
self.workerQueue = Queue(maxsize=500)
self.workerQueue = Queue(maxsize=self._generator_queue_size)
worker_threads = workercount
if hasattr(self.config, 'outputCounter') and self.config.outputCounter:
self.output_counters = []
@@ -354,13 +369,13 @@ def _worker_do_work(self, work_queue, logging_queue):
startTime = time.time()
item.run()
totalTime = time.time() - startTime
if totalTime > self.config.interval:
if totalTime > self.config.interval and self.config.end != 1:
self.logger.warning("work took longer than current interval, queue/threading throughput limitation")
work_queue.task_done()
except Empty:
pass
except Exception as e:
self.logger.exception(e)
self.logger.exception(str(e))
raise e

def _generator_do_work(self, work_queue, logging_queue, output_counter=None):
@@ -370,23 +385,27 @@ def _generator_do_work(self, work_queue, logging_queue, output_counter=None):
startTime = time.time()
item.run(output_counter=output_counter)
totalTime = time.time() - startTime
if totalTime > self.config.interval:
if totalTime > self.config.interval and item._sample.end != 1:
self.logger.warning("work took longer than current interval, queue/threading throughput limitation")
work_queue.task_done()
except Empty:
pass
except Exception as e:
self.logger.exception(e)
self.logger.exception(str(e))
raise e

@staticmethod
def _proc_worker_do_work(work_queue, logging_queue, config):
genconfig = config
stopping = genconfig['stopping']
qh = logutils.queue.QueueHandler(logging_queue)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(qh)
if logging_queue is not None:
# TODO https://github.com/splunk/eventgen/issues/217
qh = logutils.queue.QueueHandler(logging_queue)
root.addHandler(qh)
else:
root.addHandler(logging.StreamHandler())
while not stopping:
try:
root.info("Checking for work")
@@ -418,7 +437,7 @@ def logger_thread(self, loggingQueue):
except Empty:
pass
except Exception as e:
self.logger.exception(e)
self.logger.exception(str(e))
raise e

def _initializePlugins(self, dirname, plugins, plugintype, name=None):
@@ -493,7 +512,7 @@ def _initializePlugins(self, dirname, plugins, plugintype, name=None):
self.logger.warn("Could not load plugin: %s, skipping" % mod_name.name)
self.logger.exception(ie)
except Exception as e:
self.logger.exception(e)
self.logger.exception(str(e))
raise e
return ret

@@ -521,21 +540,6 @@ def start(self, join_after_start=True):
if join_after_start:
self.logger.info("All timers started, joining queue until it's empty.")
self.join_process()
# Only need to start timers once
# Every 5 seconds, get values and output basic statistics about our operations
# TODO: Figure out how to do this better...
# generatorsPerSec = (generatorDecrements - generatorQueueCounter) / 5
# outputtersPerSec = (outputDecrements - outputQueueCounter) / 5
# outputQueueCounter = outputDecrements
# generatorQueueCounter = generatorDecrements
# self.logger.info('OutputQueueDepth=%d GeneratorQueueDepth=%d GeneratorsPerSec=%d OutputtersPerSec=%d' %
# (self.config.outputQueueSize.value(), self.config.generatorQueueSize.value(),
# generatorsPerSec, outputtersPerSec))
# kiloBytesPerSec = self.config.bytesSent.valueAndClear() / 5 / 1024
# gbPerDay = (kiloBytesPerSec / 1024 / 1024) * 60 * 60 * 24
# eventsPerSec = self.config.eventsSent.valueAndClear() / 5
# self.logger.info('GlobalEventsPerSec=%s KilobytesPerSec=%1f GigabytesPerDay=%1f' %
# (eventsPerSec, kiloBytesPerSec, gbPerDay))

def join_process(self):
'''
Expand All @@ -550,7 +554,7 @@ def join_process(self):
self.logger.info("All timers have finished, signalling workers to exit.")
self.stop()
except Exception as e:
self.logger.exception(e)
self.logger.exception(str(e))
raise e

def stop(self):
Expand All @@ -577,7 +581,7 @@ def stop(self):
count += 1
self.logger.info("All generators working/exited, joining output queue until it's empty.")
self.outputQueue.join()
self.logger.info("All items fully processed, stopping.")
self.logger.info("All items fully processed. Cleaning up internal processes.")
self.started = False
self.stopping = False
