Skip to content

Commit

Permalink
Merge "Fix partial results sending in rps runner"
Browse files Browse the repository at this point in the history
  • Loading branch information
Jenkins authored and openstack-gerrit committed Aug 13, 2014
2 parents 619e77f + 5471a52 commit dcc9284
Showing 1 changed file with 72 additions and 50 deletions.
122 changes: 72 additions & 50 deletions rally/benchmark/runners/rps.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,67 @@

from rally.benchmark.runners import base
from rally import consts
from rally.openstack.common import log as logging
from rally import utils as rutils


LOG = logging.getLogger(__name__)
SEND_RESULT_DELAY = 1


def worker_process(rps, times, queue, scenario_context, timeout,
worker_id, cls, method_name, args):
"""Start scenario within threads.
Spawn N threads per second. Each thread runs scenario once, and appends
result to queue.
:param rps: runs per second
:param times: number of threads to be run
:param queue: queue object to append results
:param scenario_context: scenario context object
:param timeout: timeout operation
:param worker_id: id of worker process
:param cls: scenario class
:param method_name: scenario method name
:param args: scenario args
"""

def worker_thread(args):
queue.put(base._run_scenario_once(args))

pool = []
i = 0
start = time.time()
sleep = 1.0 / rps

while times > i:
i += 1
scenario_args = (("%d:%d" % (worker_id, i), cls, method_name,
scenario_context, args),)
thread = threading.Thread(target=worker_thread,
args=scenario_args)
thread.start()
pool.append(thread)

LOG.debug("Worker: %s rps: %s (requested rps: %s)" % (
worker_id, i / (time.time() - start), rps))

# try to join latest thread(s) until it finished, or until time to
# start new thread
while i / (time.time() - start) > rps:
if pool:
pool[0].join(sleep)
if not pool[0].isAlive():
pool.pop(0)
else:
time.sleep(sleep)

while pool:
thr = pool.pop(0)
thr.join()


class RPSScenarioRunner(base.ScenarioRunner):
"""Scenario runner that does the job with with specified frequency.
Expand Down Expand Up @@ -59,72 +117,36 @@ class RPSScenarioRunner(base.ScenarioRunner):

def _run_scenario(self, cls, method_name, context, args):
times = self.config["times"]
rps = self.config["rps"]
timeout = self.config.get("timeout", 600)
cpu_count = multiprocessing.cpu_count()
processes2start = cpu_count if times >= cpu_count else times
rps_per_worker = float(self.config["rps"]) / processes2start

queue = multiprocessing.Queue()
addition_args = args

class WorkerProcess(multiprocessing.Process):

def __init__(self, rps, times, queue, scenario_context, timeout,
process_id, args):
self.rps = rps
self.times = times
self.timeout = timeout
self.pool = []
self.scenario_context = scenario_context
self.id = process_id
self.args = args
self.queue = queue
super(WorkerProcess, self).__init__()

def _th_worker(self, args):
result = base._run_scenario_once(args)
self.queue.put(result)

def run(self):
for i in range(self.times):
scenario_args = (("%d:%d" % (self.id, i), cls, method_name,
self.scenario_context, self.args),)
thread = threading.Thread(target=self._th_worker,
args=scenario_args)
thread.start()
self.pool.append(thread)
time.sleep(1.0 / rps)

while len(self.pool):
thr = self.pool.pop()
thr.join(self.timeout)

process_pool = []
scenario_context = base._get_scenario_context(context)

if times <= cpu_count:
processes2start = times
else:
processes2start = cpu_count
times_per_worker, rest = divmod(times, processes2start)

for i in range(processes2start):
process = WorkerProcess(rps / float(processes2start),
times / processes2start,
queue, scenario_context, timeout,
i, addition_args)
times = times_per_worker + int(rest > 0)
rest -= 1
worker_args = (rps_per_worker, times, queue, scenario_context,
timeout, i, cls, method_name, args)
process = multiprocessing.Process(target=worker_process,
args=worker_args)
process.start()
process_pool.append(process)

while len(process_pool):
while process_pool:
for process in process_pool:
process.join(SEND_RESULT_DELAY)
if not process.is_alive():
process.join(timeout)
process.join()
process_pool.remove(process)
if not queue.empty():
self._send_result(queue.get(timeout=timeout))
time.sleep(1.0 / rps)

while not queue.empty():
result = queue.get(timeout=timeout)
self._send_result(result)
while not queue.empty():
self._send_result(queue.get())

queue.close()

0 comments on commit dcc9284

Please sign in to comment.