Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for organizations #614

Open
carbonleakage opened this issue Nov 12, 2024 · 3 comments
Open

Support for organizations #614

carbonleakage opened this issue Nov 12, 2024 · 3 comments

Comments

@carbonleakage
Copy link

As you know keycloak has introduced the organization feature in v26. I would like to add methods to handle common organization management like create_organization, assign users to organization etc. Before I start creating a MR, could the maintainers please let me know what is the preferred way to approach large features like these which would include few new methods added to the keycloak admin class?

@srekkas
Copy link

srekkas commented Jan 20, 2025

Hi,

Do you done something in this regard? I try to do simple Organization management, but as admin and not programmer it is hard for me :) I and AI wrote simple python script for enabling organization and assigning identity provider, it is possible but too much case's to consider :)

import os
import requests
import json


# Configuration
KEYCLOAK_URL = "https://idp.test.my.domain.com"
KEYCLOAK_REALM = "master"
KEYCLOAK_CLIENT_ID = "temp-admin"
KEYCLOAK_CLIENT_SECRET = "password"
ORGANIZATION = "OrgName"
DOMAINS = ["example2.com", "example3.com"]
PROVIDERS = ["oidc"]
PROVIDER_DOMAIN = "example3.com"

# Fetch Access Token
def get_tokens():
    """
    Fetch access token, id token, and refresh token from Keycloak.
    """
    url = f"{KEYCLOAK_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token"
    payload = {
        "username": KEYCLOAK_CLIENT_ID,
        "password": KEYCLOAK_CLIENT_SECRET,
        "grant_type": "password",
        "client_id": "admin-cli",
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    
    # Make the request
    response = requests.post(url, data=payload, headers=headers)
    response.raise_for_status()
    
    # Parse the tokens
    tokens = response.json()
    return {
        "access_token": tokens.get("access_token"),
        "id_token": tokens.get("id_token"),
        "refresh_token": tokens.get("refresh_token"),
    }

# Check if organizations are enabled for the realm
def is_organizations_enabled(token):
    url = f"{KEYCLOAK_URL}/admin/realms/{KEYCLOAK_REALM}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    realm_config = response.json()
    return realm_config.get("organizationsEnabled", False)

# Enable organizations for the realm
def enable_organizations(token):
    url = f"{KEYCLOAK_URL}/admin/realms/{KEYCLOAK_REALM}"
    payload = {"organizationsEnabled": True}
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    response = requests.put(url, json=payload, headers=headers)
    response.raise_for_status()

# Get organizations
def get_organizations(token):
    url = f"{KEYCLOAK_URL}/admin/realms/{KEYCLOAK_REALM}/organizations"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()

# Create organization
def create_organization(token, organization, domains):
    url = f"{KEYCLOAK_URL}/admin/realms/{KEYCLOAK_REALM}/organizations"
    payload = {
        "name": organization,
        "alias": organization,
        "enabled": True,
        "domains": domains
    }
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    response = requests.post(url, json=payload, headers=headers)
    response.raise_for_status()
    try:
        return response.json().get("id")
    except json.JSONDecodeError:
        print(f"Failed to decode response: {response.text}")
        raise

# Update organization's domains
def update_organization_domains(token, org_id, organization, domains):
    url = f"{KEYCLOAK_URL}/admin/realms/{KEYCLOAK_REALM}/organizations/{org_id}"
    payload = {
        "name": organization,
        "alias": organization,
        "domains": domains
    }
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    response = requests.put(url, json=payload, headers=headers)
    response.raise_for_status()

# Check if provider is already associated
def is_provider_associated(token, org_id, provider):
    url = f"{KEYCLOAK_URL}/admin/realms/{KEYCLOAK_REALM}/organizations/{org_id}/identity-providers/{provider}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return True
    elif response.status_code == 404:
        return False
    response.raise_for_status()

# Disassociate provider from organization
def disassociate_provider(token, org_id, provider):
    url = f"{KEYCLOAK_URL}/admin/realms/{KEYCLOAK_REALM}/organizations/{org_id}/identity-providers/{provider}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    response = requests.delete(url, headers=headers)
    if response.status_code == 204:
        print(f"Provider {provider} successfully disassociated from the organization.")
    else:
        response.raise_for_status()

# Check if provider exists in realm
def does_provider_exist_in_realm(token, provider):
    url = f"{KEYCLOAK_URL}/admin/realms/{KEYCLOAK_REALM}/identity-provider/instances/{provider}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        return True
    elif response.status_code == 404:
        return False
    response.raise_for_status()

# Associate identity provider with organization
def associate_provider(token, org_id, provider):
    if not does_provider_exist_in_realm(token, provider):
        print(f"Provider {provider} does not exist in the realm.")
        return

    if is_provider_associated(token, org_id, provider):
        print(f"Provider {provider} is already associated with the organization.")
        return

    url = f"{KEYCLOAK_URL}/admin/realms/{KEYCLOAK_REALM}/organizations/{org_id}/identity-providers"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    response = requests.post(url, data=provider, headers=headers)
    if response.status_code == 409:
        print(f"Provider {provider} is already associated with the organization.")
    response.raise_for_status()

def fetch_idp(token, idp_alias):

    url = f"{KEYCLOAK_URL}/admin/realms/{KEYCLOAK_REALM}/identity-provider/instances/{idp_alias}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()

# Update identity provider
def update_idp(token, idp_alias, payload):

    url = f"{KEYCLOAK_URL}/admin/realms/{KEYCLOAK_REALM}/identity-provider/instances/{idp_alias}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}"
    }
    response = requests.put(url, json=payload, headers=headers)
    response.raise_for_status()

