Skip to content

Commit

Permalink
Ansible launcher: add private gearman function
Browse files Browse the repository at this point in the history
OpenStack's Zuul's gear server is spending most of its time dealing
with function registration (800 nodes, 20k functions each).
Alleviate the pain by adding a private gearman function to register
functions en masse.  In preliminary testing, this causes function
registration to take 27% of the current time.

Change-Id: I0d2342cadca5e3d6d6c1964a119e0d50b0bcc548
  • Loading branch information
James E. Blair committed Jun 13, 2016
1 parent 4829398 commit d437159
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 11 deletions.
12 changes: 6 additions & 6 deletions zuul/cmd/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,18 @@ def start_gear_server(self):
if child_pid == 0:
os.close(pipe_write)
self.setup_logging('gearman_server', 'log_config')
import gear
import zuul.lib.gearserver
statsd_host = os.environ.get('STATSD_HOST')
statsd_port = int(os.environ.get('STATSD_PORT', 8125))
if self.config.has_option('gearman_server', 'listen_address'):
host = self.config.get('gearman_server', 'listen_address')
else:
host = None
gear.Server(4730,
host=host,
statsd_host=statsd_host,
statsd_port=statsd_port,
statsd_prefix='zuul.geard')
zuul.lib.gearserver.GearServer(4730,
host=host,
statsd_host=statsd_host,
statsd_port=statsd_port,
statsd_prefix='zuul.geard')

# Keep running until the parent dies:
pipe_read = os.fdopen(pipe_read)
Expand Down
20 changes: 15 additions & 5 deletions zuul/launcher/ansiblelaunchserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ def boolify(x):
return bool(x)


class GearWorker(gear.Worker):
MASS_DO = 101

def sendMassDo(self, functions):
data = b'\x00'.join([gear.convert_to_bytes(x) for x in functions])
self.broadcast_lock.acquire()
try:
p = gear.Packet(gear.constants.REQ, self.MASS_DO, data)
self.broadcast(p)
finally:
self.broadcast_lock.release()


class Watchdog(object):
def __init__(self, timeout, function, args):
self.timeout = timeout
Expand Down Expand Up @@ -518,7 +531,7 @@ def run(self):
port = self.config.get('gearman', 'port')
else:
port = 4730
self.worker = gear.Worker(self.name)
self.worker = GearWorker(self.name)
self.worker.addServer(server, port)
self.log.debug("Waiting for server")
self.worker.waitForServer()
Expand Down Expand Up @@ -648,10 +661,7 @@ def register(self):
new_functions = set()
for job in self.jobs.values():
new_functions |= self.generateFunctionNames(job)
for function in new_functions - self.registered_functions:
self.worker.registerFunction(function)
for function in self.registered_functions - new_functions:
self.worker.unRegisterFunction(function)
self.worker.sendMassDo(new_functions)
self.registered_functions = new_functions

def abortRunningJob(self):
Expand Down
35 changes: 35 additions & 0 deletions zuul/lib/gearserver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import gear

MASS_DO = 101


class GearServer(gear.Server):
def handlePacket(self, packet):
if packet.ptype == MASS_DO:
self.log.info("Received packet from %s: %s" % (packet.connection,
packet))
self.handleMassDo(packet)
else:
return super(GearServer, self).handlePacket(packet)

def handleMassDo(self, packet):
packet.connection.functions = set()
for name in packet.data.split(b'\x00'):
self.log.debug("Adding function %s to %s" % (
name, packet.connection))
packet.connection.functions.add(name)
self.functions.add(name)

0 comments on commit d437159

Please sign in to comment.