Skip to content

Commit

Permalink
Add user prune-devices command
Browse files Browse the repository at this point in the history
Fixes issues JOJ0#44
  • Loading branch information
nemobis committed Oct 5, 2021
1 parent 1c35421 commit 375c573
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 0 deletions.
13 changes: 13 additions & 0 deletions synadm/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,19 @@ def user_whois(self, user_id):
"""
return self.query("get", f"v1/whois/{user_id}")

def user_devices(self, user_id):
""" Return information about all devices for a specific user
"""
return self.query("get", f"v2/users/{user_id}/devices")

def user_devices_delete(self, user_id, devices):
""" Delete the specified devices for a specific user.
Returns an empty JSON dict.
devices is a list of device IDs
"""
return self.query("post", f"v2/users/{user_id}/delete_devices", data={"devices": devices})

def room_join(self, room_id_or_alias, user_id):
""" Allow an administrator to join an user account with a given user_id
to a room with a given room_id_or_alias
Expand Down
84 changes: 84 additions & 0 deletions synadm/cli/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from synadm import cli

import time

# helper function to retrieve functions from within this package from another
# package (e.g used in ctx.invoke calls)
Expand Down Expand Up @@ -136,6 +137,89 @@ def deactivate(ctx, helper, user_id, gdpr_erase):
click.echo("Abort.")


@user.command(name="prune-devices")
@click.argument("user_id", type=str)
@click.option(
"--list-only", "-l", is_flag=True, default=False,
help="""dry-run: does not perform the deletion but shows what would be
done. If you want to list all sessions, you can also use the whois
command.""",
show_default=True)
@click.option(
"--min-days", "-d", type=int, default=90,
help="""how many days need to have passed from the last time a device
was seen for it to be deleted.""",
show_default=True)
@click.option(
"--min-surviving", "-s", type=int, default=1,
help="""stop processing devices when only this number of devices is
left for this user. Allows to reduce disruption by preserving recently
used devices/sessions.""",
show_default=True)
@click.option(
"--device-id", "-i", type=str, default=None,
help="""only search devices with this ID.
The other options still apply if they're not 0.""",
show_default=True)
@click.pass_obj
@click.pass_context
def prune(ctx, helper, user_id, list_only, min_days, min_surviving, device_id):
""" deletes devices of a user and invalidates any access token associated
with them. Starts from deleting the oldest devices, not seen in a
number of days, which may be abandoned.
Note that this will affect the encryption and decryption of messages
sent by other users to this user or to rooms where the user is present.
"""
# ctx.invoke(user_details_cmd, user_id=user_id)
devices_data = helper.api.user_devices(user_id)
devices_todelete = []
devices_count = devices_data.get("total", 0)
if devices_count <= min_surviving:
# Nothing to do
return

devices = devices_data.get("devices", [])
devices.sort(key=lambda k : k["last_seen_ts"])
for device in devices:
if devices_count-len(devices_todelete) <= min_surviving:
break
if device_id:
if device.get("device_id", None) == device_id:
devices_todelete.append(device)
# We found all we were looking for
break
else:
continue
if min_days:
# Get the UNIX epoch in ms the device was last seen.
seen = device.get("last_seen_ts", None)
# A device with "null" as last seen was seen a very long time ago.
# Otherwise skip the device if it was seen recently enough.
if seen and time.time()-(seen/1000) > min_days*3600*24:
continue
# If no conditions were met, just add to the devices to delete.
devices_todelete.append(device)

if not devices_todelete:
# We didn't find anything to do.
if helper.output_format == "human":
click.echo("User {} had no relevant devices to delete.".format(user_id))
return

helper.output(devices_todelete)
if not list_only:
devices_todelete_ids = [d.get("device_id", None) for d in devices_todelete]
deleted = helper.api.user_devices_delete(user_id, devices_todelete_ids)
# We should have received an empty dict
if len(deleted) > 0:
click.echo("Failed deleting user {} devices: {}."
.format(user_id, deleted))
raise SystemExit(1)
if helper.output_format == "human":
click.echo("User {} devices successfully deleted: {}."
.format(user_id, devices_todelete_ids))


@user.command(name="password")
@click.argument("user_id", type=str)
@click.option(
Expand Down

0 comments on commit 375c573

Please sign in to comment.