Skip to content

Commit

Permalink
Merge pull request #10 from HeyHugo/master
Browse files Browse the repository at this point in the history
Merged Hugo's branch with support for dynamic changing of locust client count, reset stats feature and more.
  • Loading branch information
heyman committed Sep 19, 2011
2 parents b83c373 + 2f938ce commit 9a1056a
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 81 deletions.
210 changes: 161 additions & 49 deletions locust/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,12 @@ def interrupt(self, reschedule=True):


locust_runner = None


STATE_INIT, STATE_HATCHING, STATE_RUNNING, STATE_STOPPED = ["ready", "hatching", "running", "stopped"]
SLAVE_REPORT_INTERVAL = 3.0


class LocustRunner(object):
def __init__(self, locust_classes, hatch_rate, num_clients, num_requests=None, host=None):
self.locust_classes = locust_classes
Expand Down Expand Up @@ -335,73 +338,120 @@ def errors(self):
@property
def user_count(self):
return len(self.locusts)

def hatch(self, stop_timeout=None):
if self.num_requests is not None:
RequestStats.global_max_requests = self.num_requests


def weight_locusts(self, amount, stop_timeout = None):
"""
Distributes the amount of locusts for each WebLocust-class according to it's weight
and a list: bucket with the weighted locusts is returned
"""
bucket = []
weight_sum = sum((locust.weight for locust in self.locust_classes))
for locust in self.locust_classes:
if not locust.tasks:
warnings.warn("Notice: Found locust (%s) got no tasks. Skipping..." % locust.__name__)
continue

if self.host is not None:
locust.host = self.host
if stop_timeout is not None:
locust.stop_timeout = stop_timeout

# create locusts depending on weight
percent = locust.weight / float(weight_sum)
num_locusts = int(round(self.num_clients * percent))
num_locusts = int(round(amount * percent))
bucket.extend([locust for x in xrange(0, num_locusts)])
return bucket

def hatch(self, spawn_count=None, stop_timeout=None, wait=False):
if spawn_count is None:
spawn_count = self.num_clients

if self.num_requests is not None:
RequestStats.global_max_requests = self.num_requests

bucket = self.weight_locusts(spawn_count, stop_timeout)
spawn_count = len(bucket)
if self.state == STATE_INIT or self.state == STATE_STOPPED:
self.state = STATE_HATCHING
self.num_clients = spawn_count
else:
self.num_clients += spawn_count

print "\nHatching and swarming %i clients at the rate %g clients/s...\n" % (self.num_clients, self.hatch_rate)
print "\nHatching and swarming %i clients at the rate %g clients/s...\n" % (spawn_count, self.hatch_rate)
occurence_count = dict([(l.__name__, 0) for l in self.locust_classes])
total_locust_count = len(bucket)

def spawn_locusts():
sleep_time = 1.0 / self.hatch_rate
while True:
if not bucket:
print "All locusts hatched: %s" % ", ".join(["%s: %d" % (name, count) for name, count in occurence_count.iteritems()])
events.hatch_complete.fire(total_locust_count)
events.hatch_complete.fire(self.num_clients)
return

locust = bucket.pop(random.randint(0, len(bucket)-1))
occurence_count[locust.__name__] += 1
def start_locust():
def start_locust(_):
try:
locust()()
except RescheduleTaskImmediately:
pass
except GreenletExit:
pass
new_locust = self.locusts.spawn(start_locust)
new_locust = self.locusts.spawn(start_locust, locust)
if len(self.locusts) % 10 == 0:
print "%i locusts hatched" % len(self.locusts)
gevent.sleep(sleep_time)

spawn_locusts()
self.locusts.join()
print "All locusts dead\n"
print_stats(self.request_stats)
print_percentile_stats(self.request_stats) #TODO use an event listener, or such, for this?

def start_hatching(self, locust_count=None, hatch_rate=None):
if locust_count:
self.num_clients = locust_count
if hatch_rate:
self.hatch_rate = hatch_rate

if wait:
self.locusts.join()
print "All locusts dead\n"
print_stats(self.request_stats)
print_percentile_stats(self.request_stats) #TODO use an event listener, or such, for this?

def kill_locusts(self, kill_count):
"""
Kill a kill_count of weighted locusts from the Group() object in self.locusts
"""
bucket = self.weight_locusts(kill_count)
kill_count = len(bucket)
self.num_clients -= kill_count
print "killing locusts:", kill_count
dying = []
for g in self.locusts:
for l in bucket:
if l == g.args[0]:
dying.append(g)
bucket.remove(l)
break
for g in dying:
self.locusts.killone(g)
events.hatch_complete.fire(self.num_clients)

