Skip to content

Commit

Permalink
port scan inactivity timeout
Browse files Browse the repository at this point in the history
Terminate the process pool after 1 minute of inactivity.
Report each host:port that did not complete the scan.

Also:
* Each process pool job now scans a host:port.
  Previously, each process pool job scans a host.
* Single UUID for all jobs.
* Remove cylc 6.5 compatibility logic.
  Cylc 7 port scan does not work with cylc 6 suites any way.
* Improve returned data structure.
  • Loading branch information
matthewrmshin committed Nov 4, 2016
1 parent c8eb9a8 commit 6fa5c83
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 99 deletions.
24 changes: 4 additions & 20 deletions bin/cylc-scan
Original file line number Diff line number Diff line change
Expand Up @@ -207,26 +207,10 @@ def main():
state_legend = state_legend.rstrip()

skip_one = True
for result in scan_all(args, options.db, options.comms_timeout):
host, scan_result = result
try:
port, suite_identity = scan_result
except ValueError:
# Back-compat (<= 6.5.0 no title or state totals).
port, name, owner = scan_result
if not (re.match(pattern_name, name) and
re.match(pattern_owner, owner)):
continue
if options.old_format:
print name, owner, host, port
elif options.raw_format:
print "%s|%s|%s|port|%s" % (name, owner, host, port)
else:
print "%s %s@%s:%s" % (name, owner, host, port)
continue
else:
name = suite_identity['name']
owner = suite_identity['owner']
for host, port, suite_identity in scan_all(
args, options.db, options.comms_timeout):
name = suite_identity['name']
owner = suite_identity['owner']

if not (re.match(pattern_name, name) and
re.match(pattern_owner, owner)):
Expand Down
12 changes: 3 additions & 9 deletions lib/cylc/gui/dbchooser.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,9 @@ def update(self):

# Scan for running suites
choices = []
for host, scan_result in scan_all(timeout=self.timeout):
try:
port, suite_identity = scan_result
except ValueError:
# Back-compat <= 6.5.0
port, name, owner = scan_result
else:
name = suite_identity['name']
owner = suite_identity['owner']
for host, port, suite_identity in scan_all(timeout=self.timeout):
name = suite_identity['name']
owner = suite_identity['owner']
if is_remote_user(owner):
continue # current user only
auth = "%s:%d" % (host, port)
Expand Down
3 changes: 1 addition & 2 deletions lib/cylc/gui/gscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@
def get_hosts_suites_info(hosts, timeout=None, owner=None):
"""Return a dictionary of hosts, suites, and their properties."""
host_suites_map = {}
for host, (port, result) in scan_all(
hosts=hosts, timeout=timeout):
for host, port, result in scan_all(hosts=hosts, timeout=timeout):
if owner and owner != result.get(KEY_OWNER):
continue
if host not in host_suites_map:
Expand Down
137 changes: 69 additions & 68 deletions lib/cylc/network/https/port_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from multiprocessing import cpu_count, Pool
import sys
from time import sleep
from time import sleep, time
import traceback
from uuid import uuid4

Expand All @@ -34,73 +34,61 @@
from cylc.suite_host import get_hostname, is_remote_host, get_host_ip_by_name


