Skip to content

Commit

Permalink
#838: ensure we buffer any pending reads on the connection we forward…
Browse files Browse the repository at this point in the history
… to the tcp proxy so we don't lose packets, steal_connection() also disables the connection object until we are ready to deal with incoming data again - which should ensure the io threads exit more promptly

git-svn-id: https://xpra.org/svn/Xpra/trunk@9160 3bb7dfac-3a0b-4e04-842a-767bc560f471
  • Loading branch information
totaam committed Apr 27, 2015
1 parent a60f086 commit 22cd290
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 13 deletions.
17 changes: 15 additions & 2 deletions src/xpra/net/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def __init__(self, scheduler, conn, process_packet_cb, get_packet_cb=None):
self._process_packet_cb = process_packet_cb
self._write_queue = Queue(1)
self._read_queue = Queue(20)
self._read_queue_put = self._read_queue.put
# Invariant: if .source is None, then _source_has_more == False
self._get_packet_cb = get_packet_cb
#counters:
Expand Down Expand Up @@ -165,6 +166,7 @@ def wait_for_io_threads_exit(self, timeout=None):
exited = True
for t in (self._read_thread, self._write_thread):
if t.isAlive():
log.warn("%s thread of %s has not yet exited (timeout=%s)", t.name, self._conn, timeout)
exited = False
break
return exited
Expand Down Expand Up @@ -549,7 +551,8 @@ def _read_thread_loop(self):
def _read(self):
buf = self._conn.read(READ_BUFFER_SIZE)
#log("read thread: got data of size %s: %s", len(buf), repr_ellipsized(buf))
self._read_queue.put(buf)
#add to the read queue (or whatever takes its place - see steal_connection)
self._read_queue_put(buf)
if not buf:
log("read thread: eof")
self.close()
Expand Down Expand Up @@ -871,13 +874,23 @@ def close(self):
self.terminate_queue_threads()
self.idle_add(self.clean)

def steal_connection(self):
def steal_connection(self, read_callback=None):
#so we can re-use this connection somewhere else
#(frees all protocol threads and resources)
#Note: this method can only be used with non-blocking sockets,
#and if more than one packet can arrive, the read_callback should be used
#to ensure that no packets get lost.
#The caller must call wait_for_io_threads_exit() to ensure that this
#class is no longer reading from the connection before it can re-use it
assert not self._closed
if read_callback:
self._read_queue_put = read_callback
conn = self._conn
self._closed = True
self._conn = None
if conn:
#this ensures that we exit the untilConcludes() read/write loop
conn.set_active(False)
self.terminate_queue_threads()
return conn

Expand Down
16 changes: 9 additions & 7 deletions src/xpra/server/proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
log = Logger("proxy")


from xpra.util import LOGIN_TIMEOUT, AUTHENTICATION_ERROR, SESSION_NOT_FOUND
from xpra.util import LOGIN_TIMEOUT, AUTHENTICATION_ERROR, SESSION_NOT_FOUND, repr_ellipsized
from xpra.server.proxy_instance_process import ProxyInstanceProcess
from xpra.server.server_core import ServerCore
from xpra.scripts.config import make_defaults_struct
Expand Down Expand Up @@ -185,7 +185,10 @@ def parse_error(*args):
return
log("server connection=%s", server_conn)

client_conn = client_proto.steal_connection()
#no other packets should be arriving until the proxy instance responds to the initial hello packet
def unexpected_packet(*args):
log.warn("received an unexpected packet on the proxy connection: %s", [repr_ellipsized(x) for x in args])
client_conn = client_proto.steal_connection(unexpected_packet)
client_state = client_proto.save_state()
cipher = None
encryption_key = None
Expand All @@ -200,16 +203,15 @@ def parse_error(*args):
def do_start_proxy():
log("do_start_proxy()")
try:
#stop IO in proxy:
#(it may take up to _socket_timeout until the thread exits)
client_conn.set_active(False)
ioe = client_proto.wait_for_io_threads_exit(0.1+self._socket_timeout)
ioe = client_proto.wait_for_io_threads_exit(0.5+self._socket_timeout)
if not ioe:
log.error("IO threads have failed to terminate!")
log.error("some network IO threads have failed to terminate!")
return
#now we can go back to using blocking sockets:
self.set_socket_timeout(client_conn, None)
client_conn.set_active(True)
#maybe this is safe to enable?
#self.set_socket_timeout(server_conn, None)

assert uid!=0 and gid!=0
message_queue = MQueue()
Expand Down
27 changes: 23 additions & 4 deletions src/xpra/server/server_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from xpra.scripts.server import deadly_signal
from xpra.net.bytestreams import SocketConnection
from xpra.platform import set_application_name
from xpra.os_util import load_binary_file, get_machine_id, get_user_uuid, SIGNAMES
from xpra.os_util import load_binary_file, get_machine_id, get_user_uuid, SIGNAMES, Queue
from xpra.version_util import version_compat_check, get_version_info, get_platform_info, get_host_info, local_version
from xpra.net.protocol import Protocol, get_network_caps, sanity_checks
from xpra.net.crypto import new_cipher_caps
Expand Down Expand Up @@ -403,7 +403,9 @@ def start_tcp_proxy(self, proto, data):
proxylog("start_tcp_proxy(%s, '%s')", proto, repr_ellipsized(data))
self._potential_protocols.remove(proto)
proxylog("start_tcp_proxy: protocol state before stealing: %s", proto.get_info(alias_info=False))
client_connection = proto.steal_connection()
#any buffers read after we steal the connection will be placed in this temporary queue:
temp_read_buffer = Queue()
client_connection = proto.steal_connection(temp_read_buffer.put)
#connect to web server:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10)
Expand All @@ -415,9 +417,26 @@ def start_tcp_proxy(self, proto, data):
proto.gibberish("invalid packet header", data)
return
proxylog("proxy connected to tcp server at %s:%s : %s", host, port, web_server_connection)
sock.settimeout(0.5)
self.set_socket_timeout(client_connection, 0.5)
sock.settimeout(self._socket_timeout)

ioe = proto.wait_for_io_threads_exit(0.5+self._socket_timeout)
if not ioe:
proxylog.warn("proxy failed to stop all existing network threads!")
self.disconnect_protocol(proto, "internal threading error")
return
#now that we own it, we can start it again:
client_connection.set_active(True)
#and we can use blocking sockets:
self.set_socket_timeout(client_connection, None)
sock.settimeout(None)

proxylog("pushing initial buffer to its new destination: %s", repr_ellipsized(data))
web_server_connection.write(data)
while not temp_read_buffer.empty():
buf = temp_read_buffer.get()
if buf:
proxylog("pushing read buffer to its new destination: %s", repr_ellipsized(buf))
web_server_connection.write(buf)
p = XpraProxy(client_connection.target, client_connection, web_server_connection)
self._tcp_proxy_clients.append(p)
proxylog.info("client connection from %s forwarded to proxy server on %s:%s", client_connection.target, host, port)
Expand Down

0 comments on commit 22cd290

Please sign in to comment.