Skip to content

Commit

Permalink
Merge pull request locustio#10 from cpennington/bulk-data-export
Browse files Browse the repository at this point in the history
Bulk data export
  • Loading branch information
cpennington committed Aug 20, 2015
2 parents e3fc3c0 + f1f4a2e commit 82ed91d
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 80 deletions.
47 changes: 20 additions & 27 deletions locust/clients.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import re
import time
from collections import defaultdict
from datetime import timedelta
from urlparse import urlparse, urlunparse

Expand Down Expand Up @@ -113,7 +114,8 @@ def request(self, method, url, name=None, catch_response=False, **kwargs):
response = self._send_request_safe_mode(method, url, **kwargs)

# record the consumed time
request_meta["response_time"] = int((time.time() - request_meta["start_time"]) * 1000)
request_meta["end_time"] = time.time()
request_meta["response_time"] = int((request_meta["end_time"] - request_meta["start_time"]) * 1000)


request_meta["name"] = name or (response.history and response.history[0] or response).request.path_url
Expand All @@ -126,25 +128,11 @@ def request(self, method, url, name=None, catch_response=False, **kwargs):
request_meta["content_size"] = len(response.content or "")

if catch_response:
response.locust_request_meta = request_meta
return ResponseContextManager(response)
return ResponseContextManager(response, request_meta)
else:
try:
response.raise_for_status()
except RequestException as e:
events.request_failure.fire(
request_type=request_meta["method"],
name=request_meta["name"],
response_time=request_meta["response_time"],
exception=e,
)
else:
events.request_success.fire(
request_type=request_meta["method"],
name=request_meta["name"],
response_time=request_meta["response_time"],
response_length=request_meta["content_size"],
)
with ResponseContextManager(response, request_meta):
# Let the ResponseContextManager default behavior handle success/failure reporting
pass
return response

def _send_request_safe_mode(self, method, url, **kwargs):
Expand Down Expand Up @@ -177,9 +165,10 @@ class ResponseContextManager(LocustResponse):

_is_reported = False

def __init__(self, response):
def __init__(self, response, request_meta):
# copy data from response to this object
self.__dict__ = response.__dict__
self._request_meta = request_meta

def __enter__(self):
return self
Expand Down Expand Up @@ -215,10 +204,12 @@ def success(self):
response.success()
"""
events.request_success.fire(
request_type=self.locust_request_meta["method"],
name=self.locust_request_meta["name"],
response_time=self.locust_request_meta["response_time"],
response_length=self.locust_request_meta["content_size"],
request_type=self._request_meta["method"],
name=self._request_meta["name"],
response_time=self._request_meta["response_time"],
response_length=self._request_meta["content_size"],
start_time=self._request_meta["start_time"],
end_time=self._request_meta["end_time"],
)
self._is_reported = True

Expand All @@ -239,9 +230,11 @@ def failure(self, exc):
exc = CatchResponseError(exc)

events.request_failure.fire(
request_type=self.locust_request_meta["method"],
name=self.locust_request_meta["name"],
response_time=self.locust_request_meta["response_time"],
request_type=self._request_meta["method"],
name=self._request_meta["name"],
response_time=self._request_meta["response_time"],
exception=exc,
start_time=self._request_meta["start_time"],
end_time=self._request_meta["end_time"],
)
self._is_reported = True
9 changes: 8 additions & 1 deletion locust/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ class EventHook(object):
"""
Simple event class used to provide hooks for different types of events in Locust.
Event handlers should always accept **kwargs to allow for additional data
to be added to the events in the future.
Here's how to use the EventHook class::
my_event = EventHook()
Expand Down Expand Up @@ -30,12 +33,14 @@ def fire(self, **kwargs):
"""
*request_success* is fired when a request is completed successfully.
Listeners should take the following arguments:
Event is fired with the following arguments:
* *request_type*: Request type method used
* *name*: Path to the URL that was called (or override name if it was used in the call to the client)
* *response_time*: Response time in milliseconds
* *response_length*: Content-length of the response
* *start_time*: The time that the request started
* *end_time*: The time that the request completed
"""

