Skip to content

Commit

Permalink
Merge pull request #136 from splunk/count_all
Browse files Browse the repository at this point in the history
Changing default count and count parsing
  • Loading branch information
jmeixensperger authored Mar 5, 2019
2 parents 453d7fd + 6feb77e commit 88b0df7
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 58 deletions.
9 changes: 4 additions & 5 deletions docs/REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ sampletype = raw
interval = 60
delay = 0
timeMultiple = 1
## 0 means all lines in sample
count = 0
count = -1
## earliest/latest = now means timestamp replacements default to current time
earliest = now
latest = now
Expand Down Expand Up @@ -323,8 +322,8 @@ backfillSearchUrl = <url>
count = <integer>
* Maximum number of events to generate per sample file
* 0 means replay the entire sample.
* Defaults to 0.
* -1 means replay the entire sample.
* Defaults to -1.
perDayVolume = <float>
* This is used in place of count. The perDayVolume is a size supplied in GB per Day. This value will allow
Expand All @@ -333,7 +332,7 @@ perDayVolume = <float>
bundlelines = true | false
* For outside use cases where you need to take all the lines in a sample file and pretend they are
one event, but count = 0 will not work because you want to replay all the lines more than once.
one event.
Also, please note you can use breaker=\r*\n\r*\n to break the sample file into multi-line
transactions that would work better than this as well. This is also useful where you want to bring
in sampletype = csv and bundle that multiple times.
Expand Down
3 changes: 1 addition & 2 deletions splunk_eventgen/default/eventgen.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ sampletype = raw
interval = 60
delay = 0
timeMultiple = 1
## 0 means all lines in sample
count = 0
count = -1
## earliest/latest = now means timestamp replacements default to current time
earliest = now
latest = now
Expand Down
42 changes: 9 additions & 33 deletions splunk_eventgen/lib/eventgenconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ def __init__(self, configfile=None, sample=None, override_outputter=False, overr

self._complexSettings['timezone'] = self._validateTimezone

self._complexSettings['count'] = self._validateCount

self._complexSettings['seed'] = self._validateSeed

self.stopping = False
Expand Down Expand Up @@ -251,7 +249,6 @@ def getSplunkUrl(self, s):
self.logger.debug("Getting Splunk URL: %s Method: %s Host: %s Port: %s" % (splunkUrl, splunkMethod, splunkHost, splunkPort))
return (splunkUrl, splunkMethod, splunkHost, splunkPort)


def parse(self):
"""Parse configs from Splunk REST Handler or from files.
We get called manually instead of in __init__ because we need find out if we're Splunk embedded before
Expand Down Expand Up @@ -684,7 +681,7 @@ def parse(self):
# Check for _time field, if it exists, add a timestamp to support it
if len(s.sampleDict) > 0:
if '_time' in s.sampleDict[0]:
self.logger.debugv("Found _time field, checking if default timestamp exists")
self.logger.debug("Found _time field, checking if default timestamp exists")
t = Token()
t.token = "\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}"
t.replacementType = "timestamp"
Expand All @@ -697,16 +694,16 @@ def parse(self):
found_token = True
break
if not found_token:
self.logger.debugv("Found _time adding timestamp to support")
self.logger.debug("Found _time adding timestamp to support")
s.tokens.append(t)
else:
self.logger.debugv("_time field exists and timestamp already configured")
self.logger.debug("_time field exists and timestamp already configured")

for e in s.sampleDict:
# Run punct against the line, make sure we haven't seen this same pattern
# Not totally exact but good enough for Rock'N'Roll
p = self._punct(e['_raw'])
# self.logger.debugv("Got punct of '%s' for event '%s'" % (p, e[s.timeField]))
self.logger.debug("Got punct of '%s' for event '%s'" % (p, e[s.timeField]))
if p not in line_puncts:
for x in at:
t = Token()
Expand All @@ -715,7 +712,7 @@ def parse(self):
t.replacement = x[1]

try:
# self.logger.debugv("Trying regex '%s' for format '%s' on '%s'" % (x[0], x[1], e[s.timeField]))
self.logger.debug("Trying regex '%s' for format '%s' on '%s'" % (x[0], x[1], e[s.timeField]))
ts = s.getTSFromEvent(e['_raw'], t)
if type(ts) == datetime.datetime:
found_token = False
Expand All @@ -725,7 +722,7 @@ def parse(self):
found_token = True
break
if not found_token:
self.logger.debugv("Found timestamp '%s', extending token with format '%s'" % (x[0], x[1]))
self.logger.debug("Found timestamp '%s', extending token with format '%s'" % (x[0], x[1]))
s.tokens.append(t)
# Drop this pattern from ones we try in the future
at = [ z for z in at if z[0] != x[0] ]
Expand All @@ -735,7 +732,6 @@ def parse(self):
line_puncts.append(p)
self.logger.debug("Finished parsing")


def _punct(self, string):
"""Quick method of attempting to normalize like events"""
string = string.replace('\\', '\\\\')
Expand All @@ -746,12 +742,11 @@ def _punct(self, string):
string = re.sub("[^,;\-#\$%&+./:=\?@\\\'|*\n\r\"(){}<>\[\]\^!]", "", string, flags=re.M)
return string


def _validateSetting(self, stanza, key, value):
"""Validates settings to ensure they won't cause errors further down the line.
Returns a parsed value (if the value is something other than a string).
If we've read a token, which is a complex config, returns a tuple of parsed values."""
self.logger.debugv("Validating setting for '%s' with value '%s' in stanza '%s'" % (key, value, stanza))
self.logger.debug("Validating setting for '%s' with value '%s' in stanza '%s'" % (key, value, stanza))
if key.find('token.') > -1:
results = re.match('token\.(\d+)\.(\w+)', key)
if results != None:
Expand Down Expand Up @@ -810,10 +805,10 @@ def _validateSetting(self, stanza, key, value):
# which will parse the value or raise a ValueError if it is unparseable
elif key in self._complexSettings:
complexSetting = self._complexSettings[key]
self.logger.debugv("Complex setting for '%s' in stanza '%s'" % (key, stanza))
self.logger.debug("Complex setting for '%s' in stanza '%s'" % (key, stanza))
# Set value to result of callback, e.g. parsed, or the function should raise an error
if isinstance(complexSetting, types.FunctionType) or isinstance(complexSetting, types.MethodType):
self.logger.debugv("Calling function for setting '%s' with value '%s'" % (key, value))
self.logger.debug("Calling function for setting '%s' with value '%s'" % (key, value))
value = complexSetting(value)
elif isinstance(complexSetting, list):
if not value in complexSetting:
Expand Down Expand Up @@ -844,23 +839,6 @@ def _validateTimezone(self, value):
self.logger.debug("Parsed timezone {}".format(value))
return value

def _validateCount(self, value):
"""Callback to override count to -1 if set to 0 in the config, otherwise return int"""
self.logger.debug("Validating count of {}".format(value))
# 5/13/14 CS Hack to take a zero count in the config and set it to a value which signifies
# the special condition rather than simply being zero events, setting to -1
try:
value = int(value)
except:
self.logger.error("Could not parse int for count {}".format(value))
raise ValueError("Could not parse int for count {}".format(value))

if value == 0:
value = -1
self.logger.debug("Count set to {}".format(value))

return value

def _validateSeed(self, value):
"""Callback to set random seed"""
self.logger.debug("Validating random seed {}".format(value))
Expand All @@ -873,8 +851,6 @@ def _validateSeed(self, value):
self.logger.info("Using random seed {}".format(value))
random.seed(value)



def _buildConfDict(self):
"""Build configuration dictionary that we will use """

Expand Down
13 changes: 6 additions & 7 deletions splunk_eventgen/lib/eventgentimer.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,20 @@ def real_run(self):
lt = self.sample.latestTime()

try:
if count < 1:
if count < 1 and count != -1:
self.logger.info("There is no data to be generated in worker {0} because the count is {1}.".format(self.sample.config.generatorWorkers, count))
else:
# Spawn workers at the beginning of job rather than wait for next interval
self.logger.info("Start '%d' generatorWorkers for sample '%s'" % (
self.sample.config.generatorWorkers, self.sample.name))
for worker_id in range(self.config.generatorWorkers):
# self.generatorPlugin is only an instance, now we need a real plugin.
# make a copy of the sample so if it's mutated by another process, it won't mess up generation
# for this generator.
# self.generatorPlugin is only an instance, now we need a real plugin. Make a copy
# of the sample in case another generator corrupts it.
copy_sample = copy.copy(self.sample)
tokens = copy.deepcopy(self.sample.tokens)
copy_sample.tokens = tokens
genPlugin = self.generatorPlugin(sample=copy_sample)
# need to make sure we set the queue right if we're using multiprocessing or thread modes
# Adjust queue for threading mode
genPlugin.updateConfig(config=self.config, outqueue=self.outputQueue)
genPlugin.updateCounts(count=count,
start_time=et,
Expand All @@ -198,11 +197,11 @@ def real_run(self):
self.executions += 1

# 8/20/15 CS Adding support for ending generation at a certain time
if self.end != None:
if self.end:
# 3/16/16 CS Adding support for ending on a number of executions instead of time
# Should be fine with storing state in this sample object since each sample has its own unique
# timer thread
if self.endts == None:
if not self.endts:
if self.executions >= int(self.end):
self.logger.info("End executions %d reached, ending generation of sample '%s'" % (int(self.end), self.sample.name))
self.stopping = True
Expand Down
16 changes: 10 additions & 6 deletions splunk_eventgen/lib/plugins/rater/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,18 @@ def _setup_logging(self):
self.logger = logging.getLogger('eventgen')

def rate(self):
self._sample.count = int(self._sample.count)
if self._sample.count == -1:
self._sample.count = len(self._sample.sampleDict)
self._generatorWorkers = int(self._generatorWorkers)
count = self._sample.count/self._generatorWorkers
# 5/8/12 CS We've requested not the whole file, so we should adjust count based on
# hourOfDay, dayOfWeek and randomizeCount configs
rateFactor = 1.0
if self._sample.randomizeCount != 0 and self._sample.randomizeCount != None:
if self._sample.randomizeCount:
try:
self.logger.debugv("randomizeCount for sample '%s' in app '%s' is %s" \
% (self._sample.name, self._sample.app, self._sample.randomizeCount))
self.logger.debug("randomizeCount for sample '%s' in app '%s' is %s"
% (self._sample.name, self._sample.app, self._sample.randomizeCount))
# If we say we're going to be 20% variable, then that means we
# can be 10% high or 10% low. Math below does that.
randBound = round(self._sample.randomizeCount * 1000, 0)
Expand All @@ -60,16 +64,16 @@ def rate(self):
rateFactor *= randFactor
except:
import traceback
stack = traceback.format_exc()
stack = traceback.format_exc()
self.logger.error("Randomize count failed for sample '%s'. Stacktrace %s" % (self._sample.name, stack))
if type(self._sample.hourOfDayRate) == dict:
try:
rate = self._sample.hourOfDayRate[str(self._sample.now().hour)]
self.logger.debugv("hourOfDayRate for sample '%s' in app '%s' is %s" % (self._sample.name, self._sample.app, rate))
self.logger.debug("hourOfDayRate for sample '%s' in app '%s' is %s" % (self._sample.name, self._sample.app, rate))
rateFactor *= rate
except KeyError:
import traceback
stack = traceback.format_exc()
stack = traceback.format_exc()
self.logger.error("Hour of day rate failed for sample '%s'. Stacktrace %s" % (self._sample.name, stack))
if type(self._sample.dayOfWeekRate) == dict:
try:
Expand Down
3 changes: 2 additions & 1 deletion tests/medium/plugins/test_file_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@

import os
import sys
from mock import MagicMock, patch
from mock import patch
from splunk_eventgen.__main__ import parse_args
from splunk_eventgen.eventgen_core import EventGenerator

FILE_DIR = os.path.dirname(os.path.abspath(__file__))


class TestFileOutputPlugin(object):

def test_output_data_to_file(self):
Expand Down
3 changes: 1 addition & 2 deletions tests/medium/plugins/test_syslog_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

import os
import sys
from mock import MagicMock, patch, call

from mock import MagicMock, patch
from splunk_eventgen.__main__ import parse_args
from splunk_eventgen.eventgen_core import EventGenerator
from splunk_eventgen.lib.plugins.output.syslogout import SyslogOutOutputPlugin
Expand Down
1 change: 0 additions & 1 deletion tests/medium/plugins/test_tcp_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import os
import sys
from mock import MagicMock, patch

from splunk_eventgen.__main__ import parse_args
from splunk_eventgen.eventgen_core import EventGenerator
from splunk_eventgen.lib.plugins.output.tcpout import TcpOutputPlugin
Expand Down
1 change: 0 additions & 1 deletion tests/medium/plugins/test_udp_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import os
import sys
from mock import MagicMock, patch

from splunk_eventgen.__main__ import parse_args
from splunk_eventgen.eventgen_core import EventGenerator
from splunk_eventgen.lib.plugins.output.udpout import UdpOutputPlugin
Expand Down

0 comments on commit 88b0df7

Please sign in to comment.