# Configure identity provider
def configure_provider(token, provider, provider_domain):
    idp_config = fetch_idp(token, provider)
    idp_config["config"].update({
        "kc.org.broker.redirect.mode.email-matches": "true",
        "kc.org.domain": provider_domain
    })
    update_idp(token, provider, idp_config)

# Logout Function
def logout(refresh_token):
    """
    Logs out the user session using the refresh token.
    """
    url = f"{KEYCLOAK_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/logout"
    payload = {
        "client_id": "admin-cli",
        "refresh_token": refresh_token,
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    
    response = requests.post(url, data=payload, headers=headers)
    if response.status_code == 204:
        print("Logout successful.")
    else:
        print(f"Failed to logout: {response.status_code} - {response.text}")


# Main workflow
def main():
    tokens = get_tokens()
    token = tokens["access_token"]
    id_token = tokens["id_token"]
    refresh_token = tokens["refresh_token"]

    # Debug tokens
    # print(f"Access Token: {token}")
    # print(f"ID Token: {id_token}")
    # print(f"Refresh Token: {refresh_token}")

    # Check if organizations are enabled for the realm
    if not is_organizations_enabled(token):
        print("Organizations are not enabled for the realm. Enabling...")
        enable_organizations(token)

        # Verify if enabling was successful
        if not is_organizations_enabled(token):
            print("Failed to enable organizations for the realm.")
            return
        print("Organizations have been successfully enabled.")

    # Check if the organization exists
    organizations = get_organizations(token)
    org_id = next((org["id"] for org in organizations if org["name"] == ORGANIZATION), None)

    if not org_id:
        # Organization does not exist, create it
        print(f"Creating organization '{ORGANIZATION}'...")
        org_id = create_organization(token, ORGANIZATION, DOMAINS)
        print(f"Organization created: Name = {ORGANIZATION}, ID = {org_id}")
    else:
        print(f"Organization exists: Name = {ORGANIZATION}, ID = {org_id}")

    # Update organization's domains
    print(f"Updating domains for organization '{ORGANIZATION}'...")
    update_organization_domains(token, org_id,ORGANIZATION, DOMAINS)
    print(f"Domains updated for organization '{ORGANIZATION}'")

    # Manage providers
    current_providers = [p for p in PROVIDERS if does_provider_exist_in_realm(token, p)]
    associated_providers = [p for p in current_providers if is_provider_associated(token, org_id, p)]

    # Debug providers
    print(current_providers)
    print(associated_providers)

    # Disassociate providers not in PROVIDERS list
    for provider in associated_providers:
        if provider not in PROVIDERS:
            print(f"Disassociating provider '{provider}' from organization '{ORGANIZATION}'...")
            disassociate_provider(token, org_id, provider)

    # Associate missing providers
    for provider in PROVIDERS:
        print(f"Ensuring provider '{provider}' is associated with organization '{ORGANIZATION}'...")
        associate_provider(token, org_id, provider)

    # Configure providers
    for provider in PROVIDERS:
        print(f"Configuring provider '{provider}'...")
        configure_provider(token, provider, PROVIDER_DOMAIN)

    # Logout
    logout(refresh_token)

if __name__ == "__main__":
    main()

@shatandv
Copy link

Any updates on this? Interested in the feature.
Right now slowly working on a similar thing based on keycloak-orgs extension (for a SaaS app).
Open to contributing if the proposal is supported.

@kandakji
Copy link

+1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants