Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add an admin API for users' media statistics (#8700)
Browse files Browse the repository at this point in the history
Add `GET /_synapse/admin/v1/statistics/users/media` to get statisics about local media usage by users.
Related to #6094
It is the first API for statistics.
Goal is to avoid/reduce usage of sql queries like [Wiki analyzing Synapse](https://github.com/matrix-org/synapse/wiki/SQL-for-analyzing-Synapse-PostgreSQL-database-stats)

Signed-off-by: Dirk Klimpel dirk@klimpel.org
  • Loading branch information
dklimpel authored Nov 5, 2020
1 parent e4676bd commit c3119d1
Show file tree
Hide file tree
Showing 6 changed files with 820 additions and 0 deletions.
1 change: 1 addition & 0 deletions changelog.d/8700.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add an admin API for local user media statistics. Contributed by @dklimpel.
83 changes: 83 additions & 0 deletions docs/admin_api/statistics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# Users' media usage statistics

Returns information about all local media usage of users. Gives the
possibility to filter them by time and user.

The API is:

```
GET /_synapse/admin/v1/statistics/users/media
```

To use it, you will need to authenticate by providing an `access_token`
for a server admin: see [README.rst](README.rst).

A response body like the following is returned:

```json
{
"users": [
{
"displayname": "foo_user_0",
"media_count": 2,
"media_length": 134,
"user_id": "@foo_user_0:test"
},
{
"displayname": "foo_user_1",
"media_count": 2,
"media_length": 134,
"user_id": "@foo_user_1:test"
}
],
"next_token": 3,
"total": 10
}
```

To paginate, check for `next_token` and if present, call the endpoint
again with `from` set to the value of `next_token`. This will return a new page.

If the endpoint does not return a `next_token` then there are no more
reports to paginate through.

**Parameters**

The following parameters should be set in the URL:

* `limit`: string representing a positive integer - Is optional but is
used for pagination, denoting the maximum number of items to return
in this call. Defaults to `100`.
* `from`: string representing a positive integer - Is optional but used for pagination,
denoting the offset in the returned results. This should be treated as an opaque value
and not explicitly set to anything other than the return value of `next_token` from a
previous call. Defaults to `0`.
* `order_by` - string - The method in which to sort the returned list of users. Valid values are:
- `user_id` - Users are ordered alphabetically by `user_id`. This is the default.
- `displayname` - Users are ordered alphabetically by `displayname`.
- `media_length` - Users are ordered by the total size of uploaded media in bytes.
Smallest to largest.
- `media_count` - Users are ordered by number of uploaded media. Smallest to largest.
* `from_ts` - string representing a positive integer - Considers only
files created at this timestamp or later. Unix timestamp in ms.
* `until_ts` - string representing a positive integer - Considers only
files created at this timestamp or earlier. Unix timestamp in ms.
* `search_term` - string - Filter users by their user ID localpart **or** displayname.
The search term can be found in any part of the string.
Defaults to no filtering.
* `dir` - string - Direction of order. Either `f` for forwards or `b` for backwards.
Setting this value to `b` will reverse the above sort order. Defaults to `f`.


**Response**

The following fields are returned in the JSON response body:

* `users` - An array of objects, each containing information
about the user and their local media. Objects contain the following fields:
- `displayname` - string - Displayname of this user.
- `media_count` - integer - Number of uploaded media by this user.
- `media_length` - integer - Size of uploaded media in bytes by this user.
- `user_id` - string - Fully-qualified user ID (ex. `@user:server.com`).
* `next_token` - integer - Opaque value used for pagination. See above.
* `total` - integer - Total number of users after filtering.
2 changes: 2 additions & 0 deletions synapse/rest/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
ShutdownRoomRestServlet,
)
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.statistics import UserMediaStatisticsRestServlet
from synapse.rest.admin.users import (
AccountValidityRenewServlet,
DeactivateAccountRestServlet,
Expand Down Expand Up @@ -227,6 +228,7 @@ def register_servlets(hs, http_server):
DeviceRestServlet(hs).register(http_server)
DevicesRestServlet(hs).register(http_server)
DeleteDevicesRestServlet(hs).register(http_server)
UserMediaStatisticsRestServlet(hs).register(http_server)
EventReportDetailRestServlet(hs).register(http_server)
EventReportsRestServlet(hs).register(http_server)
PushersRestServlet(hs).register(http_server)
Expand Down
122 changes: 122 additions & 0 deletions synapse/rest/admin/statistics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# -*- coding: utf-8 -*-
# Copyright 2020 Dirk Klimpel
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Tuple

from synapse.api.errors import Codes, SynapseError
from synapse.http.servlet import RestServlet, parse_integer, parse_string
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
from synapse.storage.databases.main.stats import UserSortOrder
from synapse.types import JsonDict

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)


