Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

port scan inactivity timeout #2060

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ install:
# Run tests
script:
# Custom diff command to ignore Xlib errors (xvfb has no RANDR extension).
- export CYLC_TEST_DIFF_CMD='diff -I Xlib'
- export CYLC_TEST_DIFF_CMD='diff -I Xlib -u'
# Only run the generic tests on Travis CI.
- export CYLC_TEST_RUN_PLATFORM=false
# Run tests with virtual frame buffer for X support.
Expand Down
28 changes: 6 additions & 22 deletions bin/cylc-scan
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ def main():
parser.add_option(
"--comms-timeout", "--pyro-timeout", metavar="SEC",
help="Set a timeout for network connections "
"to running suites. The default is 60 seconds.",
action="store", default=60, dest="comms_timeout")
"to each running suite. The default is 5 seconds.",
action="store", default=None, dest="comms_timeout")

parser.add_option(
"--old", "--old-format",
Expand Down Expand Up @@ -207,26 +207,10 @@ def main():
state_legend = state_legend.rstrip()

skip_one = True
for result in scan_all(args, options.db, options.comms_timeout):
host, scan_result = result
try:
port, suite_identity = scan_result
except ValueError:
# Back-compat (<= 6.5.0 no title or state totals).
port, name, owner = scan_result
if not (re.match(pattern_name, name) and
re.match(pattern_owner, owner)):
continue
if options.old_format:
print name, owner, host, port
elif options.raw_format:
print "%s|%s|%s|port|%s" % (name, owner, host, port)
else:
print "%s %s@%s:%s" % (name, owner, host, port)
continue
else:
name = suite_identity['name']
owner = suite_identity['owner']
for host, port, suite_identity in scan_all(
args, options.db, options.comms_timeout):
name = suite_identity['name']
owner = suite_identity['owner']

if not (re.match(pattern_name, name) and
re.match(pattern_owner, owner)):
Expand Down
12 changes: 3 additions & 9 deletions lib/cylc/gui/dbchooser.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,9 @@ def update(self):

# Scan for running suites
choices = []
for host, scan_result in scan_all(timeout=self.timeout):
try:
port, suite_identity = scan_result
except ValueError:
# Back-compat <= 6.5.0
port, name, owner = scan_result
else:
name = suite_identity['name']
owner = suite_identity['owner']
for host, port, suite_identity in scan_all(timeout=self.timeout):
name = suite_identity['name']
owner = suite_identity['owner']
if is_remote_user(owner):
continue # current user only
auth = "%s:%d" % (host, port)
Expand Down
3 changes: 1 addition & 2 deletions lib/cylc/gui/gscan.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@
def get_hosts_suites_info(hosts, timeout=None, owner=None):
"""Return a dictionary of hosts, suites, and their properties."""
host_suites_map = {}
for host, (port, result) in scan_all(
hosts=hosts, timeout=timeout):
for host, port, result in scan_all(hosts=hosts, timeout=timeout):
if owner and owner != result.get(KEY_OWNER):
continue
if host not in host_suites_map:
Expand Down
10 changes: 10 additions & 0 deletions lib/cylc/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ def __str__(self):
return self.MESSAGE % (self.args[0], self.args[1], self.args[2])


class ConnectionTimeout(ConnectionError):

"""An error raised on connection timeout."""

MESSAGE = "Connection timeout: %s: %s"

def __str__(self):
return self.MESSAGE % (self.args[0], self.args[1])


def access_priv_ok(server_obj, required_privilege_level):
"""Return True if a client is allowed access to info from server_obj.

Expand Down
17 changes: 13 additions & 4 deletions lib/cylc/network/https/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
from cylc.exceptions import PortFileError
import cylc.flags
from cylc.network import (
ConnectionError, ConnectionDeniedError, NO_PASSPHRASE, handle_proxies)
ConnectionError, ConnectionDeniedError, ConnectionTimeout, NO_PASSPHRASE,
handle_proxies)
from cylc.owner import is_remote_user, USER
from cylc.registration import RegistrationDB, PassphraseError
from cylc.suite_host import get_hostname, is_remote_host
Expand Down Expand Up @@ -134,7 +135,12 @@ def get_data_from_url_with_requests(self, url, json_data, method=None):
import traceback
traceback.print_exc()
raise ConnectionError(url, exc)
except Exception as exc:
except requests.exceptions.Timeout as exc:
if cylc.flags.debug:
import traceback
traceback.print_exc()
raise ConnectionTimeout(url, exc)
except requests.exceptions.RequestException as exc:
if cylc.flags.debug:
import traceback
traceback.print_exc()
Expand All @@ -151,7 +157,7 @@ def get_data_from_url_with_requests(self, url, json_data, method=None):
sys.stderr.write(ret.text)
try:
ret.raise_for_status()
except Exception as exc:
except requests.exceptions.HTTPError as exc:
if cylc.flags.debug:
import traceback
traceback.print_exc()
Expand Down Expand Up @@ -203,7 +209,10 @@ def get_data_from_url_with_urllib2(self, url, json_data, method=None):
if cylc.flags.debug:
import traceback
traceback.print_exc()
raise ConnectionError(url, exc)
if "timed out" in str(exc):
raise ConnectionTimeout(url, exc)
else:
raise ConnectionError(url, exc)
except Exception as exc:
if cylc.flags.debug:
import traceback
Expand Down
197 changes: 120 additions & 77 deletions lib/cylc/network/https/port_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,90 +17,87 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Port scan utilities."""

from multiprocessing import cpu_count, Pool
from multiprocessing import cpu_count, Process, Pipe
import sys
from time import sleep
from time import sleep, time
import traceback
from uuid import uuid4

from cylc.cfgspec.globalcfg import GLOBAL_CFG
import cylc.flags
from cylc.network import NO_PASSPHRASE, ConnectionError
from cylc.network import ConnectionError, ConnectionTimeout
from cylc.network.https.suite_state_client import SuiteStillInitialisingError
from cylc.network.https.suite_identifier_client import (
SuiteIdClientAnon, SuiteIdClient)
from cylc.owner import USER
from cylc.registration import RegistrationDB
from cylc.suite_host import get_hostname, is_remote_host, get_host_ip_by_name
from cylc.suite_host import is_remote_host, get_host_ip_by_name


def scan(host=None, db=None, timeout=None):
"""Scan ports, return a list of suites found: [(port, suite.identify())].
CONNECT_TIMEOUT = 5.0
INACTIVITY_TIMEOUT = 10.0
MSG_QUIT = "QUIT"
MSG_TIMEOUT = "TIMEOUT"
SLEEP_INTERVAL = 0.01

Note that we could easily scan for a given suite+owner and return its
port instead of reading port files, but this may not always be fast enough.
"""
if host is None:
host = get_hostname()
base_port = GLOBAL_CFG.get(
['communication', 'base port'])
last_port = base_port + GLOBAL_CFG.get(
['communication', 'maximum number of ports'])
if timeout:
timeout = float(timeout)
else:
timeout = None

reg_db = RegistrationDB(db)
results = []
my_uuid = uuid4()
host_for_anon = host
if is_remote_host(host):
host_for_anon = get_host_ip_by_name(host) # IP reduces DNS traffic.
for port in range(base_port, last_port):
client = SuiteIdClientAnon(None, host=host_for_anon, port=port,
my_uuid=my_uuid, timeout=timeout)
def _scan1_impl(conn, reg_db_path, timeout, my_uuid):
"""Connect to host:port to get suite identify."""
while True:
if not conn.poll(SLEEP_INTERVAL):
continue
item = conn.recv()
if item == MSG_QUIT:
break
host, port = item
host_anon = host
if is_remote_host(host):
host_anon = get_host_ip_by_name(host) # IP reduces DNS traffic
client = SuiteIdClientAnon(
None, host=host_anon, port=port, my_uuid=my_uuid, timeout=timeout)
try:
result = (port, client.identify())
result = client.identify()
except ConnectionTimeout as exc:
conn.send((host, port, MSG_TIMEOUT))
except ConnectionError as exc:
if cylc.flags.debug:
traceback.print_exc()
continue
except Exception as exc:
if cylc.flags.debug:
traceback.print_exc()
raise
conn.send((host, port, None))
else:
owner = result[1].get('owner')
name = result[1].get('name')
states = result[1].get('states', None)
owner = result.get('owner')
name = result.get('name')
states = result.get('states', None)
if cylc.flags.debug:
print ' suite:', name, owner
print >> sys.stderr, ' suite:', name, owner
if states is None:
# This suite keeps its state info private.
# Try again with the passphrase if I have it.
reg_db = RegistrationDB(reg_db_path)
pphrase = reg_db.load_passphrase(name, owner, host)
if pphrase:
client = SuiteIdClient(name, owner=owner, host=host,
port=port, my_uuid=my_uuid,
timeout=timeout)
client = SuiteIdClient(
name, owner=owner, host=host, port=port,
my_uuid=my_uuid, timeout=timeout)
try:
result = (port, client.identify())
result = client.identify()
except Exception:
# Nope (private suite, wrong passphrase).
if cylc.flags.debug:
print ' (wrong passphrase)'
print >> sys.stderr, ' (wrong passphrase)'
else:
reg_db.cache_passphrase(
name, owner, host, pphrase)
reg_db.cache_passphrase(name, owner, host, pphrase)
if cylc.flags.debug:
print ' (got states with passphrase)'
results.append(result)
return results
print >> sys.stderr, (
' (got states with passphrase)')
conn.send((host, port, result))
conn.close()


def scan_all(hosts=None, reg_db_path=None, timeout=None):
"""Scan all hosts."""
try:
timeout = float(timeout)
except:
timeout = CONNECT_TIMEOUT
my_uuid = uuid4()
# Determine hosts to scan
if not hosts:
hosts = GLOBAL_CFG.get(["suite host scanning", "hosts"])
# Ensure that "localhost" is scanned only once
Expand All @@ -109,31 +106,77 @@ def scan_all(hosts=None, reg_db_path=None, timeout=None):
if not is_remote_host(host):
hosts.remove(host)
hosts.add("localhost")
proc_pool_size = GLOBAL_CFG.get(["process pool size"])
if proc_pool_size is None:
proc_pool_size = cpu_count()
if proc_pool_size > len(hosts):
proc_pool_size = len(hosts)
proc_pool = Pool(proc_pool_size)
async_results = {}
# Determine ports to scan
base_port = GLOBAL_CFG.get(['communication', 'base port'])
max_ports = GLOBAL_CFG.get(['communication', 'maximum number of ports'])
# Number of child processes
max_procs = GLOBAL_CFG.get(["process pool size"])
if max_procs is None:
max_procs = cpu_count()
# To do and wait (submitted, waiting for results) sets
todo_set = set()
wait_set = set()
for host in hosts:
async_results[host] = proc_pool.apply_async(
scan, [host, reg_db_path, timeout])
proc_pool.close()
scan_results = []
scan_results_hosts = []
while async_results:
sleep(0.05)
for host, async_result in async_results.items():
if async_result.ready():
async_results.pop(host)
try:
res = async_result.get()
except Exception:
if cylc.flags.debug:
traceback.print_exc()
for port in range(base_port, base_port + max_ports):
todo_set.add((host, port))
proc_items = []
results = []
while todo_set or proc_items:
no_action = True
# Get results back from child processes where possible
busy_proc_items = []
while proc_items:
proc, my_conn, terminate_time = proc_items.pop()
if my_conn.poll():
host, port, result = my_conn.recv()
if result is None:
# Can't connect, ignore
wait_set.remove((host, port))
elif result == MSG_TIMEOUT:
# Connection timeout, leave in "wait_set"
pass
else:
# Connection success
results.append((host, port, result))
wait_set.remove((host, port))
if todo_set:
# Immediately give the child process something to do
host, port = todo_set.pop()
wait_set.add((host, port))
my_conn.send((host, port))
busy_proc_items.append(
(proc, my_conn, time() + INACTIVITY_TIMEOUT))
else:
scan_results.extend(res)
scan_results_hosts.extend([host] * len(res))
proc_pool.join()
return zip(scan_results_hosts, scan_results)
# Or quit if there is nothing left to do
my_conn.send(MSG_QUIT)
my_conn.close()
proc.join()
no_action = False
elif time() > terminate_time:
# Terminate child process if it is taking too long
proc.terminate()
proc.join()
no_action = False
else:
busy_proc_items.append((proc, my_conn, terminate_time))
proc_items += busy_proc_items
# Create some child processes where necessary
while len(proc_items) < max_procs and todo_set:
my_conn, conn = Pipe()
proc = Process(target=_scan1_impl, args=(
conn, reg_db_path, timeout, my_uuid))
proc.start()
host, port = todo_set.pop()
wait_set.add((host, port))
my_conn.send((host, port))
proc_items.append((proc, my_conn, time() + INACTIVITY_TIMEOUT))
no_action = False
if no_action:
sleep(SLEEP_INTERVAL)
# Report host:port with no results
if wait_set:
print >> sys.stderr, (
'WARNING, scan timed out, no result for the following:')
for key in sorted(wait_set):
print >> sys.stderr, ' %s:%s' % key
return results
Loading