request_failure = EventHook()
Expand All @@ -48,6 +53,8 @@ def fire(self, **kwargs):
* *name*: Path to the URL that was called (or override name if it was used in the call to the client)
* *response_time*: Time in milliseconds until exception was thrown
* *exception*: Exception instance that was thrown
* *start_time*: The time that the request started
* *end_time*: The time that the request failed
"""

locust_error = EventHook()
Expand Down
31 changes: 14 additions & 17 deletions locust/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import math
import tablib
from tabulate import tabulate
from collections import namedtuple
from collections import namedtuple, defaultdict
from functools import partial

try:
Expand Down Expand Up @@ -248,27 +248,24 @@ def reset(self):
self.min_response_time = None
self.max_response_time = 0
self.last_request_timestamp = int(time.time())
self.num_reqs_per_sec = {}
self.num_reqs_per_sec = defaultdict(int)
self.total_content_length = 0
self._jittered_response_times = None

def log(self, response_time, content_length):
def log(self, response_time, content_length, end_time):
self.stats.num_requests += 1
self.num_requests += 1

self._log_time_of_request()
self._log_time_of_request(end_time)
self._log_response_time(response_time)

# increase total content-length
self.total_content_length += content_length

self._jittered_response_times = None

def _log_time_of_request(self):
t = int(time.time())
self.num_reqs_per_sec[t] = self.num_reqs_per_sec.setdefault(t, 0) + 1
self.last_request_timestamp = t
self.stats.last_request_timestamp = t
def _log_time_of_request(self, end_time):
end_time = int(end_time)
self.num_reqs_per_sec[end_time] += 1
self.last_request_timestamp = end_time
self.stats.last_request_timestamp = end_time

def _log_response_time(self, response_time):
self.total_response_time += response_time
Expand Down Expand Up @@ -600,22 +597,22 @@ def median_from_dict(total, count):
A global instance for holding the statistics. Should be removed eventually.
"""

def on_request_success(request_type, name, response_time, response_length):
def on_request_success(request_type, name, response_time, response_length, end_time, **kwargs):
if global_stats.max_requests is not None and (global_stats.num_requests + global_stats.num_failures) >= global_stats.max_requests:
raise StopLocust("Maximum number of requests reached")
global_stats.get(name, request_type).log(response_time, response_length)
global_stats.get(name, request_type).log(response_time, response_length, end_time)

def on_request_failure(request_type, name, response_time, exception):
def on_request_failure(request_type, name, response_time, exception, end_time, **kwargs):
if global_stats.max_requests is not None and (global_stats.num_requests + global_stats.num_failures) >= global_stats.max_requests:
raise StopLocust("Maximum number of requests reached")
global_stats.get(name, request_type).log_error(exception)

def on_report_to_master(client_id, data):
def on_report_to_master(client_id, data, **kwargs):
data["stats"] = [global_stats.entries[key].get_stripped_report() for key in global_stats.entries.iterkeys() if not (global_stats.entries[key].num_requests == 0 and global_stats.entries[key].num_failures == 0)]
data["errors"] = dict([(k, e.to_dict()) for k, e in global_stats.errors.iteritems()])
global_stats.errors = {}

def on_slave_report(client_id, data):
def on_slave_report(client_id, data, **kwargs):
for stats_data in data["stats"]:
entry = StatsEntry.unserialize(stats_data)
request_key = (entry.name, entry.method)
Expand Down
2 changes: 1 addition & 1 deletion locust/test/test_locust_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ class MyLocust(HttpLocust):

self.num_failures = 0
self.num_success = 0
def on_failure(request_type, name, response_time, exception):
def on_failure(request_type, name, response_time, exception, **kwargs):
self.num_failures += 1
self.last_failure_exception = exception
def on_success(**kwargs):
Expand Down
6 changes: 3 additions & 3 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ class MyTestLocust(Locust):
server.mocked_send(Message("client_ready", None, "fake_client"))
sleep(0)

master.stats.get("/", "GET").log(100, 23455)
master.stats.get("/", "GET").log(800, 23455)
master.stats.get("/", "GET").log(700, 23455)
master.stats.get("/", "GET").log(100, 23455, 1439303539.190565)
master.stats.get("/", "GET").log(800, 23455, 1439303540.190565)
master.stats.get("/", "GET").log(700, 23455, 1439303541.190565)

data = {"user_count":1}
events.report_to_master.fire(client_id="fake_client", data=data)
Expand Down
52 changes: 26 additions & 26 deletions locust/test/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ def setUp(self):
self.stats = RequestStats()
self.stats.start_time = time.time()
self.s = StatsEntry(self.stats, "test_entry", "GET")
self.s.log(45, 0)
self.s.log(135, 0)
self.s.log(44, 0)
self.s.log(45, 0, time.time())
self.s.log(135, 0, time.time())
self.s.log(44, 0, time.time())
self.s.log_error(Exception("dummy fail"))
self.s.log_error(Exception("dummy fail"))
self.s.log(375, 0)
self.s.log(601, 0)
self.s.log(35, 0)
self.s.log(79, 0)
self.s.log(375, 0, time.time())
self.s.log(601, 0, time.time())
self.s.log(35, 0, time.time())
self.s.log(79, 0, time.time())
self.s.log_error(Exception("dummy fail"))

def test_percentile(self):
s = StatsEntry(self.stats, "percentile_test", "GET")
for x in xrange(100):
s.log(x, 0)
s.log(x, 0, time.time())