class UserMediaStatisticsRestServlet(RestServlet):
"""
Get statistics about uploaded media by users.
"""

PATTERNS = admin_patterns("/statistics/users/media$")

def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()

async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
await assert_requester_is_admin(self.auth, request)

order_by = parse_string(
request, "order_by", default=UserSortOrder.USER_ID.value
)
if order_by not in (
UserSortOrder.MEDIA_LENGTH.value,
UserSortOrder.MEDIA_COUNT.value,
UserSortOrder.USER_ID.value,
UserSortOrder.DISPLAYNAME.value,
):
raise SynapseError(
400,
"Unknown value for order_by: %s" % (order_by,),
errcode=Codes.INVALID_PARAM,
)

start = parse_integer(request, "from", default=0)
if start < 0:
raise SynapseError(
400,
"Query parameter from must be a string representing a positive integer.",
errcode=Codes.INVALID_PARAM,
)

limit = parse_integer(request, "limit", default=100)
if limit < 0:
raise SynapseError(
400,
"Query parameter limit must be a string representing a positive integer.",
errcode=Codes.INVALID_PARAM,
)

from_ts = parse_integer(request, "from_ts", default=0)
if from_ts < 0:
raise SynapseError(
400,
"Query parameter from_ts must be a string representing a positive integer.",
errcode=Codes.INVALID_PARAM,
)

until_ts = parse_integer(request, "until_ts")
if until_ts is not None:
if until_ts < 0:
raise SynapseError(
400,
"Query parameter until_ts must be a string representing a positive integer.",
errcode=Codes.INVALID_PARAM,
)
if until_ts <= from_ts:
raise SynapseError(
400,
"Query parameter until_ts must be greater than from_ts.",
errcode=Codes.INVALID_PARAM,
)

search_term = parse_string(request, "search_term")
if search_term == "":
raise SynapseError(
400,
"Query parameter search_term cannot be an empty string.",
errcode=Codes.INVALID_PARAM,
)

direction = parse_string(request, "dir", default="f")
if direction not in ("f", "b"):
raise SynapseError(
400, "Unknown direction: %s" % (direction,), errcode=Codes.INVALID_PARAM
)

users_media, total = await self.store.get_users_media_usage_paginate(
start, limit, from_ts, until_ts, order_by, direction, search_term
)
ret = {"users": users_media, "total": total}
if (start + limit) < total:
ret["next_token"] = start + len(users_media)

return 200, ret
127 changes: 127 additions & 0 deletions synapse/storage/databases/main/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@

import logging
from collections import Counter
from enum import Enum
from itertools import chain
from typing import Any, Dict, List, Optional, Tuple

from twisted.internet.defer import DeferredLock

from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import StoreError
from synapse.storage.database import DatabasePool
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -59,6 +62,23 @@
TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")}


