Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HP-2429 | feat: migrate old user from Tunnistamo to Keycloak #102

Merged
merged 2 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ OIDC_API_TOKEN_AUTH = {
# authorization server configuration and public keys are "remembered".
# The value is in seconds. Default is 24 hours.
"OIDC_CONFIG_EXPIRATION_TIME": 600,

# Allow only algorithms that we actually use. In case of tunnistamo and
# tunnistus only RS256 is used with API access tokens.
# tunnistus only RS256 is used with API access tokens.
"ALLOWED_ALGORITHMS": ["RS256"],
}
```
Expand Down Expand Up @@ -417,6 +417,34 @@ If you're not allowing users to log in with passwords, you may disable the
username/password form from Django admin login page by setting `HELUSERS_PASSWORD_LOGIN_DISABLED`
to `True`.


#### Migrating old user from Tunnistamo to Keycloak

By default, the migration logic is configured to support migrating users from Tunnistamo
AD authentication to Keycloak AD authentication. The migration should be tested by the
service before enabling it in production. This migration logic most likely shouldn't be
configured for other authentication methods besides AD (i.e. staff/admin) users.

When transitioning from one authentication provider to another, it is possible to
migrate the old user data for the new user with a different UUID. Migration is done by
finding the old user instance and replacing its UUID with the new one from the token
payload. So instead of creating a new user instance, we update the old one. Migration
happens one user at a time upon login.

Feature can be configured using the following settings.
- `HELUSERS_USER_MIGRATE_ENABLED`: Enable the feature. Defaults to `False`.
- `HELUSERS_USER_MIGRATE_EMAIL_DOMAINS`: Whitelisted email domains for migration.
Defaults to `["hel.fi"]`.
- `HELUSERS_USER_MIGRATE_AMRS` which authentication methods are used for migration.
Defaults to `["helsinkiad"]`.

Migration logic is only run on certain conditions:
- Correct authentication method is used (AMR-claim)
- Email domain is correct
- User with the new UUID doesn't exist yet
- Old user is found by email
- Username has been generated by helusers.utils.uuid_to_username for the old user

# Development

Virtual Python environment can be used. For example:
Expand Down Expand Up @@ -456,7 +484,7 @@ tox
This project uses
[`black`](https://github.com/ambv/black),
[`flake8`](https://github.com/pycqa/flake8) and
[`isort`](https://github.com/timothycrosley/isort)
[`isort`](https://github.com/pycqa/isort)
for code formatting and quality checking. Project follows the basic
black config, without any modifications.

Expand Down
112 changes: 112 additions & 0 deletions helusers/tests/test_migrate_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import uuid

import pytest
from django.contrib.auth import get_user_model

from helusers.user_utils import get_or_create_user, migrate_user
from helusers.utils import uuid_to_username


@pytest.fixture(autouse=True)
def setup_migrate(settings):
settings.HELUSERS_USER_MIGRATE_ENABLED = True
settings.HELUSERS_USER_MIGRATE_AMRS = ["a", "b"]
settings.HELUSERS_USER_MIGRATE_EMAIL_DOMAINS = ["example.com", "example.org"]


@pytest.mark.parametrize(
"migrate_enabled, amr, email, good_username, expect_migration",
[
pytest.param(True, ["a"], "auser@example.org", True, True, id="migrate"),
pytest.param(
True, ["a"], "auser@example.org", False, False, id="wrong_username"
),
pytest.param(True, ["c"], "auser@example.org", True, False, id="wrong_amr"),
pytest.param(True, ["a"], "auser@example.net", True, False, id="wrong_domain"),
pytest.param(False, ["a"], "auser@example.org", True, False, id="disabled"),
],
)
@pytest.mark.django_db
def test_migrate_user(
settings, migrate_enabled, amr, email, good_username, expect_migration
):
settings.HELUSERS_USER_MIGRATE_ENABLED = migrate_enabled
old_uuid = uuid.uuid4()
new_uuid = uuid.uuid4()
old_username = uuid_to_username(old_uuid) if good_username else str(old_uuid)
user_model = get_user_model()
user = user_model.objects.create(
uuid=old_uuid,
username=old_username,
first_name="A",
last_name="User",
email=email,
)

payload = {
"sub": str(new_uuid),
"amr": amr,
"email": user.email,
"first_name": user.first_name,
"last_name": user.last_name,
}

migrate_user(user_id=str(new_uuid), payload=payload)

if expect_migration:
user.refresh_from_db()
assert user.uuid == new_uuid
assert user.username == uuid_to_username(new_uuid)
else:
user.refresh_from_db()
assert user.uuid == old_uuid
assert user.username == old_username


@pytest.mark.parametrize(
"migrate_enabled, amr, email, good_username, expect_migration",
[
pytest.param(True, ["a"], "auser@example.org", True, True, id="migrate"),
pytest.param(
True, ["a"], "auser@example.org", False, False, id="wrong_username"
),
pytest.param(True, ["c"], "auser@example.org", True, False, id="wrong_amr"),
pytest.param(True, ["a"], "auser@example.net", True, False, id="wrong_domain"),
pytest.param(False, ["a"], "auser@example.org", True, False, id="disabled"),
],
)
@pytest.mark.django_db
def test_get_or_create_user_migrate_user(
settings, migrate_enabled, amr, email, good_username, expect_migration
):
settings.HELUSERS_USER_MIGRATE_ENABLED = migrate_enabled
old_uuid = uuid.uuid4()
new_uuid = uuid.uuid4()
user_model = get_user_model()
old_user = user_model.objects.create(
uuid=old_uuid,
username=uuid_to_username(old_uuid) if good_username else str(old_uuid),
first_name="A",
last_name="User",
email=email,
)

payload = {
"sub": str(new_uuid),
"amr": amr,
"email": old_user.email,
"first_name": old_user.first_name,
"last_name": old_user.last_name,
}

user = get_or_create_user(payload)

if expect_migration:
assert user_model.objects.count() == 1
assert user.uuid == new_uuid
assert user.username == uuid_to_username(new_uuid)
else:
assert user_model.objects.count() == 2
assert user_model.objects.filter(uuid=old_uuid).exists()
assert user.uuid == new_uuid
assert user.username == uuid_to_username(new_uuid)
78 changes: 78 additions & 0 deletions helusers/user_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from django.db import IntegrityError, transaction
from django.utils.translation import gettext as _

from helusers.utils import uuid_to_username

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -111,7 +113,81 @@ def convert_to_uuid(convertable, namespace=None):
return generated_uuid


def migrate_user(user_id: str, payload: dict):
"""Migrate old user from Tunnistamo to Keycloak.

Migration logic is only run on certain conditions:
- Correct authentication method is used (AMR-claim)
- Email domain is correct
- User with the new UUID doesn't exist yet
- Old user is found by email
- Old user has username generated by helusers.utils.uuid_to_username

Instead of allowing a new user to be created the migration is done by
replacing the old user UUID with the one from the incoming token
payload. Logic which is run later should take care of updating other
user related fields.

Primary key is separate from the user UUID, so the user UUID can be
changed. This migration should therefore retain all the data related
to the user.

Migration logic only supports authentication methods from this package
and Python Social Auth pipeline helusers.defaults.SOCIAL_AUTH_PIPELINE.
This doesn't support migrating users which are using e.g. a different
pipeline for Python Social Auth (e.g. the default pipeline).
"""
if not getattr(settings, "HELUSERS_USER_MIGRATE_ENABLED", False):
return

amrs_to_migrate = getattr(settings, "HELUSERS_USER_MIGRATE_AMRS", ["helsinkiad"])
domains_to_migrate = getattr(
settings, "HELUSERS_USER_MIGRATE_EMAIL_DOMAINS", ["hel.fi"]
)

# Don't continue if not using the expected authentication method.
if not (
(amr := payload.get("amr"))
and isinstance(amr, list)
and any(value in amrs_to_migrate for value in amr)
):
logger.debug(f"User {user_id} has wrong amr: {amr}, not trying to migrate.")
return

uid = UUID(user_id)
email = payload.get("email")

if not email or not any(
[email.endswith(f"@{domain}") for domain in domains_to_migrate]
):
logger.debug(
f"User {user_id} email: {email} doesn't have a whitelisted domain, not trying to migrate."
)
return

user_model = get_user_model()

if user_model.objects.filter(uuid=uid).exists():
logger.debug(f"User {user_id} exist, not trying to migrate.")
return

users = user_model.objects.filter(email=email, username__startswith="u-")

if users.count() > 1:
logger.warning(
f"Multiple users found for email {email} when migrating user {user_id}."
)

if user := users.first():
logger.info(f"Migrating user {user.uuid} to {user_id}.")

user.uuid = uid
user.username = uuid_to_username(uid)
user.save()


def get_or_create_user(payload, oidc=False):
"""Get, create or update a user based on the payload."""
user_id = payload.get("sub")
if not user_id:
msg = _("Invalid payload. sub missing")
Expand All @@ -126,6 +202,8 @@ def get_or_create_user(payload, oidc=False):
namespace = payload.get("tid")
user_id = convert_to_uuid(user_id, namespace)

migrate_user(user_id, payload)

try_again = False
try:
user = _try_create_or_update(user_id, payload, oidc)
Expand Down
Loading