def start_hatching(self, locust_count=None, hatch_rate=None, wait=False):
print "start hatching", locust_count, hatch_rate, self.state
if self.state != STATE_RUNNING and self.state != STATE_HATCHING:
RequestStats.clear_all()

RequestStats.global_start_time = time()
self.state = STATE_HATCHING
self.hatch()

RequestStats.clear_all()
RequestStats.global_start_time = time()
# Dynamically changing the locust count
if self.state != STATE_INIT and self.state != STATE_STOPPED:
self.state = STATE_HATCHING
if self.num_clients > locust_count:
# Kill some locusts
kill_count = self.num_clients - locust_count
self.kill_locusts(kill_count)
elif self.num_clients < locust_count:
# Spawn some locusts
if hatch_rate:
self.hatch_rate = hatch_rate
spawn_count = locust_count - self.num_clients
self.hatch(spawn_count=spawn_count)
else:
if locust_count:
self.hatch(locust_count, wait=wait)
else:
self.hatch(wait=wait)

def stop(self):
# if we are currently hatching locusts we need to kill the hatching greenlet first
if self.hatching_greenlet and not self.hatching_greenlet.ready():
Expand All @@ -410,8 +460,8 @@ def stop(self):
self.state = STATE_STOPPED

class LocalLocustRunner(LocustRunner):
def start_hatching(self, locust_count=None, hatch_rate=None):
self.hatching_greenlet = gevent.spawn(lambda: super(LocalLocustRunner, self).start_hatching(locust_count, hatch_rate))
def start_hatching(self, locust_count=None, hatch_rate=None, wait=False):
self.hatching_greenlet = gevent.spawn(lambda: super(LocalLocustRunner, self).start_hatching(locust_count, hatch_rate, wait=wait))
self.greenlet = self.hatching_greenlet

class DistributedLocustRunner(LocustRunner):
Expand Down Expand Up @@ -464,27 +514,88 @@ def on_slave_report(client_id, data):
def user_count(self):
return sum([c.user_count for c in self.clients.itervalues()])

def start_hatching(self, locust_count=None, hatch_rate=None):
if locust_count:
self.num_clients = locust_count / (len(self.clients.ready) or 1)
if hatch_rate:
self.hatch_rate = float(hatch_rate) / (len(self.clients.ready) or 1)

print "Sending hatch jobs to %i ready clients" % len(self.clients.ready)
if not len(self.clients.ready):
def start_hatching(self, locust_count, hatch_rate):
self.num_clients = locust_count
slave_num_clients = locust_count / ((len(self.clients.ready) + len(self.clients.running)) or 1)
slave_hatch_rate = float(hatch_rate) / ((len(self.clients.ready) + len(self.clients.running)) or 1)

print "Sending hatch jobs to %i ready clients" % (len(self.clients.ready) + len(self.clients.running))
if not (len(self.clients.ready)+len(self.clients.running)):
print "WARNING: You are running in distributed mode but have no slave servers connected."
print "Please connect slaves prior to swarming."

if self.state != STATE_RUNNING and self.state != STATE_HATCHING:
RequestStats.clear_all()

for client in self.clients.itervalues():
msg = {"hatch_rate":self.hatch_rate, "num_clients":self.num_clients, "num_requests": self.num_requests, "host":self.host, "stop_timeout":None}
self.server.send({"type":"start", "data":msg})
msg = {"hatch_rate":slave_hatch_rate, "num_clients":slave_num_clients, "num_requests": self.num_requests, "host":self.host, "stop_timeout":None}
self.server.send({"type":"hatch", "data":msg})

RequestStats.global_start_time = time()
self.state = STATE_HATCHING


def start_ramping(self, hatch_rate=None, max_locusts=1000, hatch_stride=None, percent=0.95, response_time=2000, acceptable_fail=0.05):
if hatch_rate:
self.hatch_rate = hatch_rate

if not hatch_stride:
hatch_stride = 100

clients = hatch_stride

# Record low load percentile
def calibrate():
self.start_hatching(clients, self.hatch_rate)
while True:
if self.state != STATE_HATCHING:
print "recording low_percentile..."
gevent.sleep(30)
percentile = RequestStats.sum_stats().one_percentile(percent)
print "low_percentile:", percentile
self.start_hatching(1, self.hatch_rate)
return percentile
gevent.sleep(1)

low_percentile = calibrate()

while True:
if self.state != STATE_HATCHING:
if self.num_clients >= max_locusts:
print "ramping stopped due to max_locusts limit reached:", max_locusts
return
gevent.sleep(10)
if RequestStats.sum_stats().fail_ratio >= acceptable_fail:
print "ramping stopped due to acceptable_fail ratio (%d1.2%%) exceeded with fail ratio %1.2d%%", (acceptable_fail*100, RequestStats.sum_stats().fail_ratio*100)
return
p = RequestStats.sum_stats().one_percentile(percent)
if p >= low_percentile * 2.0:
print "ramping stopped due to response times getting high:", p
return
self.start_hatching(clients, self.hatch_rate)
clients += hatch_stride
gevent.sleep(1)

# while True:
# if self.state != STATE_HATCHING:
# print "self.num_clients: %i max_locusts: %i" % (self.num_clients, max_locusts)
# if self.num_clients >= max_locusts:
# print "ramping stopped due to max_locusts limit reached:", max_locusts
# return
# gevent.sleep(5)
# if self.state != STATE_INIT:
# print "num_reqs: %i fail_ratio: %1.2d" % (RequestStats.sum_stats().num_reqs, RequestStats.sum_stats().fail_ratio)
# while RequestStats.sum_stats().num_reqs < 100:
# if RequestStats.sum_stats().fail_ratio >= acceptable_fail:
# print "ramping stopped due to acceptable_fail ratio (%d1.2%%) exceeded with fail ratio %1.2d%%", (acceptable_fail*100, RequestStats.sum_stats().fail_ratio*100)
# return
# gevent.sleep(1)
# if RequestStats.sum_stats().one_percentile(percent) >= response_time:
# print "ramping stopped due to response times over %ims for %1.2f%%" % (response_time, percent*100)
# return
# self.start_hatching(clients, self.hatch_rate)
# clients += 10 * hatchrate
# gevent.sleep(1)

def stop(self):
for client in self.clients.hatching + self.clients.running:
self.server.send({"type":"stop", "data":{}})
Expand All @@ -504,7 +615,7 @@ def client_listener(self):
elif msg["type"] == "stats":
report = msg["data"]
events.slave_report.fire(report["client_id"], report["data"])
elif msg["type"] == "running":
elif msg["type"] == "hatching":
id = msg["data"]
self.clients[id].state = STATE_HATCHING
elif msg["type"] == "hatch_complete":
Expand All @@ -514,7 +625,7 @@ def client_listener(self):
if len(self.clients.hatching) == 0:
count = sum(c.user_count for c in self.clients.itervalues())
events.hatch_complete.fire(count)

@property
def slave_count(self):
return len(self.clients.ready) + len(self.clients.hatching) + len(self.clients.running)
Expand Down Expand Up @@ -543,19 +654,20 @@ def on_report_to_master(client_id, data):
def worker(self):
while True:
msg = self.client.recv()
if msg["type"] == "start":
self.client.send({"type":"running", "data":self.client_id})
if msg["type"] == "hatch":
self.client.send({"type":"hatching", "data":self.client_id})
job = msg["data"]
self.hatch_rate = job["hatch_rate"]
self.num_clients = job["num_clients"]
#self.num_clients = job["num_clients"]
self.num_requests = job["num_requests"]
self.host = job["host"]
self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching())
self.hatching_greenlet = gevent.spawn(lambda: self.start_hatching(locust_count=job["num_clients"], hatch_rate=job["hatch_rate"]))
elif msg["type"] == "stop":
self.stop()
self.client.send({"type":"client_stopped", "data":self.client_id})
self.client.send({"type":"client_ready", "data":self.client_id})



def stats_reporter(self):
while True:
data = {}
Expand Down
2 changes: 1 addition & 1 deletion locust/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def main():
core.locust_runner = LocalLocustRunner(locust_classes, options.hatch_rate, options.num_clients, options.num_requests, options.host)
# spawn client spawning/hatching greenlet
if not options.web:
core.locust_runner.start_hatching()
core.locust_runner.start_hatching(wait=True)
main_greenlet = core.locust_runner.greenlet
elif options.master:
core.locust_runner = MasterLocustRunner(locust_classes, options.hatch_rate, options.num_clients, num_requests=options.num_requests, host=options.host, master_host=options.master_host)
Expand Down
Loading

0 comments on commit 9a1056a

Please sign in to comment.