class UserSortOrder(Enum):
"""
Enum to define the sorting method used when returning users
with get_users_media_usage_paginate
MEDIA_LENGTH = ordered by size of uploaded media. Smallest to largest.
MEDIA_COUNT = ordered by number of uploaded media. Smallest to largest.
USER_ID = ordered alphabetically by `user_id`.
DISPLAYNAME = ordered alphabetically by `displayname`
"""

MEDIA_LENGTH = "media_length"
MEDIA_COUNT = "media_count"
USER_ID = "user_id"
DISPLAYNAME = "displayname"


class StatsStore(StateDeltasStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
Expand Down Expand Up @@ -882,3 +902,110 @@ def _calculate_and_set_initial_state_for_user_txn(txn):
complete_with_stream_id=pos,
absolute_field_overrides={"joined_rooms": joined_rooms},
)

async def get_users_media_usage_paginate(
self,
start: int,
limit: int,
from_ts: Optional[int] = None,
until_ts: Optional[int] = None,
order_by: Optional[UserSortOrder] = UserSortOrder.USER_ID.value,
direction: Optional[str] = "f",
search_term: Optional[str] = None,
) -> Tuple[List[JsonDict], Dict[str, int]]:
"""Function to retrieve a paginated list of users and their uploaded local media
(size and number). This will return a json list of users and the
total number of users matching the filter criteria.
Args:
start: offset to begin the query from
limit: number of rows to retrieve
from_ts: request only media that are created later than this timestamp (ms)
until_ts: request only media that are created earlier than this timestamp (ms)
order_by: the sort order of the returned list
direction: sort ascending or descending
search_term: a string to filter user names by
Returns:
A list of user dicts and an integer representing the total number of
users that exist given this query
"""

def get_users_media_usage_paginate_txn(txn):
filters = []
args = [self.hs.config.server_name]

if search_term:
filters.append("(lmr.user_id LIKE ? OR displayname LIKE ?)")
args.extend(["@%" + search_term + "%:%", "%" + search_term + "%"])

if from_ts:
filters.append("created_ts >= ?")
args.extend([from_ts])
if until_ts:
filters.append("created_ts <= ?")
args.extend([until_ts])

# Set ordering
if UserSortOrder(order_by) == UserSortOrder.MEDIA_LENGTH:
order_by_column = "media_length"
elif UserSortOrder(order_by) == UserSortOrder.MEDIA_COUNT:
order_by_column = "media_count"
elif UserSortOrder(order_by) == UserSortOrder.USER_ID:
order_by_column = "lmr.user_id"
elif UserSortOrder(order_by) == UserSortOrder.DISPLAYNAME:
order_by_column = "displayname"
else:
raise StoreError(
500, "Incorrect value for order_by provided: %s" % order_by
)

if direction == "b":
order = "DESC"
else:
order = "ASC"

where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else ""

sql_base = """
FROM local_media_repository as lmr
LEFT JOIN profiles AS p ON lmr.user_id = '@' || p.user_id || ':' || ?
{}
GROUP BY lmr.user_id, displayname
""".format(
where_clause
)

# SQLite does not support SELECT COUNT(*) OVER()
sql = """
SELECT COUNT(*) FROM (
SELECT lmr.user_id
{sql_base}
) AS count_user_ids
""".format(
sql_base=sql_base,
)
txn.execute(sql, args)
count = txn.fetchone()[0]

sql = """
SELECT
lmr.user_id,
displayname,
COUNT(lmr.user_id) as media_count,
SUM(media_length) as media_length
{sql_base}
ORDER BY {order_by_column} {order}
LIMIT ? OFFSET ?
""".format(
sql_base=sql_base, order_by_column=order_by_column, order=order,
)

args += [limit, start]
txn.execute(sql, args)
users = self.db_pool.cursor_to_dict(txn)

return users, count

return await self.db_pool.runInteraction(
"get_users_media_usage_paginate_txn", get_users_media_usage_paginate_txn
)
Loading

0 comments on commit c3119d1

Please sign in to comment.