def scan(host=None, db=None, timeout=None):
"""Scan ports, return a list of suites found: [(port, suite.identify())].
TIMEOUT = 60.0


def scan(host, port, db, timeout, my_uuid):
"""Connect to host:port to get suite identify.
Note that we could easily scan for a given suite+owner and return its
port instead of reading port files, but this may not always be fast enough.
"""
if host is None:
host = get_hostname()
base_port = GLOBAL_CFG.get(
['communication', 'base port'])
last_port = base_port + GLOBAL_CFG.get(
['communication', 'maximum number of ports'])
if timeout:
timeout = float(timeout)
else:
timeout = None

reg_db = RegistrationDB(db)
results = []
my_uuid = uuid4()
host_for_anon = host
if is_remote_host(host):
host_for_anon = get_host_ip_by_name(host) # IP reduces DNS traffic.
for port in range(base_port, last_port):
client = SuiteIdClientAnon(None, host=host_for_anon, port=port,
my_uuid=my_uuid, timeout=timeout)
try:
result = (port, client.identify())
except ConnectionError as exc:
if cylc.flags.debug:
traceback.print_exc()
continue
except Exception as exc:
if cylc.flags.debug:
traceback.print_exc()
raise
else:
owner = result[1].get('owner')
name = result[1].get('name')
states = result[1].get('states', None)
if cylc.flags.debug:
print ' suite:', name, owner
if states is None:
# This suite keeps its state info private.
# Try again with the passphrase if I have it.
pphrase = reg_db.load_passphrase(name, owner, host)
if pphrase:
client = SuiteIdClient(name, owner=owner, host=host,
port=port, my_uuid=my_uuid,
timeout=timeout)
try:
result = (port, client.identify())
except Exception:
# Nope (private suite, wrong passphrase).
if cylc.flags.debug:
print ' (wrong passphrase)'
else:
reg_db.cache_passphrase(
name, owner, host, pphrase)
if cylc.flags.debug:
print ' (got states with passphrase)'
results.append(result)
return results
client = SuiteIdClientAnon(
None, host=host_for_anon, port=port, my_uuid=my_uuid, timeout=timeout)
try:
result = client.identify()
except ConnectionError:
if cylc.flags.debug:
traceback.print_exc()
return None
except Exception as exc:
if cylc.flags.debug:
traceback.print_exc()
raise
owner = result.get('owner')
name = result.get('name')
states = result.get('states', None)
if cylc.flags.debug:
print >> sys.stderr, ' suite:', name, owner
if states is None:
# This suite keeps its state info private.
# Try again with the passphrase if I have it.
pphrase = RegistrationDB(db).load_passphrase(name, owner, host)
if pphrase:
client = SuiteIdClient(
name, owner=owner, host=host, port=port, my_uuid=my_uuid,
timeout=timeout)
try:
result = client.identify()
except Exception:
# Nope (private suite, wrong passphrase).
if cylc.flags.debug:
print >> sys.stderr, ' (wrong passphrase)'
else:
reg_db.cache_passphrase(name, owner, host, pphrase)
if cylc.flags.debug:
print >> sys.stderr, ' (got states with passphrase)'
return result


def scan_all(hosts=None, reg_db_path=None, timeout=None):
def scan_all(hosts=None, reg_db_path=None, timeout=TIMEOUT):
"""Scan all hosts."""
# Determine hosts to scan
if not hosts:
hosts = GLOBAL_CFG.get(["suite host scanning", "hosts"])
# Ensure that it does "localhost" only once
Expand All @@ -109,31 +97,44 @@ def scan_all(hosts=None, reg_db_path=None, timeout=None):
if not is_remote_host(host):
hosts.remove(host)
hosts.add("localhost")
# Determine ports to scan
base_port = GLOBAL_CFG.get(['communication', 'base port'])
max_ports = GLOBAL_CFG.get(['communication', 'maximum number of ports'])
# Use process pool
proc_pool_size = GLOBAL_CFG.get(["process pool size"])
if proc_pool_size is None:
proc_pool_size = cpu_count()
if proc_pool_size > len(hosts):
proc_pool_size = len(hosts)
proc_pool = Pool(proc_pool_size)
# Submit scans to process pool
timeout = float(timeout)
my_uuid = uuid4()
async_results = {}
for host in hosts:
async_results[host] = proc_pool.apply_async(
scan, [host, reg_db_path, timeout])
for port in range(base_port, base_port + max_ports):
async_results[(host, port)] = proc_pool.apply_async(
scan, [host, port, reg_db_path, timeout, my_uuid])
proc_pool.close()
# Get results back from process pool
# Terminate on inactivity
scan_results = []
scan_results_hosts = []
while async_results:
sleep(0.05)
for host, async_result in async_results.items():
terminate_time = time() + timeout
while async_results and time() < terminate_time:
sleep(0.01)
for key, async_result in async_results.items():
if async_result.ready():
async_results.pop(host)
terminate_time = time() + timeout
async_results.pop(key)
try:
res = async_result.get()
except Exception:
if cylc.flags.debug:
traceback.print_exc()
else:
scan_results.extend(res)
scan_results_hosts.extend([host] * len(res))
if res is not None:
host, port = key
scan_results.append((host, port, res))
for key in async_results:
print >> sys.stderr, 'WARNING: %s:%s connection hangs' % key
proc_pool.terminate()
proc_pool.join()
return zip(scan_results_hosts, scan_results)
return scan_results

0 comments on commit 6fa5c83

Please sign in to comment.