Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Initial TCP protocol implementation
Browse files Browse the repository at this point in the history
This defines the low level TCP replication protocol
  • Loading branch information
erikjohnston committed Mar 27, 2017
1 parent 0762bf3 commit d5fb256
Show file tree
Hide file tree
Showing 3 changed files with 1,064 additions and 0 deletions.
52 changes: 52 additions & 0 deletions synapse/replication/tcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,55 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module implements the TCP replication protocol used by synapse to
communicate between the master process and its workers (when they're enabled).
The protocol is based on fire and forget, line based commands. An example flow
would be (where '>' indicates master->worker and '<' worker->master flows)::
> SERVER example.com
< REPLICATE events 53
> RDATA events 54 ["$foo1:bar.com", ...]
> RDATA events 55 ["$foo4:bar.com", ...]
The example shows the server accepting a new connection and sending its identity
with the `SERVER` command, followed by the client asking to subscribe to the
`events` stream from the token `53`. The server then periodically sends `RDATA`
commands which have the format `RDATA <stream_name> <token> <row>`, where the
format of `<row>` is defined by the individual streams.
Error reporting happens by either the client or server sending an `ERROR`
command, and usually the connection will be closed.
Structure of the module:
* client.py - the client classes used for workers to connect to master
* command.py - the definitions of all the valid commands
* protocol.py - contains bot the client and server protocol implementations,
these should not be used directly
* resource.py - the server classes that accepts and handle client connections
* streams.py - the definitons of all the valid streams
Further detail about the wire protocol can be found in protocol.py and the
meaning of the various commands in command.py.
Since the protocol is a simple line based, its possible to manually connect to
the server using a tool like netcat. A few things should be noted when manually
using the protocol:
* When subscribing to a stream using `REPLICATE`, the special token `NOW` can
be used to get all future updates. The special stream name `ALL` can be used
with `NOW` to subscribe to all available streams.
* The federation stream is only available if federation sending has been
disabled on the main process.
* The server will only time connections out that have sent a `PING` command.
If a ping is sent then the connection will be closed if no further commands
are receieved within 15s. Both the client and server protocol implementations
will send an initial PING on connection and ensure at least one command every
5s is sent (not necessarily `PING`).
* `RDATA` commands *usually* include a numeric token, however if the stream
has multiple rows to replicate per token the server will send multiple
`RDATA` commands, with all but the last having a token of `batch`. See
the documentation on `commands.RdataCommand` for further details.
"""
341 changes: 341 additions & 0 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,341 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines the various valid commands
The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
allowed to be sent by which side.
"""

import logging
import ujson as json


logger = logging.getLogger(__name__)


class Command(object):
"""The base command class.
All subclasses must set the NAME variable which equates to the name of the
command on the wire.
A full command line on the wire is constructed from `NAME + " " + to_line()`
The default implementation creates a command of form `<NAME> <data>`
"""
NAME = None

def __init__(self, data):
self.data = data

@classmethod
def from_line(cls, line):
"""Deserialises a line from the wire into this command. `line` does not
include the command.
"""
return cls(line)

def to_line(self):
"""Serialises the comamnd for the wire. Does not include the command
prefix.
"""
return self.data


class ServerCommand(Command):
"""Sent by the server on new connection and includes the server_name.
Format::
SERVER <server_name>
"""
NAME = "SERVER"


class RdataCommand(Command):
"""Sent by server when a subscribed stream has an update.
Format::
RDATA <stream_name> <token> <row_json>
The `<token>` may either be a numeric stream id OR "batch". The latter case
is used to support sending multiple updates with the same stream ID. This
is done by sending an RDATA for each row, with all but the last RDATA having
a token of "batch" and the last having the final stream ID.
The client should batch all incoming RDATA with a token of "batch" (per
stream_name) until it sees an RDATA with a numeric stream ID.
`<token>` of "batch" maps to the instance variable `token` being None.
An example of a batched series of RDATA::
RDATA presence batch ["@foo:example.com", "online", ...]
RDATA presence batch ["@bar:example.com", "online", ...]
RDATA presence 59 ["@baz:example.com", "online", ...]
"""
NAME = "RDATA"

def __init__(self, stream_name, token, row):
self.stream_name = stream_name
self.token = token
self.row = row

@classmethod
def from_line(cls, line):
stream_name, token, row_json = line.split(" ", 2)
return cls(
stream_name,
None if token == "batch" else int(token),
json.loads(row_json)
)

def to_line(self):
return " ".join((
self.stream_name,
str(self.token) if self.token is not None else "batch",
json.dumps(self.row),
))


class PositionCommand(Command):
"""Sent by the client to tell the client the stream postition without
needing to send an RDATA.
"""
NAME = "POSITION"

def __init__(self, stream_name, token):
self.stream_name = stream_name
self.token = token

@classmethod
def from_line(cls, line):
stream_name, token = line.split(" ", 1)
return cls(stream_name, int(token))

def to_line(self):
return " ".join((self.stream_name, str(self.token),))


class ErrorCommand(Command):
"""Sent by either side if there was an ERROR. The data is a string describing
the error.
"""
NAME = "ERROR"


class PingCommand(Command):
"""Sent by either side as a keep alive. The data is arbitary (often timestamp)
"""
NAME = "PING"


class NameCommand(Command):
"""Sent by client to inform the server of the client's identity. The data
is the name
"""
NAME = "NAME"


class ReplicateCommand(Command):
"""Sent by the client to subsribe to the stream.
Format::
REPLICATE <stream_name> <token>
Where <token> may be either:
* a numeric stream_id to stream updates from
* "NOW" to stream all subsequent updates.
The <stream_name> can be "ALL" to subscribe to all known streams, in which
case the <token> must be set to "NOW", i.e.::
REPLICATE ALL NOW
"""
NAME = "REPLICATE"

def __init__(self, stream_name, token):
self.stream_name = stream_name
self.token = token

@classmethod
def from_line(cls, line):
stream_name, token = line.split(" ", 1)
if token in ("NOW", "now"):
token = "NOW"
else:
token = int(token)
return cls(stream_name, token)

def to_line(self):
return " ".join((self.stream_name, str(self.token),))


class UserSyncCommand(Command):
"""Sent by the client to inform the server that a user has started or
stopped syncing. Used to calculate presence on the master.
Format::
USER_SYNC <user_id> <state>
Where <state> is either "start" or "stop"
"""
NAME = "USER_SYNC"

def __init__(self, user_id, is_syncing):
self.user_id = user_id
self.is_syncing = is_syncing

@classmethod
def from_line(cls, line):
user_id, state = line.split(" ", 1)

if state not in ("start", "end"):
raise Exception("Invalid USER_SYNC state %r" % (state,))

return cls(user_id, state == "start")

def to_line(self):
return " ".join((self.user_id, "start" if self.is_syncing else "end"))


class FederationAckCommand(Command):
"""Sent by the client when its processed upto a given point in the
federation stream. This allows the master to drop in memory caches of the
federation stream.
This must only be sent from one worker (i.e. the one sending federation)
Format::
FEDERATION_ACK <token>
"""
NAME = "FEDERATION_ACK"

def __init__(self, token):
self.token = token

@classmethod
def from_line(cls, line):
return cls(int(line))

def to_line(self):
return str(self.token)


class SyncCommand(Command):
"""Used for testing. The client protocol implementation allows waiting
on a SYNC command with a specified data.
"""
NAME = "SYNC"


class RemovePusherCommand(Command):
"""Sent by the client to request the master remove the given pusher.
Format::
REMOVE_PUSHER <app_id> <push_key> <user_id>
"""
NAME = "REMOVE_PUSHER"

def __init__(self, app_id, push_key, user_id):
self.user_id = user_id
self.app_id = app_id
self.push_key = push_key

@classmethod
def from_line(cls, line):
app_id, push_key, user_id = line.split(" ", 2)

return cls(app_id, push_key, user_id)

def to_line(self):
return " ".join((self.app_id, self.push_key, self.user_id))


class InvalidateCacheCommand(Command):
"""Sent by the client to invalidate an upstream cache.
THIS IS NOT RELIABLE, AND SHOULD *NOT* BE USED ACCEPT FOR THINGS THAT ARE
NOT DISASTROUS IF WE DROP ON THE FLOOR.
Mainly used to invalidate destination retry timing caches.
Format::
INVALIDATE_CACHE <cache_func> <keys_json>
Where <keys_json> is a json list.
"""
NAME = "INVALIDATE_CACHE"

def __init__(self, cache_func, keys):
self.cache_func = cache_func
self.keys = keys

@classmethod
def from_line(cls, line):
cache_func, keys_json = line.split(" ", 1)

return cls(cache_func, json.loads(keys_json))

def to_line(self):
return " ".join((self.cache_func, json.dumps(self.keys)))


# Map of command name to command type.
COMMAND_MAP = {
cmd.NAME: cmd
for cmd in (
ServerCommand,
RdataCommand,
PositionCommand,
ErrorCommand,
PingCommand,
NameCommand,
ReplicateCommand,
UserSyncCommand,
FederationAckCommand,
SyncCommand,
RemovePusherCommand,
InvalidateCacheCommand,
)
}

# The commands the server is allowed to send
VALID_SERVER_COMMANDS = (
ServerCommand.NAME,
RdataCommand.NAME,
PositionCommand.NAME,
ErrorCommand.NAME,
PingCommand.NAME,
SyncCommand.NAME,
)

# The commands the client is allowed to send
VALID_CLIENT_COMMANDS = (
NameCommand.NAME,
ReplicateCommand.NAME,
PingCommand.NAME,
UserSyncCommand.NAME,
FederationAckCommand.NAME,
RemovePusherCommand.NAME,
InvalidateCacheCommand.NAME,
ErrorCommand.NAME,
)
Loading

0 comments on commit d5fb256

Please sign in to comment.