self.assertEqual(
s.get_response_time_percentiles([0.5, 0.6, 0.95]),
Expand All @@ -42,10 +42,10 @@ def test_total_rps(self):
self.assertEqual(self.s.total_rps, 7)

def test_current_rps(self):
self.stats.last_request_timestamp = int(time.time()) + 4
self.stats.last_request_timestamp += 4
self.assertEqual(self.s.current_rps, 3.5)

self.stats.last_request_timestamp = int(time.time()) + 25
self.stats.last_request_timestamp += 25
self.assertEqual(self.s.current_rps, 0)

def test_num_reqs_fails(self):
Expand All @@ -57,9 +57,9 @@ def test_avg(self):

def test_reset(self):
self.s.reset()
self.s.log(756, 0)
self.s.log(756, 0, 1439303539.1)
self.s.log_error(Exception("dummy fail after reset"))
self.s.log(85, 0)
self.s.log(85, 0, 1439303539.2)

self.assertEqual(self.s.total_rps, 2)
self.assertEqual(self.s.num_requests, 2)
Expand All @@ -69,26 +69,26 @@ def test_reset(self):

def test_reset_min_response_time(self):
self.s.reset()
self.s.log(756, 0)
self.s.log(756, 0, 0)
self.assertEqual(756, self.s.min_response_time)

def test_aggregation(self):
s1 = StatsEntry(self.stats, "aggregate me!", "GET")
s1.log(12, 0)
s1.log(12, 0)
s1.log(38, 0)
s1.log(12, 0, 0)
s1.log(12, 0, 0)
s1.log(38, 0, 0)
s1.log_error("Dummy exzeption")

s2 = StatsEntry(self.stats, "aggregate me!", "GET")
s2.log_error("Dummy exzeption")
s2.log_error("Dummy exzeption")
s2.log(12, 0)
s2.log(99, 0)
s2.log(14, 0)
s2.log(55, 0)
s2.log(38, 0)
s2.log(55, 0)
s2.log(97, 0)
s2.log(12, 0, 0)
s2.log(99, 0, 0)
s2.log(14, 0, 0)
s2.log(55, 0, 0)
s2.log(38, 0, 0)
s2.log(55, 0, 0)
s2.log(97, 0, 0)

s = StatsEntry(self.stats, "GET", "")
s.extend(s1, full_request_history=True)
Expand All @@ -106,9 +106,9 @@ def test_serialize_through_message(self):
from slaves to master.
"""
s1 = StatsEntry(self.stats, "test", "GET")
s1.log(10, 0)
s1.log(20, 0)
s1.log(40, 0)
s1.log(10, 0, 0)
s1.log(20, 0, 0)
s1.log(40, 0, 0)
u1 = StatsEntry.unserialize(s1.serialize())

data = Message.unserialize(Message("dummy", s1.serialize(), "none").serialize()).data
Expand Down
10 changes: 5 additions & 5 deletions locust/test/test_web.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_stats_no_data(self):
self.assertEqual(200, requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port).status_code)

def test_stats(self):
stats.global_stats.get("/test", "GET").log(120, 5612)
stats.global_stats.get("/test", "GET").log(120, 5612, 1439303211.743936)
response = requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port)
self.assertEqual(200, response.status_code)

Expand All @@ -50,14 +50,14 @@ def test_stats(self):
self.assertEqual(120, data["stats"][0]["avg_response_time"])

def test_stats_cache(self):
stats.global_stats.get("/test", "GET").log(120, 5612)
stats.global_stats.get("/test", "GET").log(120, 5612, 1439303211.743936)
response = requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port)
self.assertEqual(200, response.status_code)
data = json.loads(response.content)
self.assertEqual(2, len(data["stats"])) # one entry plus Total

# add another entry
stats.global_stats.get("/test2", "GET").log(120, 5612)
stats.global_stats.get("/test2", "GET").log(120, 5612, 1439303211.743936)
data = json.loads(requests.get("http://127.0.0.1:%i/stats/requests" % self.web_port).content)
self.assertEqual(2, len(data["stats"])) # old value should be cached now

Expand All @@ -67,12 +67,12 @@ def test_stats_cache(self):
self.assertEqual(3, len(data["stats"])) # this should no longer be cached

def test_request_stats_csv(self):
stats.global_stats.get("/test", "GET").log(120, 5612)
stats.global_stats.get("/test", "GET").log(120, 5612, 1439303211.743936)
response = requests.get("http://127.0.0.1:%i/stats/requests/csv" % self.web_port)
self.assertEqual(200, response.status_code)

def test_distribution_stats_csv(self):
stats.global_stats.get("/test", "GET").log(120, 5612)
stats.global_stats.get("/test", "GET").log(120, 5612, 1439303211.743936)
response = requests.get("http://127.0.0.1:%i/stats/distribution/csv" % self.web_port)
self.assertEqual(200, response.status_code)

Expand Down

0 comments on commit 82ed91d

Please sign in to comment.