Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Calculate thread specific notification counts.
Browse files Browse the repository at this point in the history
  • Loading branch information
clokep committed Sep 14, 2022
1 parent 247f630 commit b00b4c1
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 59 deletions.
1 change: 1 addition & 0 deletions changelog.d/13776.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Experimental support for thread-specific notifications ([MSC3773](https://github.com/matrix-org/matrix-spec-proposals/pull/3773)).
2 changes: 1 addition & 1 deletion synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@
"event_search": "event_search_event_id_idx",
"local_media_repository_thumbnails": "local_media_repository_thumbnails_method_idx",
"remote_media_cache_thumbnails": "remote_media_repository_thumbnails_method_idx",
"event_push_summary": "event_push_summary_unique_index",
"event_push_summary": "event_push_summary_unique_index2",
}


Expand Down
152 changes: 94 additions & 58 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,7 @@ def _get_unread_counts_by_pos_txn(
"""

counts = NotifCounts()
thread_counts = {}

# First we pull the counts from the summary table.
#
Expand All @@ -453,7 +454,7 @@ def _get_unread_counts_by_pos_txn(
# receipt).
txn.execute(
"""
SELECT stream_ordering, notif_count, COALESCE(unread_count, 0)
SELECT stream_ordering, notif_count, COALESCE(unread_count, 0), thread_id
FROM event_push_summary
WHERE room_id = ? AND user_id = ?
AND (
Expand All @@ -463,42 +464,70 @@ def _get_unread_counts_by_pos_txn(
""",
(room_id, user_id, receipt_stream_ordering, receipt_stream_ordering),
)
row = txn.fetchone()
max_summary_stream_ordering = 0
for summary_stream_ordering, notif_count, unread_count, thread_id in txn:
if thread_id == "main":
counts = NotifCounts(
notify_count=notif_count, unread_count=unread_count
)
# TODO Delete zeroed out threads completely from the database.
elif notif_count or unread_count:
thread_counts[thread_id] = NotifCounts(
notify_count=notif_count, unread_count=unread_count
)

summary_stream_ordering = 0
if row:
summary_stream_ordering = row[0]
counts.notify_count += row[1]
counts.unread_count += row[2]
# XXX All threads should have the same stream ordering?
max_summary_stream_ordering = max(
summary_stream_ordering, max_summary_stream_ordering
)

# Next we need to count highlights, which aren't summarised
sql = """
SELECT COUNT(*) FROM event_push_actions
SELECT COUNT(*), thread_id FROM event_push_actions
WHERE user_id = ?
AND room_id = ?
AND stream_ordering > ?
AND highlight = 1
GROUP BY thread_id
"""
txn.execute(sql, (user_id, room_id, receipt_stream_ordering))
row = txn.fetchone()
if row:
counts.highlight_count += row[0]
for highlight_count, thread_id in txn:
if thread_id == "main":
counts.highlight_count += highlight_count
elif highlight_count:
if thread_id in thread_counts:
thread_counts[thread_id].highlight_count += highlight_count
else:
thread_counts[thread_id] = NotifCounts(
notify_count=0, unread_count=0, highlight_count=highlight_count
)

# Finally we need to count push actions that aren't included in the
# summary returned above. This might be due to recent events that haven't
# been summarised yet or the summary is out of date due to a recent read
# receipt.
start_unread_stream_ordering = max(
receipt_stream_ordering, summary_stream_ordering
receipt_stream_ordering, max_summary_stream_ordering
)
notify_count, unread_count = self._get_notif_unread_count_for_user_room(
unread_counts = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, start_unread_stream_ordering
)

counts.notify_count += notify_count
counts.unread_count += unread_count
for notif_count, unread_count, thread_id in unread_counts:
if thread_id == "main":
counts.notify_count += notif_count
counts.unread_count += unread_count
elif thread_id in thread_counts:
thread_counts[thread_id].notify_count += notif_count
thread_counts[thread_id].unread_count += unread_count
else:
thread_counts[thread_id] = NotifCounts(
notify_count=notif_count,
unread_count=unread_count,
highlight_count=0,
)

return RoomNotifCounts(counts, {})
return RoomNotifCounts(counts, thread_counts)

def _get_notif_unread_count_for_user_room(
self,
Expand All @@ -507,7 +536,7 @@ def _get_notif_unread_count_for_user_room(
user_id: str,
stream_ordering: int,
max_stream_ordering: Optional[int] = None,
) -> Tuple[int, int]:
) -> List[Tuple[int, int, str]]:
"""Returns the notify and unread counts from `event_push_actions` for
the given user/room in the given range.
Expand All @@ -523,13 +552,14 @@ def _get_notif_unread_count_for_user_room(
If this is not given, then no maximum is applied.
Return:
A tuple of the notif count and unread count in the given range.
A tuple of the notif count and unread count in the given range for
each thread.
"""

# If there have been no events in the room since the stream ordering,
# there can't be any push actions either.
if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering):
return 0, 0
return []

clause = ""
args = [user_id, room_id, stream_ordering]
Expand All @@ -540,26 +570,23 @@ def _get_notif_unread_count_for_user_room(
# If the max stream ordering is less than the min stream ordering,
# then obviously there are zero push actions in that range.
if max_stream_ordering <= stream_ordering:
return 0, 0
return []

sql = f"""
SELECT
COUNT(CASE WHEN notif = 1 THEN 1 END),
COUNT(CASE WHEN unread = 1 THEN 1 END)
FROM event_push_actions ea
WHERE user_id = ?
COUNT(CASE WHEN unread = 1 THEN 1 END),
thread_id
FROM event_push_actions ea
WHERE user_id = ?
AND room_id = ?
AND ea.stream_ordering > ?
{clause}
GROUP BY thread_id
"""

txn.execute(sql, args)
row = txn.fetchone()

if row:
return cast(Tuple[int, int], row)

return 0, 0
return cast(List[Tuple[int, int, str]], txn.fetchall())

async def get_push_action_users_in_range(
self, min_stream_ordering: int, max_stream_ordering: int
Expand Down Expand Up @@ -1103,26 +1130,34 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:

# Fetch the notification counts between the stream ordering of the
# latest receipt and what was previously summarised.
notif_count, unread_count = self._get_notif_unread_count_for_user_room(
unread_counts = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
)

# Replace the previous summary with the new counts.
#
# TODO(threads): Upsert per-thread instead of setting them all to main.
self.db_pool.simple_upsert_txn(
# First mark the summary for all threads in the room as cleared.
self.db_pool.simple_update_txn(
txn,
table="event_push_summary",
keyvalues={"room_id": room_id, "user_id": user_id},
values={
"notif_count": notif_count,
"unread_count": unread_count,
keyvalues={"user_id": user_id, "room_id": room_id},
updatevalues={
"notif_count": 0,
"unread_count": 0,
"stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering,
"thread_id": "main",
},
)

# Then any updated threads get their notification count and unread
# count updated.
self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
key_names=("room_id", "user_id", "thread_id"),
key_values=[(room_id, user_id, row[2]) for row in unread_counts],
value_names=("notif_count", "unread_count"),
value_values=[(row[0], row[1]) for row in unread_counts],
)

# We always update `event_push_summary_last_receipt_stream_id` to
# ensure that we don't rescan the same receipts for remote users.

Expand Down Expand Up @@ -1208,23 +1243,23 @@ def _rotate_notifs_before_txn(

# Calculate the new counts that should be upserted into event_push_summary
sql = """
SELECT user_id, room_id,
SELECT user_id, room_id, thread_id,
coalesce(old.%s, 0) + upd.cnt,
upd.stream_ordering
FROM (
SELECT user_id, room_id, count(*) as cnt,
SELECT user_id, room_id, thread_id, count(*) as cnt,
max(ea.stream_ordering) as stream_ordering
FROM event_push_actions AS ea
LEFT JOIN event_push_summary AS old USING (user_id, room_id)
LEFT JOIN event_push_summary AS old USING (user_id, room_id, thread_id)
WHERE ? < ea.stream_ordering AND ea.stream_ordering <= ?
AND (
old.last_receipt_stream_ordering IS NULL
OR old.last_receipt_stream_ordering < ea.stream_ordering
)
AND %s = 1
GROUP BY user_id, room_id
GROUP BY user_id, room_id, thread_id
) AS upd
LEFT JOIN event_push_summary AS old USING (user_id, room_id)
LEFT JOIN event_push_summary AS old USING (user_id, room_id, thread_id)
"""

# First get the count of unread messages.
Expand All @@ -1238,11 +1273,11 @@ def _rotate_notifs_before_txn(
# object because we might not have the same amount of rows in each of them. To do
# this, we use a dict indexed on the user ID and room ID to make it easier to
# populate.
summaries: Dict[Tuple[str, str], _EventPushSummary] = {}
summaries: Dict[Tuple[str, str, str], _EventPushSummary] = {}
for row in txn:
summaries[(row[0], row[1])] = _EventPushSummary(
unread_count=row[2],
stream_ordering=row[3],
summaries[(row[0], row[1], row[2])] = _EventPushSummary(
unread_count=row[3],
stream_ordering=row[4],
notif_count=0,
)

Expand All @@ -1253,34 +1288,35 @@ def _rotate_notifs_before_txn(
)

for row in txn:
if (row[0], row[1]) in summaries:
summaries[(row[0], row[1])].notif_count = row[2]
if (row[0], row[1], row[2]) in summaries:
summaries[(row[0], row[1], row[2])].notif_count = row[3]
else:
# Because the rules on notifying are different than the rules on marking
# a message unread, we might end up with messages that notify but aren't
# marked unread, so we might not have a summary for this (user, room)
# tuple to complete.
summaries[(row[0], row[1])] = _EventPushSummary(
summaries[(row[0], row[1], row[2])] = _EventPushSummary(
unread_count=0,
stream_ordering=row[3],
notif_count=row[2],
stream_ordering=row[4],
notif_count=row[3],
)

logger.info("Rotating notifications, handling %d rows", len(summaries))

# TODO(threads): Update on a per-thread basis.
self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
key_names=("user_id", "room_id"),
key_values=[(user_id, room_id) for user_id, room_id in summaries],
value_names=("notif_count", "unread_count", "stream_ordering", "thread_id"),
key_names=("user_id", "room_id", "thread_id"),
key_values=[
(user_id, room_id, thread_id)
for user_id, room_id, thread_id in summaries
],
value_names=("notif_count", "unread_count", "stream_ordering"),
value_values=[
(
summary.notif_count,
summary.unread_count,
summary.stream_ordering,
"main",
)
for summary in summaries.values()
],
Expand Down
Loading

0 comments on commit b00b4c1

Please sign in to comment.