diff --git a/rally/benchmark/runners/rps.py b/rally/benchmark/runners/rps.py index 20aef60fce..64a6629d49 100644 --- a/rally/benchmark/runners/rps.py +++ b/rally/benchmark/runners/rps.py @@ -19,9 +19,67 @@ from rally.benchmark.runners import base from rally import consts +from rally.openstack.common import log as logging from rally import utils as rutils +LOG = logging.getLogger(__name__) +SEND_RESULT_DELAY = 1 + + +def worker_process(rps, times, queue, scenario_context, timeout, + worker_id, cls, method_name, args): + """Start scenario within threads. + + Spawn N threads per second. Each thread runs scenario once, and appends + result to queue. + + :param rps: runs per second + :param times: number of threads to be run + :param queue: queue object to append results + :param scenario_context: scenario context object + :param timeout: timeout operation + :param worker_id: id of worker process + :param cls: scenario class + :param method_name: scenario method name + :param args: scenario args + """ + + def worker_thread(args): + queue.put(base._run_scenario_once(args)) + + pool = [] + i = 0 + start = time.time() + sleep = 1.0 / rps + + while times > i: + i += 1 + scenario_args = (("%d:%d" % (worker_id, i), cls, method_name, + scenario_context, args),) + thread = threading.Thread(target=worker_thread, + args=scenario_args) + thread.start() + pool.append(thread) + + LOG.debug("Worker: %s rps: %s (requested rps: %s)" % ( + worker_id, i / (time.time() - start), rps)) + + # try to join latest thread(s) until it finished, or until time to + # start new thread + while i / (time.time() - start) > rps: + if pool: + pool[0].join(sleep) + if not pool[0].isAlive(): + pool.pop(0) + else: + time.sleep(sleep) + + while pool: + thr = pool.pop(0) + thr.join() + + class RPSScenarioRunner(base.ScenarioRunner): """Scenario runner that does the job with with specified frequency. @@ -59,72 +117,36 @@ class RPSScenarioRunner(base.ScenarioRunner): def _run_scenario(self, cls, method_name, context, args): times = self.config["times"] - rps = self.config["rps"] timeout = self.config.get("timeout", 600) cpu_count = multiprocessing.cpu_count() + processes2start = cpu_count if times >= cpu_count else times + rps_per_worker = float(self.config["rps"]) / processes2start queue = multiprocessing.Queue() - addition_args = args - - class WorkerProcess(multiprocessing.Process): - - def __init__(self, rps, times, queue, scenario_context, timeout, - process_id, args): - self.rps = rps - self.times = times - self.timeout = timeout - self.pool = [] - self.scenario_context = scenario_context - self.id = process_id - self.args = args - self.queue = queue - super(WorkerProcess, self).__init__() - - def _th_worker(self, args): - result = base._run_scenario_once(args) - self.queue.put(result) - - def run(self): - for i in range(self.times): - scenario_args = (("%d:%d" % (self.id, i), cls, method_name, - self.scenario_context, self.args),) - thread = threading.Thread(target=self._th_worker, - args=scenario_args) - thread.start() - self.pool.append(thread) - time.sleep(1.0 / rps) - - while len(self.pool): - thr = self.pool.pop() - thr.join(self.timeout) process_pool = [] scenario_context = base._get_scenario_context(context) - if times <= cpu_count: - processes2start = times - else: - processes2start = cpu_count + times_per_worker, rest = divmod(times, processes2start) for i in range(processes2start): - process = WorkerProcess(rps / float(processes2start), - times / processes2start, - queue, scenario_context, timeout, - i, addition_args) + times = times_per_worker + int(rest > 0) + rest -= 1 + worker_args = (rps_per_worker, times, queue, scenario_context, + timeout, i, cls, method_name, args) + process = multiprocessing.Process(target=worker_process, + args=worker_args) process.start() process_pool.append(process) - while len(process_pool): + while process_pool: for process in process_pool: + process.join(SEND_RESULT_DELAY) if not process.is_alive(): - process.join(timeout) + process.join() process_pool.remove(process) - if not queue.empty(): - self._send_result(queue.get(timeout=timeout)) - time.sleep(1.0 / rps) - while not queue.empty(): - result = queue.get(timeout=timeout) - self._send_result(result) + while not queue.empty(): + self._send_result(queue.get()) queue.close()