Skip to content

Commit

Permalink
Issue 201 (#221)
Browse files Browse the repository at this point in the history
* Removing unavailable servers

* Removed nonresponding httpserver

* Added a counter

* fixed a bug

* Added docs and spec

* Addressing unused var

* fixing url reference

* fixed currentreadsize

* Httpevent str formatting
  • Loading branch information
arctan5x authored Jun 5, 2019
1 parent a84b4c5 commit 57f4549
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 64 deletions.
3 changes: 3 additions & 0 deletions docs/CONFIGURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,9 @@ specifically be supported by all plugins. Plugins that write to files like spool
httpeventWaitResponse = <bool>
* wait for all responses on a generator output before returning the outputter.
* Defaults to true.
---
httpeventAllowFailureCount = <int>
* Number of transmission failure allowed for a certain httpserver before we remove that server from the pool. For example, 100 means that we will no longer include a specific httpserver after 100 failures. Even after some failures, if we see a success for the server, we will reset the count and continue the transmission.

###### spool
spoolDir = <spool directory>
Expand Down
3 changes: 3 additions & 0 deletions docs/REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ httpeventWaitResponse = <bool>
* wait for all responses on a generator output before returning the outputter.
* Defaults to true.
httpeventAllowFailureCount = <int>
* Number of transmission failure allowed for a certain httpserver before we remove that server from the pool. For example, 100 means that we will no longer include a specific httpserver after 100 failures. Even after some failures, if we see a success for the server, we will reset the count and continue the transmission.
spoolDir = <spool directory>
* Spool directory is the generated files destination directory.
* Only valid in spool outputMode.
Expand Down
4 changes: 2 additions & 2 deletions splunk_eventgen/eventgen_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ def join_process(self):
try:
while not self.sampleQueue.empty() or self.sampleQueue.unfinished_tasks > 0 or not self.workerQueue.empty(
) or self.workerQueue.unfinished_tasks > 0:
time.sleep(10)
time.sleep(5)
self.logger.info("All timers have finished, signalling workers to exit.")
self.stop()
except Exception as e:
Expand Down Expand Up @@ -575,7 +575,7 @@ def stop(self):
count += 1
self.logger.info("All generators working/exited, joining output queue until it's empty.")
self.outputQueue.join()
self.logger.info("All items fully processed, stopping.")
self.logger.info("All items fully processed. Cleaning up internal processes.")
self.started = False
self.stopping = False

Expand Down
6 changes: 3 additions & 3 deletions splunk_eventgen/lib/eventgentimer.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def real_run(self):
self.sample.config.generatorWorkers, count))
else:
# Spawn workers at the beginning of job rather than wait for next interval
self.logger.info("Start '%d' generatorWorkers for sample '%s'" %
self.logger.info("Starting '%d' generatorWorkers for sample '%s'" %
(self.sample.config.generatorWorkers, self.sample.name))
for worker_id in range(self.config.generatorWorkers):
# self.generatorPlugin is only an instance, now we need a real plugin. Make a copy of
Expand All @@ -199,8 +199,8 @@ def real_run(self):
try:
self.generatorQueue.put(genPlugin)
self.executions += 1
self.logger.info(("Worker# {0}: Put {1} MB of events in queue for sample '{2}'" +
"with et '{3}' and lt '{4}'").format(
self.logger.debug(("Worker# {0}: Put {1} MB of events in queue for sample '{2}'" +
"with et '{3}' and lt '{4}'").format(
worker_id, round((count / 1024.0 / 1024), 4),
self.sample.name, et, lt))
except Full:
Expand Down
136 changes: 77 additions & 59 deletions splunk_eventgen/lib/plugins/output/httpevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _setup_REST_workers(self, session=None, workers=10):
if not session:
session = Session()
self.session = FuturesSession(session=session, executor=ThreadPoolExecutor(max_workers=workers))
self.active_sessions = []
self.active_session_info = []

@staticmethod
def _urlencode(value):
Expand Down Expand Up @@ -98,28 +98,37 @@ def updateConfig(self, config):
'outputMode httpevent but httpeventServers not specified for sample %s' % self._sample.name)
# set default output mode to round robin
if hasattr(self.config, 'httpeventOutputMode') and self.config.httpeventOutputMode:
self.httpeventoutputmode = config.httpeventOutputMode
self.httpeventoutputmode = self.config.httpeventOutputMode
else:
if hasattr(self._sample, 'httpeventOutputMode') and self._sample.httpeventOutputMode:
self.httpeventoutputmode = self._sample.httpeventOutputMode
else:
self.httpeventoutputmode = 'roundrobin'

if hasattr(self.config, 'httpeventMaxPayloadSize') and self.config.httpeventMaxPayloadSize:
self.httpeventmaxsize = self.config.httpeventMaxPayloadSize
else:
if hasattr(self._sample, 'httpeventMaxPayloadSize') and self._sample.httpeventMaxPayloadSize:
self.httpeventmaxsize = self._sample.httpeventMaxPayloadSize
else:
self.httpeventmaxsize = 10000

if hasattr(self.config, 'httpeventAllowFailureCount') and self.config.httpeventAllowFailureCount:
self.httpeventAllowFailureCount = int(self.config.httpeventAllowFailureCount)
else:
if hasattr(self._sample, 'httpeventAllowFailureCount') and self._sample.httpeventAllowFailureCount:
self.httpeventAllowFailureCount = int(self._sample.httpeventAllowFailureCount)
else:
self.httpeventAllowFailureCount = 100

self.logger.debug("Currentmax size: %s " % self.httpeventmaxsize)
if isinstance(config.httpeventServers, str):
self.httpeventServers = json.loads(config.httpeventServers)
else:
self.httpeventServers = config.httpeventServers
self.logger.debug("Setting up the connection pool for %s in %s" % (self._sample.name, self._app))
self.createConnections()
self.logger.debug("Pool created.")
self.logger.debug("Finished init of httpevent plugin.")
self.logger.debug("Pool created and finished init of httpevent plugin.")
except Exception as e:
self.logger.exception(str(e))

Expand All @@ -128,31 +137,20 @@ def createConnections(self):
if self.httpeventServers:
for server in self.httpeventServers.get('servers'):
if not server.get('address'):
self.logger.error(
'requested a connection to a httpevent server, but no address specified for sample %s' %
self._sample.name)
raise ValueError(
'requested a connection to a httpevent server, but no address specified for sample %s' %
self._sample.name)
if not server.get('port'):
self.logger.error(
'requested a connection to a httpevent server, but no port specified for server %s' % server)
raise ValueError(
'requested a connection to a httpevent server, but no port specified for server %s' % server)
if not server.get('key'):
self.logger.error(
'requested a connection to a httpevent server, but no key specified for server %s' % server)
raise ValueError(
'requested a connection to a httpevent server, but no key specified for server %s' % server)
if not ((server.get('protocol') == 'http') or (server.get('protocol') == 'https')):
self.logger.error(
'requested a connection to a httpevent server, but no protocol specified for server %s' %
server)
raise ValueError(
'requested a connection to a httpevent server, but no protocol specified for server %s' %
server)
self.logger.debug(
"Validation Passed, Creating a requests object for server: %s" % server.get('address'))
self.logger.debug("Validation Passed, Creating a requests object for server: %s" % server.get('address'))

setserver = {}
setserver['url'] = "%s://%s:%s/services/collector" % (server.get('protocol'), server.get('address'),
Expand Down Expand Up @@ -182,50 +180,75 @@ def _sendHTTPEvents(self, payload):
self.logger.debugv("stringpayload: %s " % stringpayload)
else:
self.logger.debug("Max size for payload hit, sending to splunk then continuing.")
try:
self._transmitEvents(stringpayload)
totalbytessent += len(stringpayload)
currentreadsize = 0
stringpayload = targetline
except Exception as e:
self.logger.exception(str(e))
raise e
else:
try:
totalbytessent += len(stringpayload)
self.logger.debug(
"End of for loop hit for sending events to splunk, total bytes sent: %s ---- out of %s -----" %
(totalbytessent, totalbytesexpected))
self._transmitEvents(stringpayload)
except Exception as e:
self.logger.exception(str(e))
raise e
totalbytessent += len(stringpayload)
currentreadsize = targetlinesize
stringpayload = targetline

totalbytessent += len(stringpayload)
self.logger.debug(
"End of for loop hit for sending events to splunk, total bytes sent: %s ---- out of %s -----" %
(totalbytessent, totalbytesexpected))
self._transmitEvents(stringpayload)

def _transmitEvents(self, payloadstring):
targetServer = []
self.logger.debugv("Transmission called with payloadstring: %s " % payloadstring)
self.logger.debug("Transmission called with payloadstring length: %s " % len(payloadstring))

if not self.serverPool:
raise Exception("No available servers exist. Please check your httpServers.")

if self.httpeventoutputmode == "mirror":
targetServer = self.serverPool
else:
targetServer.append(random.choice(self.serverPool))

for server in targetServer:
self.logger.debug("Selected targetServer object: %s" % targetServer)
url = server['url']
headers = {}
headers['Authorization'] = server['header']
headers['content-type'] = 'application/json'
try:
payloadsize = len(payloadstring)
# response = requests.post(url, data=payloadstring, headers=headers, verify=False)
self.active_sessions.append(
self.session.post(url=url, data=payloadstring, headers=headers, verify=False))
session_info = list()
session_info.append(url)
session_info.append(self.session.post(url=url, data=payloadstring, headers=headers, verify=False))
self.active_session_info.append(session_info)
except Exception as e:
self.logger.error("Failed for exception: %s" % e)
self.logger.error("Failed sending events to url: %s sourcetype: %s size: %s" %
(url, self.lastsourcetype, payloadsize))
self.logger.debug(
"Failed sending events to url: %s headers: %s payload: %s" % (url, headers, payloadstring))
raise e
(url, self.lastsourcetype, len(payloadstring)))

def reset_count(self, url):
try:
self.config.httpeventServersCountdownMap[url] = self.httpeventAllowFailureCount
except:
pass

def remove_requets_target(self, url):
httpeventServers = json.loads(self.config.httpeventServers)

# If url fail more than specified count, we completely remove it from the pool.
try:
countdown_map = self.config.httpeventServersCountdownMap
except:
self.config.httpeventServersCountdownMap = {}
for i, server_info in enumerate(self.serverPool):
# URL is in format: https://2.2.2.2:8088/services/collector
self.config.httpeventServersCountdownMap[server_info.get('url', '')] = self.httpeventAllowFailureCount
countdown_map = self.config.httpeventServersCountdownMap

for i, server_info in enumerate(httpeventServers.get('servers', [])):
target_url = '{}://{}:{}'.format(server_info.get('protocol', ''), server_info.get('address', ''), server_info.get('port', ''))
if target_url in url:
if countdown_map[url] <= 0:
del httpeventServers.get('servers')[i]
self.logger.warning("Cannot reach {}. Removing from the server pool".format(url))
else:
countdown_map[url] -= 1
self.logger.debug("Cannot reach {}. Lowering countdown to {}".format(url, countdown_map[url]))
self.config.httpeventServers = json.dumps(httpeventServers)
self._sample.httpeventServers = httpeventServers
self.config.httpeventServersCountdownMap = countdown_map

def flush(self, q):
self.logger.debug("Flush called on httpevent plugin")
Expand Down Expand Up @@ -267,24 +290,19 @@ def flush(self, q):
payload.append(payloadFragment)
self.logger.debug("Finished processing events, sending all to splunk")
self._sendHTTPEvents(payload)
if self.config.httpeventWaitResponse:
for session in self.active_sessions:
response = session.result()
if not response.raise_for_status():
self.logger.debug("Payload successfully sent to httpevent server.")
else:
self.logger.error("Server returned an error while trying to send, response code: %s" %
response.status_code)
raise BadConnection(
"Server returned an error while sending, response code: %s" % response.status_code)
else:
if not self.config.httpeventWaitResponse:
self.logger.debug("Ignoring response from HTTP server, leaving httpevent outputter")
else:
for session_info in self.active_session_info:
url, session = session_info[0], session_info[1]
try:
response = session.result(5)
self.reset_count(url)
self.logger.debug("Payload successfully sent to " + url)
except Exception as e:
self.remove_requets_target(url)
except Exception as e:
self.logger.error('failed indexing events, reason: %s ' % e)

def _setup_logging(self):
self.logger = logging.getLogger('eventgen_httpeventout')

self.logger.error('Failed sending events, reason: %s ' % e)

def load():
"""Returns an instance of the plugin"""
Expand Down

0 comments on commit 57f4549

Please sign in to comment.