Skip to content

Commit

Permalink
Add event.internal_metadata.instance_name (#17300)
Browse files Browse the repository at this point in the history
Add `event.internal_metadata.instance_name` (the worker instance that persisted the event) to go alongside the existing `event.internal_metadata.stream_ordering`.

`instance_name` is useful to properly compare and query for events with a token since you need to compare both the `stream_ordering` and `instance_name` against the vector clock/`instance_map` in the `RoomStreamToken`.

This is pre-requisite work and may be used in #17293

Adding `event.internal_metadata.instance_name` was first mentioned in the initial Sliding Sync PR while pairing with @erikjohnston, see 09609cb#diff-5cd773fb307aa754bd3948871ba118b1ef0303f4d72d42a2d21e38242bf4e096R405-R410
  • Loading branch information
MadLittleMods authored Jun 13, 2024
1 parent ebdce69 commit 8c58eb7
Show file tree
Hide file tree
Showing 10 changed files with 31 additions and 9 deletions.
1 change: 1 addition & 0 deletions changelog.d/17300.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Expose the worker instance that persisted the event on `event.internal_metadata.instance_name`.
3 changes: 3 additions & 0 deletions rust/src/events/internal_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ pub struct EventInternalMetadata {
/// The stream ordering of this event. None, until it has been persisted.
#[pyo3(get, set)]
stream_ordering: Option<NonZeroI64>,
#[pyo3(get, set)]
instance_name: Option<String>,

/// whether this event is an outlier (ie, whether we have the state at that
/// point in the DAG)
Expand Down Expand Up @@ -232,6 +234,7 @@ impl EventInternalMetadata {
Ok(EventInternalMetadata {
data,
stream_ordering: None,
instance_name: None,
outlier: false,
})
}
Expand Down
2 changes: 2 additions & 0 deletions synapse/events/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def prune_event(event: EventBase) -> EventBase:
pruned_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)
pruned_event.internal_metadata.instance_name = event.internal_metadata.instance_name
pruned_event.internal_metadata.outlier = event.internal_metadata.outlier

# Mark the event as redacted
Expand All @@ -116,6 +117,7 @@ def clone_event(event: EventBase) -> EventBase:
new_event.internal_metadata.stream_ordering = (
event.internal_metadata.stream_ordering
)
new_event.internal_metadata.instance_name = event.internal_metadata.instance_name
new_event.internal_metadata.outlier = event.internal_metadata.outlier

return new_event
Expand Down
1 change: 1 addition & 0 deletions synapse/handlers/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -1551,6 +1551,7 @@ async def _persist_events(
# stream_ordering entry manually (as it was persisted on
# another worker).
event.internal_metadata.stream_ordering = stream_id
event.internal_metadata.instance_name = writer_instance

return event

Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ async def _persist_events_and_state_updates(
async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
event.internal_metadata.instance_name = self._instance_name

await self.db_pool.runInteraction(
"persist_events",
Expand Down
16 changes: 10 additions & 6 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ class _EventRow:

event_id: str
stream_ordering: int
instance_name: str
json: str
internal_metadata: str
format_version: Optional[int]
Expand Down Expand Up @@ -1354,6 +1355,7 @@ async def _fetch_event_ids_and_get_outstanding_redactions(
rejected_reason=rejected_reason,
)
original_ev.internal_metadata.stream_ordering = row.stream_ordering
original_ev.internal_metadata.instance_name = row.instance_name
original_ev.internal_metadata.outlier = row.outlier

# Consistency check: if the content of the event has been modified in the
Expand Down Expand Up @@ -1439,6 +1441,7 @@ def _fetch_event_rows(
SELECT
e.event_id,
e.stream_ordering,
e.instance_name,
ej.internal_metadata,
ej.json,
ej.format_version,
Expand All @@ -1462,13 +1465,14 @@ def _fetch_event_rows(
event_dict[event_id] = _EventRow(
event_id=event_id,
stream_ordering=row[1],
internal_metadata=row[2],
json=row[3],
format_version=row[4],
room_version_id=row[5],
rejected_reason=row[6],
instance_name=row[2],
internal_metadata=row[3],
json=row[4],
format_version=row[5],
room_version_id=row[6],
rejected_reason=row[7],
redactions=[],
outlier=bool(row[7]), # This is an int in SQLite3
outlier=bool(row[8]), # This is an int in SQLite3
)

# check for redactions
Expand Down
2 changes: 2 additions & 0 deletions synapse/synapse_rust/events.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ class EventInternalMetadata:

stream_ordering: Optional[int]
"""the stream ordering of this event. None, until it has been persisted."""
instance_name: Optional[str]
"""the instance name of the server that persisted this event. None, until it has been persisted."""

outlier: bool
"""whether this event is an outlier (ie, whether we have the state at that
Expand Down
3 changes: 3 additions & 0 deletions tests/events/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,13 +625,16 @@ def test_unsigned_is_copied(self) -> None:
)
original.internal_metadata.stream_ordering = 1234
self.assertEqual(original.internal_metadata.stream_ordering, 1234)
original.internal_metadata.instance_name = "worker1"
self.assertEqual(original.internal_metadata.instance_name, "worker1")

cloned = clone_event(original)
cloned.unsigned["b"] = 3

self.assertEqual(original.unsigned, {"a": 1, "b": 2})
self.assertEqual(cloned.unsigned, {"a": 1, "b": 3})
self.assertEqual(cloned.internal_metadata.stream_ordering, 1234)
self.assertEqual(cloned.internal_metadata.instance_name, "worker1")
self.assertEqual(cloned.internal_metadata.txn_id, "txn")


Expand Down
10 changes: 7 additions & 3 deletions tests/replication/storage/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def test_invites(self) -> None:
self.persist(type="m.room.create", key="", creator=USER_ID)
self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
assert event.internal_metadata.instance_name is not None
assert event.internal_metadata.stream_ordering is not None

self.replicate()
Expand All @@ -155,7 +156,7 @@ def test_invites(self) -> None:
"invite",
event.event_id,
PersistedEventPosition(
self.hs.get_instance_name(),
event.internal_metadata.instance_name,
event.internal_metadata.stream_ordering,
),
RoomVersions.V1.identifier,
Expand Down Expand Up @@ -232,11 +233,12 @@ def test_get_rooms_for_user_with_stream_ordering(self) -> None:
j2 = self.persist(
type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
)
assert j2.internal_metadata.instance_name is not None
assert j2.internal_metadata.stream_ordering is not None
self.replicate()

expected_pos = PersistedEventPosition(
"master", j2.internal_metadata.stream_ordering
j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
)
self.check(
"get_rooms_for_user_with_stream_ordering",
Expand Down Expand Up @@ -288,6 +290,7 @@ def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
msg, msgctx = self.build_event()
self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
self.replicate()
assert j2.internal_metadata.instance_name is not None
assert j2.internal_metadata.stream_ordering is not None

event_source = RoomEventSource(self.hs)
Expand Down Expand Up @@ -329,7 +332,8 @@ def test_get_rooms_for_user_with_stream_ordering_with_multi_event_persist(
# joined_rooms list.
if membership_changes:
expected_pos = PersistedEventPosition(
"master", j2.internal_metadata.stream_ordering
j2.internal_metadata.instance_name,
j2.internal_metadata.stream_ordering,
)
self.assertEqual(
joined_rooms,
Expand Down
1 change: 1 addition & 0 deletions tests/storage/test_event_chain.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ def persist(

for e in events:
e.internal_metadata.stream_ordering = self._next_stream_ordering
e.internal_metadata.instance_name = self.hs.get_instance_name()
self._next_stream_ordering += 1

def _persist(txn: LoggingTransaction) -> None:
Expand Down

0 comments on commit 8c58eb7

Please sign in to comment.