Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature timeMultiple #141

Merged
merged 5 commits into from
Mar 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion docs/REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ autotimestamp = <boolean>
* Will enable autotimestamp feature which detects most common forms of timestamps in your samples with no configuration.

timeMultiple = <float>
* Only valid in mode = replay
* Will slow down the replay of events by <float> factor. For example, allows a 10 minute sample
to play out over 20 minutes with a timeMultiple of 2, or 60 minutes with a timeMultiple of 6.
By the converse, make timeMultiple 0.5 will make the events run twice as fast.
Expand Down
36 changes: 14 additions & 22 deletions splunk_eventgen/lib/eventgensamples.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ class Sample(object):
queueable = None
autotimestamp = None


# Internal fields
sampleLines = None
sampleDict = None
Expand Down Expand Up @@ -184,7 +183,7 @@ def getTSFromEvent(self, event, passed_token=None):
# self.logger.debug("Testing '%s' as a time string against '%s'" % (timeString, timeFormat))
if timeFormat == "%s":
ts = float(timeString) if len(timeString) < 10 else float(timeString) / (10**(len(timeString)-10))
# self.logger.debugv("Getting time for timestamp '%s'" % ts)
# self.logger.debug("Getting time for timestamp '%s'" % ts)
currentTime = datetime.datetime.fromtimestamp(ts)
else:
# self.logger.debugv("Getting time for timeFormat '%s' and timeString '%s'" % (timeFormat, timeString))
Expand All @@ -198,7 +197,7 @@ def getTSFromEvent(self, event, passed_token=None):
currentTime = datetime.datetime.strptime(timeString, timeFormat)
except AttributeError:
pass
self.logger.debugv("Match '%s' Format '%s' result: '%s'" % (timeString, timeFormat, currentTime))
self.logger.debug("Match '%s' Format '%s' result: '%s'" % (timeString, timeFormat, currentTime))
if type(currentTime) == datetime.datetime:
break
except ValueError:
Expand Down Expand Up @@ -272,7 +271,7 @@ def earliestTime(self):
# as an offset of now if they're relative times
if self._earliestParsed != None:
earliestTime = self.now() - self._earliestParsed
self.logger.debugv("Using cached earliest time: %s" % earliestTime)
self.logger.debug("Using cached earliest time: %s" % earliestTime)
else:
if self.earliest.strip()[0:1] == '+' or \
self.earliest.strip()[0:1] == '-' or \
Expand All @@ -281,18 +280,16 @@ def earliestTime(self):
temptd = self.now(realnow=True) - tempearliest
self._earliestParsed = datetime.timedelta(days=temptd.days, seconds=temptd.seconds)
earliestTime = self.now() - self._earliestParsed
self.logger.debugv("Calulating earliestParsed as '%s' with earliestTime as '%s' and self.sample.earliest as '%s'" % (self._earliestParsed, earliestTime, tempearliest))
self.logger.debug("Calulating earliestParsed as '%s' with earliestTime as '%s' and self.sample.earliest as '%s'" % (self._earliestParsed, earliestTime, tempearliest))
else:
earliestTime = timeParser(self.earliest, timezone=self.timezone)
self.logger.debugv("earliestTime as absolute time '%s'" % earliestTime)

self.logger.debug("earliestTime as absolute time '%s'" % earliestTime)
return earliestTime


def latestTime(self):
if self._latestParsed != None:
latestTime = self.now() - self._latestParsed
self.logger.debugv("Using cached latestTime: %s" % latestTime)
self.logger.debug("Using cached latestTime: %s" % latestTime)
else:
if self.latest.strip()[0:1] == '+' or \
self.latest.strip()[0:1] == '-' or \
Expand All @@ -301,22 +298,21 @@ def latestTime(self):
temptd = self.now(realnow=True) - templatest
self._latestParsed = datetime.timedelta(days=temptd.days, seconds=temptd.seconds)
latestTime = self.now() - self._latestParsed
self.logger.debugv("Calulating latestParsed as '%s' with latestTime as '%s' and self.sample.latest as '%s'" % (self._latestParsed, latestTime, templatest))
self.logger.debug("Calulating latestParsed as '%s' with latestTime as '%s' and self.sample.latest as '%s'" % (self._latestParsed, latestTime, templatest))
else:
latestTime = timeParser(self.latest, timezone=self.timezone)
self.logger.debugv("latstTime as absolute time '%s'" % latestTime)

self.logger.debug("latstTime as absolute time '%s'" % latestTime)
return latestTime

def utcnow(self):
return self.now(utcnow=True)

def _openSampleFile(self):
self.logger.debugv("Opening sample '%s' in app '%s'" % (self.name, self.app))
self.logger.debug("Opening sample '%s' in app '%s'" % (self.name, self.app))
self._sampleFH = open(self.filePath, 'rU')

def _closeSampleFile(self):
self.logger.debugv("Closing sample '%s' in app '%s'" % (self.name, self.app))
self.logger.debug("Closing sample '%s' in app '%s'" % (self.name, self.app))
self._sampleFH.close()

