diff --git a/README.md b/README.md index 2c969a1..47a361f 100644 --- a/README.md +++ b/README.md @@ -159,9 +159,9 @@ OIDC_API_TOKEN_AUTH = { # authorization server configuration and public keys are "remembered". # The value is in seconds. Default is 24 hours. "OIDC_CONFIG_EXPIRATION_TIME": 600, - + # Allow only algorithms that we actually use. In case of tunnistamo and - # tunnistus only RS256 is used with API access tokens. + # tunnistus only RS256 is used with API access tokens. "ALLOWED_ALGORITHMS": ["RS256"], } ``` @@ -417,6 +417,34 @@ If you're not allowing users to log in with passwords, you may disable the username/password form from Django admin login page by setting `HELUSERS_PASSWORD_LOGIN_DISABLED` to `True`. + +#### Migrating old user from Tunnistamo to Keycloak + +By default, the migration logic is configured to support migrating users from Tunnistamo +AD authentication to Keycloak AD authentication. The migration should be tested by the +service before enabling it in production. This migration logic most likely shouldn't be +configured for other authentication methods besides AD (i.e. staff/admin) users. + +When transitioning from one authentication provider to another, it is possible to +migrate the old user data for the new user with a different UUID. Migration is done by +finding the old user instance and replacing its UUID with the new one from the token +payload. So instead of creating a new user instance, we update the old one. Migration +happens one user at a time upon login. + +Feature can be configured using the following settings. +- `HELUSERS_USER_MIGRATE_ENABLED`: Enable the feature. Defaults to `False`. +- `HELUSERS_USER_MIGRATE_EMAIL_DOMAINS`: Whitelisted email domains for migration. + Defaults to `["hel.fi"]`. +- `HELUSERS_USER_MIGRATE_AMRS` which authentication methods are used for migration. + Defaults to `["helsinkiad"]`. + +Migration logic is only run on certain conditions: +- Correct authentication method is used (AMR-claim) +- Email domain is correct +- User with the new UUID doesn't exist yet +- Old user is found by email +- Username has been generated by helusers.utils.uuid_to_username for the old user + # Development Virtual Python environment can be used. For example: @@ -456,7 +484,7 @@ tox This project uses [`black`](https://github.com/ambv/black), [`flake8`](https://github.com/pycqa/flake8) and -[`isort`](https://github.com/timothycrosley/isort) +[`isort`](https://github.com/pycqa/isort) for code formatting and quality checking. Project follows the basic black config, without any modifications. diff --git a/helusers/tests/test_migrate_user.py b/helusers/tests/test_migrate_user.py new file mode 100644 index 0000000..8416d10 --- /dev/null +++ b/helusers/tests/test_migrate_user.py @@ -0,0 +1,112 @@ +import uuid + +import pytest +from django.contrib.auth import get_user_model + +from helusers.user_utils import get_or_create_user, migrate_user +from helusers.utils import uuid_to_username + + +@pytest.fixture(autouse=True) +def setup_migrate(settings): + settings.HELUSERS_USER_MIGRATE_ENABLED = True + settings.HELUSERS_USER_MIGRATE_AMRS = ["a", "b"] + settings.HELUSERS_USER_MIGRATE_EMAIL_DOMAINS = ["example.com", "example.org"] + + +@pytest.mark.parametrize( + "migrate_enabled, amr, email, good_username, expect_migration", + [ + pytest.param(True, ["a"], "auser@example.org", True, True, id="migrate"), + pytest.param( + True, ["a"], "auser@example.org", False, False, id="wrong_username" + ), + pytest.param(True, ["c"], "auser@example.org", True, False, id="wrong_amr"), + pytest.param(True, ["a"], "auser@example.net", True, False, id="wrong_domain"), + pytest.param(False, ["a"], "auser@example.org", True, False, id="disabled"), + ], +) +@pytest.mark.django_db +def test_migrate_user( + settings, migrate_enabled, amr, email, good_username, expect_migration +): + settings.HELUSERS_USER_MIGRATE_ENABLED = migrate_enabled + old_uuid = uuid.uuid4() + new_uuid = uuid.uuid4() + old_username = uuid_to_username(old_uuid) if good_username else str(old_uuid) + user_model = get_user_model() + user = user_model.objects.create( + uuid=old_uuid, + username=old_username, + first_name="A", + last_name="User", + email=email, + ) + + payload = { + "sub": str(new_uuid), + "amr": amr, + "email": user.email, + "first_name": user.first_name, + "last_name": user.last_name, + } + + migrate_user(user_id=str(new_uuid), payload=payload) + + if expect_migration: + user.refresh_from_db() + assert user.uuid == new_uuid + assert user.username == uuid_to_username(new_uuid) + else: + user.refresh_from_db() + assert user.uuid == old_uuid + assert user.username == old_username + + +@pytest.mark.parametrize( + "migrate_enabled, amr, email, good_username, expect_migration", + [ + pytest.param(True, ["a"], "auser@example.org", True, True, id="migrate"), + pytest.param( + True, ["a"], "auser@example.org", False, False, id="wrong_username" + ), + pytest.param(True, ["c"], "auser@example.org", True, False, id="wrong_amr"), + pytest.param(True, ["a"], "auser@example.net", True, False, id="wrong_domain"), + pytest.param(False, ["a"], "auser@example.org", True, False, id="disabled"), + ], +) +@pytest.mark.django_db +def test_get_or_create_user_migrate_user( + settings, migrate_enabled, amr, email, good_username, expect_migration +): + settings.HELUSERS_USER_MIGRATE_ENABLED = migrate_enabled + old_uuid = uuid.uuid4() + new_uuid = uuid.uuid4() + user_model = get_user_model() + old_user = user_model.objects.create( + uuid=old_uuid, + username=uuid_to_username(old_uuid) if good_username else str(old_uuid), + first_name="A", + last_name="User", + email=email, + ) + + payload = { + "sub": str(new_uuid), + "amr": amr, + "email": old_user.email, + "first_name": old_user.first_name, + "last_name": old_user.last_name, + } + + user = get_or_create_user(payload) + + if expect_migration: + assert user_model.objects.count() == 1 + assert user.uuid == new_uuid + assert user.username == uuid_to_username(new_uuid) + else: + assert user_model.objects.count() == 2 + assert user_model.objects.filter(uuid=old_uuid).exists() + assert user.uuid == new_uuid + assert user.username == uuid_to_username(new_uuid) diff --git a/helusers/user_utils.py b/helusers/user_utils.py index fbfc31d..63734b2 100644 --- a/helusers/user_utils.py +++ b/helusers/user_utils.py @@ -6,6 +6,8 @@ from django.db import IntegrityError, transaction from django.utils.translation import gettext as _ +from helusers.utils import uuid_to_username + logger = logging.getLogger(__name__) @@ -111,7 +113,81 @@ def convert_to_uuid(convertable, namespace=None): return generated_uuid +def migrate_user(user_id: str, payload: dict): + """Migrate old user from Tunnistamo to Keycloak. + + Migration logic is only run on certain conditions: + - Correct authentication method is used (AMR-claim) + - Email domain is correct + - User with the new UUID doesn't exist yet + - Old user is found by email + - Old user has username generated by helusers.utils.uuid_to_username + + Instead of allowing a new user to be created the migration is done by + replacing the old user UUID with the one from the incoming token + payload. Logic which is run later should take care of updating other + user related fields. + + Primary key is separate from the user UUID, so the user UUID can be + changed. This migration should therefore retain all the data related + to the user. + + Migration logic only supports authentication methods from this package + and Python Social Auth pipeline helusers.defaults.SOCIAL_AUTH_PIPELINE. + This doesn't support migrating users which are using e.g. a different + pipeline for Python Social Auth (e.g. the default pipeline). + """ + if not getattr(settings, "HELUSERS_USER_MIGRATE_ENABLED", False): + return + + amrs_to_migrate = getattr(settings, "HELUSERS_USER_MIGRATE_AMRS", ["helsinkiad"]) + domains_to_migrate = getattr( + settings, "HELUSERS_USER_MIGRATE_EMAIL_DOMAINS", ["hel.fi"] + ) + + # Don't continue if not using the expected authentication method. + if not ( + (amr := payload.get("amr")) + and isinstance(amr, list) + and any(value in amrs_to_migrate for value in amr) + ): + logger.debug(f"User {user_id} has wrong amr: {amr}, not trying to migrate.") + return + + uid = UUID(user_id) + email = payload.get("email") + + if not email or not any( + [email.endswith(f"@{domain}") for domain in domains_to_migrate] + ): + logger.debug( + f"User {user_id} email: {email} doesn't have a whitelisted domain, not trying to migrate." + ) + return + + user_model = get_user_model() + + if user_model.objects.filter(uuid=uid).exists(): + logger.debug(f"User {user_id} exist, not trying to migrate.") + return + + users = user_model.objects.filter(email=email, username__startswith="u-") + + if users.count() > 1: + logger.warning( + f"Multiple users found for email {email} when migrating user {user_id}." + ) + + if user := users.first(): + logger.info(f"Migrating user {user.uuid} to {user_id}.") + + user.uuid = uid + user.username = uuid_to_username(uid) + user.save() + + def get_or_create_user(payload, oidc=False): + """Get, create or update a user based on the payload.""" user_id = payload.get("sub") if not user_id: msg = _("Invalid payload. sub missing") @@ -126,6 +202,8 @@ def get_or_create_user(payload, oidc=False): namespace = payload.get("tid") user_id = convert_to_uuid(user_id, namespace) + migrate_user(user_id, payload) + try_again = False try: user = _try_create_or_update(user_id, payload, oidc)