diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml
index 265c4d57..47ea5f40 100644
--- a/.github/workflows/pytest.yml
+++ b/.github/workflows/pytest.yml
@@ -28,9 +28,17 @@ jobs:
run: |
docker compose create
docker compose start
- # Wait for the services to accept connections,
- # TODO: do that smarter, poll connection attempt until it succeeds
- sleep 30
+ echo "wait mysql server"
+
+ while :
+ do
+ if mysql -h 127.0.0.1 --user=root --execute "SELECT version();" 2>&1 >/dev/null && mysql -h 127.0.0.1 --port=3307 --user=root --execute "SELECT version();" 2>&1 >/dev/null; then
+ break
+ fi
+ sleep 1
+ done
+
+ echo "run pytest"
- name: Install dependencies
run: |
diff --git a/.mariadb/my.cnf b/.mariadb/my.cnf
new file mode 100644
index 00000000..c530c80c
--- /dev/null
+++ b/.mariadb/my.cnf
@@ -0,0 +1,23 @@
+[client-server]
+# Port or socket location where to connect
+# port = 3306
+socket = /run/mysqld/mysqld.sock
+
+# Import all .cnf files from configuration directory
+
+!includedir /etc/mysql/mariadb.conf.d/
+!includedir /etc/mysql/conf.d/
+
+
+[mariadb]
+plugin_load_add = file_key_management
+# Key files that are not encrypted
+loose_file_key_management_filename = /opt/key_file/no_encryption_key.key
+
+# Encrypted key file
+# loose_file_key_management_filename=/opt/key_file/keyfile.enc
+# loose_file_key_management_filekey=FILE:/opt/key_file/no_encryption_key.key
+# file_key_management_encryption_algorithm=aes_ctr
+
+# Set encrypt_binlog
+encrypt_binlog=ON
\ No newline at end of file
diff --git a/.mariadb/no_encryption_key.key b/.mariadb/no_encryption_key.key
new file mode 100755
index 00000000..476ede79
--- /dev/null
+++ b/.mariadb/no_encryption_key.key
@@ -0,0 +1 @@
+1;dda0ccb18a28b0b4c2448b5f0217a134
\ No newline at end of file
diff --git a/README.md b/README.md
index 78cf85a1..2bc28db5 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@ python-mysql-replication
-Pure Python Implementation of MySQL replication protocol build on top of PyMYSQL. This allow you to receive event like insert, update, delete with their datas and raw SQL queries.
+Pure Python Implementation of MySQL replication protocol build on top of PyMYSQL. This allows you to receive event like insert, update, delete with their datas and raw SQL queries.
Use cases
===========
@@ -56,6 +56,11 @@ Limitations
https://python-mysql-replication.readthedocs.org/en/latest/limitations.html
+Featured Books
+=============
+
+[Data Pipelines Pocket Reference](https://www.oreilly.com/library/view/data-pipelines-pocket/9781492087823/) (by James Densmore, O'Reilly): Introduced and exemplified in Chapter 4: Data Ingestion: Extracting Data.
+
Projects using this library
===========================
diff --git a/docker-compose-test.yml b/docker-compose-test.yml
new file mode 100644
index 00000000..38d95827
--- /dev/null
+++ b/docker-compose-test.yml
@@ -0,0 +1,59 @@
+version: '3.2'
+services:
+ percona-5.7:
+ platform: linux/amd64
+ image: percona:5.7
+ environment:
+ MYSQL_ALLOW_EMPTY_PASSWORD: true
+ MYSQL_DATABASE: pymysqlreplication_test
+ ports:
+ - 3306:3306
+ command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates
+ restart: always
+ networks:
+ - default
+
+ percona-5.7-ctl:
+ image: percona:5.7
+ environment:
+ MYSQL_ALLOW_EMPTY_PASSWORD: true
+ MYSQL_DATABASE: pymysqlreplication_test
+ ports:
+ - 3307:3307
+ command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307
+
+ pymysqlreplication:
+ build:
+ context: .
+ dockerfile: test.Dockerfile
+ args:
+ BASE_IMAGE: python:3.11-alpine
+ MYSQL_5_7: percona-5.7
+ MYSQL_5_7_CTL: percona-5.7-ctl
+
+ command:
+ - /bin/sh
+ - -ce
+ - |
+ echo "wait mysql server"
+
+ while :
+ do
+ if mysql -h percona-5.7 --user=root --execute "USE pymysqlreplication_test;" 2>&1 >/dev/null && mysql -h percona-5.7-ctl --port=3307 --user=root --execute "USE pymysqlreplication_test;" 2>&1 >/dev/null; then
+ break
+ fi
+ sleep 1
+ done
+
+ echo "run pytest"
+ pytest -k "not test_no_trailing_rotate_event and not test_end_log_pos"
+
+ working_dir: /pymysqlreplication
+ networks:
+ - default
+ depends_on:
+ - percona-5.7
+ - percona-5.7-ctl
+
+networks:
+ default: {}
diff --git a/docker-compose.yml b/docker-compose.yml
index 5032ae6e..3de0a9cf 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -16,10 +16,30 @@ services:
- 3307:3307
command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3307
+ mariadb-10.6:
+ image: mariadb:10.6
+ environment:
+ MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: 1
+ ports:
+ - "3308:3306"
+ command: |
+ --server-id=1
+ --default-authentication-plugin=mysql_native_password
+ --log-bin=master-bin
+ --binlog-format=row
+ --log-slave-updates=on
+ volumes:
+ - type: bind
+ source: ./.mariadb
+ target: /opt/key_file
+ - type: bind
+ source: ./.mariadb/my.cnf
+ target: /etc/mysql/my.cnf
+
percona-8.0:
image: percona:8.0
environment:
MYSQL_ALLOW_EMPTY_PASSWORD: true
ports:
- - 3308:3308
- command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3308
+ - 3309:3309
+ command: mysqld --log-bin=mysql-bin.log --server-id 1 --binlog-format=row --gtid_mode=on --enforce-gtid-consistency=on --log_slave_updates -P 3309
diff --git a/docs/developement.rst b/docs/developement.rst
index 30e257e0..68eecc21 100644
--- a/docs/developement.rst
+++ b/docs/developement.rst
@@ -23,7 +23,7 @@ When it's possible we have an unit test.
*pymysqlreplication/tests/* contains the test suite. The test suite
use the standard *unittest* Python module.
-**Be carefull** tests will reset the binary log of your MySQL server.
+**Be careful** tests will reset the binary log of your MySQL server.
Make sure you have the following configuration set in your mysql config file (usually my.cnf on development env):
diff --git a/examples/mariadb_gtid/read_event.py b/examples/mariadb_gtid/read_event.py
index cc88a97f..49598c3f 100644
--- a/examples/mariadb_gtid/read_event.py
+++ b/examples/mariadb_gtid/read_event.py
@@ -1,7 +1,7 @@
import pymysql
from pymysqlreplication import BinLogStreamReader, gtid
-from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent
+from pymysqlreplication.event import GtidEvent, RotateEvent, MariadbGtidEvent, QueryEvent,MariadbAnnotateRowsEvent
from pymysqlreplication.row_event import WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent
MARIADB_SETTINGS = {
@@ -65,10 +65,12 @@ def query_server_id(self):
RotateEvent,
WriteRowsEvent,
UpdateRowsEvent,
- DeleteRowsEvent
+ DeleteRowsEvent,
+ MariadbAnnotateRowsEvent
],
auto_position=gtid,
- is_mariadb=True
+ is_mariadb=True,
+ annotate_rows_event=True
)
print('Starting reading events from GTID ', gtid)
diff --git a/pymysqlreplication/binlogstream.py b/pymysqlreplication/binlogstream.py
index fa65aa22..9c324a06 100644
--- a/pymysqlreplication/binlogstream.py
+++ b/pymysqlreplication/binlogstream.py
@@ -14,7 +14,8 @@
QueryEvent, RotateEvent, FormatDescriptionEvent,
XidEvent, GtidEvent, StopEvent, XAPrepareEvent,
BeginLoadQueryEvent, ExecuteLoadQueryEvent,
- HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent)
+ HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent,
+ MariadbAnnotateRowsEvent, RandEvent, MariadbStartEncryptionEvent)
from .exceptions import BinLogNotEnabled
from .row_event import (
UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent)
@@ -141,6 +142,7 @@ def __init__(self, connection_settings, server_id,
fail_on_table_metadata_unavailable=False,
slave_heartbeat=None,
is_mariadb=False,
+ annotate_rows_event=False,
ignore_decode_errors=False):
"""
Attributes:
@@ -166,7 +168,8 @@ def __init__(self, connection_settings, server_id,
skip_to_timestamp: Ignore all events until reaching specified
timestamp.
report_slave: Report slave in SHOW SLAVE HOSTS.
- slave_uuid: Report slave_uuid in SHOW SLAVE HOSTS.
+ slave_uuid: Report slave_uuid or replica_uuid in SHOW SLAVE HOSTS(MySQL 8.0.21-) or
+ SHOW REPLICAS(MySQL 8.0.22+) depends on your MySQL version.
fail_on_table_metadata_unavailable: Should raise exception if we
can't get table information on
row_events
@@ -178,6 +181,8 @@ def __init__(self, connection_settings, server_id,
for semantics
is_mariadb: Flag to indicate it's a MariaDB server, used with auto_position
to point to Mariadb specific GTID.
+ annotate_rows_event: Parameter value to enable annotate rows event in mariadb,
+ used with 'is_mariadb'
ignore_decode_errors: If true, any decode errors encountered
when reading column data will be ignored.
"""
@@ -219,6 +224,7 @@ def __init__(self, connection_settings, server_id,
self.auto_position = auto_position
self.skip_to_timestamp = skip_to_timestamp
self.is_mariadb = is_mariadb
+ self.__annotate_rows_event = annotate_rows_event
if end_log_pos:
self.is_past_end_log_pos = False
@@ -301,7 +307,7 @@ def __connect_to_stream(self):
if self.slave_uuid:
cur = self._stream_connection.cursor()
- cur.execute("set @slave_uuid= '%s'" % self.slave_uuid)
+ cur.execute("SET @slave_uuid = %s, @replica_uuid = %s", (self.slave_uuid, self.slave_uuid))
cur.close()
if self.slave_heartbeat:
@@ -331,67 +337,39 @@ def __connect_to_stream(self):
self._register_slave()
if not self.auto_position:
- # only when log_file and log_pos both provided, the position info is
- # valid, if not, get the current position from master
- if self.log_file is None or self.log_pos is None:
- cur = self._stream_connection.cursor()
- cur.execute("SHOW MASTER STATUS")
- master_status = cur.fetchone()
- if master_status is None:
- raise BinLogNotEnabled()
- self.log_file, self.log_pos = master_status[:2]
- cur.close()
-
- prelude = struct.pack('