def loadSample(self):
Expand All @@ -329,11 +325,11 @@ def loadSample(self):
if self.sampleDict == None:
self._openSampleFile()
if self.breaker == self.config.breaker:
self.logger.debugv("Reading raw sample '%s' in app '%s'" % (self.name, self.app))
self.logger.debug("Reading raw sample '%s' in app '%s'" % (self.name, self.app))
self.sampleLines = self._sampleFH.readlines()
# 1/5/14 CS Moving to using only sampleDict and doing the breaking up into events at load time instead of on every generation
else:
self.logger.debugv("Non-default breaker '%s' detected for sample '%s' in app '%s'" \
self.logger.debug("Non-default breaker '%s' detected for sample '%s' in app '%s'" \
% (self.breaker, self.name, self.app) )

sampleData = self._sampleFH.read()
Expand All @@ -355,7 +351,7 @@ def loadSample(self):
searchpos = 0
breakerMatch = breakerRE.search(sampleData, searchpos)
while breakerMatch:
self.logger.debugv("Breaker found at: %d, %d" % (breakerMatch.span()[0], breakerMatch.span()[1]))
self.logger.debug("Breaker found at: %d, %d" % (breakerMatch.span()[0], breakerMatch.span()[1]))
# Ignore matches at the beginning of the file
if breakerMatch.span()[0] != 0:
self.sampleLines.append(sampleData[extractpos:breakerMatch.span()[0]])
Expand All @@ -374,7 +370,7 @@ def loadSample(self):
elif self.sampletype == 'csv':
if self.sampleDict == None:
self._openSampleFile()
self.logger.debugv("Reading csv sample '%s' in app '%s'" % (self.name, self.app))
self.logger.debug("Reading csv sample '%s' in app '%s'" % (self.name, self.app))
self.sampleDict = [ ]
self.sampleLines = [ ]
# Fix to load large csv files, work with python 2.5 onwards
Expand Down Expand Up @@ -415,7 +411,3 @@ def get_loaded_sample(self):
else:
self.loadSample()
return self.sampleLines




20 changes: 13 additions & 7 deletions splunk_eventgen/lib/eventgentimer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
import datetime, time
import time
import copy
from timeparser import timeParserTimeMath
from Queue import Full
Expand All @@ -20,8 +20,6 @@ class Timer(object):
Non-Queueable plugins, the Timer class calls the generator method of the plugin directly, tracks the amount of time
the plugin takes to generate and sleeps the remaining interval before calling generate again.
"""


time = None
countdown = None

Expand All @@ -39,6 +37,7 @@ def __init__(self, time, sample=None, config=None, genqueue=None, outputqueue=No
self.time = time
self.stopping = False
self.countdown = 0
self.interval = getattr(self.sample, "interval", config.interval)
#enable the logger
self._setup_logging()
self.logger.debug('Initializing timer for %s' % sample.name if sample is not None else "None")
Expand All @@ -48,6 +47,13 @@ def __init__(self, time, sample=None, config=None, genqueue=None, outputqueue=No
self.rater = rater_class(self.sample)
self.generatorPlugin = self.config.getPlugin('generator.' + self.sample.generator, self.sample)
self.outputPlugin = self.config.getPlugin('output.' + self.sample.outputMode, self.sample)
if self.sample.timeMultiple < 0:
self.logger.error("Invalid setting for timeMultiple: {}, value should be positive".format(
self.sample.timeMultiple))
elif self.sample.timeMultiple != 1:
self.interval = self.sample.interval * self.sample.timeMultiple
self.logger.debug("Adjusting interval {} with timeMultiple {}, new interval: {}".format(
self.sample.interval, self.sample.timeMultiple, self.interval))
self.logger.info("Start '%s' generatorWorkers for sample '%s'" % (self.sample.config.generatorWorkers, self.sample.name))

# loggers can't be pickled due to the lock object, remove them before we try to pickle anything.
Expand Down Expand Up @@ -92,7 +98,7 @@ def real_run(self):
if self.sample.delay > 0:
self.logger.info("Sample set to delay %s, sleeping." % self.sample.delay)
time.sleep(self.sample.delay)

self.logger.debug("Timer creating plugin for '%s'" % self.sample.name)

self.executions = 0
Expand Down Expand Up @@ -127,7 +133,7 @@ def real_run(self):
ret=realtime)
while backfillearliest < realtime:
et = backfillearliest
lt = timeParserTimeMath(plusminus="+", num=self.sample.interval, unit="s", ret=et)
lt = timeParserTimeMath(plusminus="+", num=self.interval, unit="s", ret=et)
genPlugin = self.generatorPlugin(sample=self.sample)
# need to make sure we set the queue right if we're using multiprocessing or thread modes
genPlugin.updateConfig(config=self.config, outqueue=self.outputQueue)
Expand All @@ -150,7 +156,7 @@ def real_run(self):
"current interval size is {}, which is smaller than a raw event size {}. wait for the next turn.".format(
count, raw_event_size))
previous_count_left = count
self.countdown = self.sample.interval
self.countdown = self.interval
self.executions += 1
continue
else:
Expand Down Expand Up @@ -193,7 +199,7 @@ def real_run(self):
pass

# Sleep until we're supposed to wake up and generate more events
self.countdown = self.sample.interval
self.countdown = self.interval
self.executions += 1

# 8/20/15 CS Adding support for ending generation at a certain time
Expand Down