From 31a5cce1bc48f182dfc8b32af5d87eaaf4a90942 Mon Sep 17 00:00:00 2001 From: Chris Ellrich Date: Fri, 18 Apr 2025 16:27:37 +0200 Subject: [PATCH] re-initialised repo to correct author --- .env.example | 3 + .gitignore | 178 +++++++++++++++++++++++++++++++++++ LICENSE | 21 +++++ README.md | 84 +++++++++++++++++ cloaksmith/__init__.py | 1 + cloaksmith/auth.py | 157 ++++++++++++++++++++++++++++++ cloaksmith/cli.py | 109 +++++++++++++++++++++ cloaksmith/keycloak_roles.py | 89 ++++++++++++++++++ cloaksmith/log.py | 48 ++++++++++ role_mappings.csv.example | 5 + setup.py | 23 +++++ 11 files changed, 718 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 cloaksmith/__init__.py create mode 100644 cloaksmith/auth.py create mode 100644 cloaksmith/cli.py create mode 100644 cloaksmith/keycloak_roles.py create mode 100644 cloaksmith/log.py create mode 100644 role_mappings.csv.example create mode 100644 setup.py diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..6a6b0f7 --- /dev/null +++ b/.env.example @@ -0,0 +1,3 @@ +KEYCLOAK_URL=https://your-keycloak/ +KEYCLOAK_REALM=your-realm +KEYCLOAK_CLIENT_ID=your-app-client-id diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d7d681a --- /dev/null +++ b/.gitignore @@ -0,0 +1,178 @@ +.env +.keycloak_token.json +role_mappings.csv + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +#uv.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..0e03824 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) [2025] [Chris Ellrich] + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..b49102a --- /dev/null +++ b/README.md @@ -0,0 +1,84 @@ +# Cloaksmith + +Cloaksmith is a CLI tool designed to interact with a Keycloak server using OAuth 2.0 Device Authorization Grant. It allows you to import roles from a CSV file and create role mappings, with a focus on simplicity and extensibility. + +## Prerequisites + +- Python 3.7 or higher +- Keycloak server +- Keycloak client with OAuth 2.0 Device Authorization Grant enabled + +## Installation + +1. Clone the repository: + + ```shell + git clone https://git.ellri.ch/c.ellrich/cloaksmith + ``` + +2. Change to the project directory: + + ```shell + cd cloaksmith + ``` + +3. Install Cloaksmith using `pip`: + + ```shell + pip install . + ``` + +## Configuration + +1. Create a Keycloak client with OAuth 2.0 Device Authorization Grant enabled. No other features are required. Refer to the Keycloak documentation for detailed instructions. + +2. Initialize the configuration by running: + + ```shell + cloaksmith init-env + ``` + + This will prompt you to enter the following values: + + - `KEYCLOAK_URL` (e.g. `https://your-keycloak/`) + - `KEYCLOAK_REALM` (e.g. `master`) + - `KEYCLOAK_CLIENT_ID` (e.g. `your-app-client-id`) + + The `.env` file will be saved to the appropriate config directory: + + - **Linux/macOS:** `~/.config/cloaksmith/.env` + - **Windows:** `%APPDATA%\cloaksmith\.env` + +3. Alternatively, you can specify a custom `.env` file using the `--env-file` option for any command: + + ```shell + cloaksmith import-roles --env-file /path/to/.env ... + ``` + +## Usage + +Once installed, you can use the `cloaksmith` command to interact with your Keycloak server. + +### General Help + +To see the available commands and options, run: + +```shell +cloaksmith --help +``` + +### Import Roles and Role Mappings to a Client + +Create a CSV file based on the `role_mappings.csv.example` file provided. + +Run the following command to import roles and map them to groups: + +```shell +cloaksmith import-roles --client-id --realm +``` + +## Extensibility +Cloaksmith is designed to be easily extensible. You can add new commands or functionality by modifying the CLI or the underlying modules. + +## License +This project is licensed under the terms specified in the LICENSE file. \ No newline at end of file diff --git a/cloaksmith/__init__.py b/cloaksmith/__init__.py new file mode 100644 index 0000000..485f44a --- /dev/null +++ b/cloaksmith/__init__.py @@ -0,0 +1 @@ +__version__ = "0.1.1" diff --git a/cloaksmith/auth.py b/cloaksmith/auth.py new file mode 100644 index 0000000..ace763a --- /dev/null +++ b/cloaksmith/auth.py @@ -0,0 +1,157 @@ +import os +import json +import time +import requests +from pathlib import Path +from cloaksmith.log import get_logger + +log = get_logger() + + +class AuthSession: + def __init__(self, base_url, realm, client_id, no_cache=False): + self.base_url = base_url + self.realm = realm + self.client_id = client_id + self.no_cache = no_cache + if os.name == "nt": + cache_dir = Path(os.getenv("LOCALAPPDATA")) / "cloaksmith" + else: + cache_dir = Path.home() / ".cache" / "cloaksmith" + cache_dir.mkdir(parents=True, exist_ok=True) + self.cache_path = cache_dir / "token.json" + self.token_set = None + if not self.no_cache: + self._load_cached_token() + + def _cache_token(self): + """ + Cache the token to a file. + """ + if self.no_cache: + return + try: + self.cache_path.parent.mkdir(parents=True, exist_ok=True) + with self.cache_path.open("w") as f: + json.dump({ + "realm": self.realm, + "client_id": self.client_id, + "token": self.token_set + }, f) + log.info("Token cached successfully.") + except Exception as e: + log.error(f"Failed to cache token: {e}") + + def _load_cached_token(self): + """ + Load the cached token from a file. + """ + if not self.cache_path.exists(): + return + try: + with self.cache_path.open() as f: + data = json.load(f) + + if data["realm"] != self.realm or data["client_id"] != self.client_id: + return + + token = data["token"] + issued_at = token.get("timestamp", 0) + expires_in = token.get("expires_in", 0) + + if time.time() < issued_at + expires_in - 10: + self.token_set = token + log.info("Loaded cached access token.") + else: + self.token_set = token + log.info( + "Cached token expired. Will attempt refresh on first request.") + except Exception as e: + log.warning(f"Failed to load cached token: {e}") + + def authenticate(self): + """ + Authenticate the user by obtaining an access token. + """ + try: + if self.token_set: + return + + device_url = f"{self.base_url}/realms/{self.realm}/protocol/openid-connect/auth/device" + token_url = f"{self.base_url}/realms/{self.realm}/protocol/openid-connect/token" + + res = requests.post(device_url, data={"client_id": self.client_id}) + res.raise_for_status() + device = res.json() + + direct_link = f"{device['verification_uri']}?user_code={device['user_code']}" + log.info(f"Go to the following URL to authenticate: {direct_link}") + + while True: + poll = requests.post(token_url, data={ + "client_id": self.client_id, + "grant_type": "urn:ietf:params:oauth:grant-type:device_code", + "device_code": device["device_code"] + }) + if poll.status_code == 200: + self.token_set = poll.json() + self.token_set["timestamp"] = int(time.time()) + self._cache_token() + log.info("Authentication successful.") + return + elif poll.status_code in (400, 428): + continue + else: + log.error(f"Authentication failed: {poll.text}") + raise Exception(f"Authentication failed: {poll.text}") + except Exception as e: + log.error(f"Error during authentication: {e}") + raise + + def refresh_token(self): + """ + Refresh the access token using the refresh token. + """ + try: + token_url = f"{self.base_url}/realms/{self.realm}/protocol/openid-connect/token" + res = requests.post(token_url, data={ + "grant_type": "refresh_token", + "client_id": self.client_id, + "refresh_token": self.token_set["refresh_token"], + }) + res.raise_for_status() + self.token_set = res.json() + self.token_set["timestamp"] = int(time.time()) + self._cache_token() + log.info("Token refreshed.") + except Exception as e: + log.error(f"Failed to refresh token: {e}") + raise + + def request(self, method, url, **kwargs): + """ + Make an authenticated request to the specified URL. + """ + try: + if not self.token_set: + raise Exception("Authentication required.") + + headers = kwargs.pop("headers", {}) + headers["Authorization"] = f"Bearer {self.token_set['access_token']}" + headers["Content-Type"] = "application/json" + kwargs["headers"] = headers + + res = requests.request(method, url, **kwargs) + + if res.status_code == 401: + log.warning("Access token expired. Refreshing.") + self.refresh_token() + headers["Authorization"] = f"Bearer {self.token_set['access_token']}" + kwargs["headers"] = headers + res = requests.request(method, url, **kwargs) + + return res + + except Exception as e: + log.error(f"Request failed: {e}") + raise diff --git a/cloaksmith/cli.py b/cloaksmith/cli.py new file mode 100644 index 0000000..d1c30f4 --- /dev/null +++ b/cloaksmith/cli.py @@ -0,0 +1,109 @@ +import click +import os +from pathlib import Path +import os +from dotenv import load_dotenv + +from cloaksmith.log import setup_logging, get_logger +from cloaksmith.auth import AuthSession +from cloaksmith.keycloak_roles import KeycloakClientRoleManager +from cloaksmith import __version__ + +log = get_logger() + + +def load_default_env(): + if os.name == "nt": + config_path = Path(os.getenv("APPDATA")) / "cloaksmith" / ".env" + else: + config_path = Path.home() / ".config" / "cloaksmith" / ".env" + + if config_path.exists(): + load_dotenv(dotenv_path=config_path) + else: + log.warning( + f"No .env file found at {config_path}. Please run 'cloaksmith init-env' to create one.") + exit(1) + + +def load_env(ctx): + env_file = ctx.obj.get("env_file") + if env_file: + load_dotenv(dotenv_path=env_file) + log.info("Loaded environment variables from provided .env file.") + else: + load_default_env() + log.info("Loaded environment variables from default .env file.") + + +@click.group() +@click.help_option("-h", "--help") +@click.version_option(__version__) +@click.option('--log-level', default="INFO", type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False), help="Set the log level") +@click.option('--env-file', type=click.Path(exists=True), help="Path to a .env file") +@click.pass_context +def cli(ctx, log_level, env_file): + """ + A command-line interface for performing various Keycloak administration tasks. + """ + setup_logging(log_level) + ctx.obj = {"env_file": env_file} + + +@cli.command() +@click.help_option("-h", "--help") +def init_env(): + """ + Initialize a .env file in the appropriate config directory by prompting for values. + """ + url = click.prompt( + "KEYCLOAK_URL", prompt_suffix=" (e.g. https://your-keycloak/): ") + realm = click.prompt( + "KEYCLOAK_REALM", prompt_suffix=" (e.g. your-realm): ") + client_id = click.prompt("KEYCLOAK_CLIENT_ID", + prompt_suffix=" (e.g. your-app-client-id): ") + + if os.name == "nt": + config_dir = Path(os.getenv("APPDATA")) / "cloaksmith" + else: + config_dir = Path.home() / ".config" / "cloaksmith" + config_dir.mkdir(parents=True, exist_ok=True) + + env_path = config_dir / ".env" + with env_path.open("w") as f: + f.write(f"KEYCLOAK_URL={url}\n") + f.write(f"KEYCLOAK_REALM={realm}\n") + f.write(f"KEYCLOAK_CLIENT_ID={client_id}\n") + + log.info(f".env file written to: {env_path}") + + +@cli.command() +@click.help_option("-h", "--help") +@click.argument("csv_path", type=click.Path(exists=True)) +@click.option("--client-id", required=True, help="Target client ID") +@click.option("--realm", required=True, help="Target realm to modify") +@click.option("--no-cache", is_flag=True, help="Disable token caching") +@click.pass_context +def import_roles(ctx, csv_path, client_id, realm, no_cache): + """ + Import roles and map them to groups using a CSV file. + + CSVPATH: Path to the CSV file containing role and group mappings. + """ + load_env(ctx) + + base_url = os.getenv("KEYCLOAK_URL") + login_realm = os.getenv("KEYCLOAK_REALM") + client_id_env = os.getenv("KEYCLOAK_CLIENT_ID") + + auth = AuthSession(base_url=base_url, realm=login_realm, + client_id=client_id_env, no_cache=no_cache) + auth.authenticate() + + kc = KeycloakClientRoleManager(auth, target_realm=realm) + kc.import_roles_and_mappings(client_id, csv_path) + + +if __name__ == "__main__": + cli() diff --git a/cloaksmith/keycloak_roles.py b/cloaksmith/keycloak_roles.py new file mode 100644 index 0000000..2272c5b --- /dev/null +++ b/cloaksmith/keycloak_roles.py @@ -0,0 +1,89 @@ +import csv +from cloaksmith.log import get_logger +log = get_logger() + + +class KeycloakClientRoleManager: + def __init__(self, auth_session, target_realm): + self.auth = auth_session + self.target_realm = target_realm + self.base_url = auth_session.base_url + + def get_client_id(self, client_id): + url = f"{self.base_url}/admin/realms/{self.target_realm}/clients" + res = self.auth.request("GET", url) + clients = res.json() + client = next((c for c in clients if c["clientId"] == client_id), None) + if not client: + log.error( + f"Client '{client_id}' not found in realm '{self.target_realm}'") + raise ValueError( + f"Client '{client_id}' not found in realm '{self.target_realm}'") + log.info(f"Found client ID for '{client_id}': {client['id']}") + return client["id"] + + def create_role(self, client_internal_id, role_name): + url = f"{self.base_url}/admin/realms/{self.target_realm}/clients/{client_internal_id}/roles" + res = self.auth.request("POST", url, json={"name": role_name}) + if res.status_code not in (201, 409): + log.error(f"Failed to create role '{role_name}': {res.text}") + raise Exception(f"Failed to create role '{role_name}': {res.text}") + log.info(f"Role '{role_name}' created successfully or already exists.") + + def get_role(self, client_internal_id, role_name): + url = f"{self.base_url}/admin/realms/{self.target_realm}/clients/{client_internal_id}/roles/{role_name}" + res = self.auth.request("GET", url) + if res.status_code == 404: + log.error( + f"Role '{role_name}' not found in client '{client_internal_id}'") + raise ValueError(f"Role '{role_name}' not found") + log.info(f"Found role '{role_name}' in client '{client_internal_id}'") + return res.json() + + def get_group_id(self, group_name): + url = f"{self.base_url}/admin/realms/{self.target_realm}/groups" + res = self.auth.request("GET", url) + groups = res.json() + group = next((g for g in groups if g["name"] == group_name), None) + if not group: + log.error( + f"Group '{group_name}' not found in realm '{self.target_realm}'") + raise ValueError( + f"Group '{group_name}' not found in realm '{self.target_realm}'") + log.info(f"Found group ID for '{group_name}': {group['id']}") + return group["id"] + + def map_role_to_group(self, group_id, client_internal_id, role_obj): + url = f"{self.base_url}/admin/realms/{self.target_realm}/groups/{group_id}/role-mappings/clients/{client_internal_id}" + res = self.auth.request("POST", url, json=[role_obj]) + if res.status_code not in (204, 409): + log.error(f"Failed to map role to group '{group_id}': {res.text}") + raise Exception(f"Failed to map role to group: {res.text}") + log.info(f"Role mapped to group '{group_id}' successfully.") + + def import_roles_and_mappings(self, client_id, csv_path): + client_internal_id = self.get_client_id(client_id) + failures = [] + + with open(csv_path, newline='') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + role = row["role_name"] + group = row["group_name"] + log.info(f"Processing: Role='{role}' -> Group='{group}'") + + try: + self.create_role(client_internal_id, role) + group_id = self.get_group_id(group) + role_obj = self.get_role(client_internal_id, role) + self.map_role_to_group( + group_id, client_internal_id, role_obj) + except Exception as e: + msg = f"Failed to process role '{role}' for group '{group}': {e}" + log.error(msg) + failures.append(msg) + + if failures: + log.warning(f"Completed with {len(failures)} error(s).") + else: + log.info("Role cloning and mapping completed successfully.") diff --git a/cloaksmith/log.py b/cloaksmith/log.py new file mode 100644 index 0000000..98a11a0 --- /dev/null +++ b/cloaksmith/log.py @@ -0,0 +1,48 @@ +# log.py +import logging + + +class ColorFormatter(logging.Formatter): + COLORS = { + "DEBUG": "\033[36m", # Cyan + "INFO": "\033[32m", # Green + "WARNING": "\033[33m", # Yellow + "ERROR": "\033[31m", # Red + "CRITICAL": "\033[41m", # Red background + "GREY": "\033[90m", # Grau + "RESET": "\033[0m" + } + + def format(self, record): + # Pad uncolored level name + raw_level = record.levelname + padded = f"{raw_level:<8}" + + # Add color after padding + color = self.COLORS.get(raw_level, "") + reset = self.COLORS["RESET"] + record.levelname = f"{color}{padded}{reset}" + record.name = f"{self.COLORS.get('GREY')}{record.name}{reset}" + return super().format(record) + + +_logger = None + +def setup_logging(level="INFO"): + global _logger + if _logger: + return _logger + + logger = logging.getLogger("keycloak-scripts") + logger.setLevel(getattr(logging, level.upper(), logging.INFO)) + + if not logger.handlers: + handler = logging.StreamHandler() + handler.setFormatter(ColorFormatter("%(asctime)s [ %(levelname)s | %(name)s ] %(message)s")) + logger.addHandler(handler) + + _logger = logger + return logger + +def get_logger(): + return _logger or setup_logging() \ No newline at end of file diff --git a/role_mappings.csv.example b/role_mappings.csv.example new file mode 100644 index 0000000..22217e9 --- /dev/null +++ b/role_mappings.csv.example @@ -0,0 +1,5 @@ +role_name,group_name +admin,admins_group +user,users_group +moderator,moderators_group +developer,developers_group diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..8bbc51d --- /dev/null +++ b/setup.py @@ -0,0 +1,23 @@ +from setuptools import setup, find_packages + +# Read version from __init__.py +version = {} +with open("cloaksmith/__init__.py") as f: + exec(f.read(), version) + +setup( + name='cloaksmith', + version=version["__version__"], + license='MIT', + packages=find_packages(), + install_requires=[ + 'Click', + 'python-dotenv', + 'requests', + ], + entry_points={ + 'console_scripts': [ + 'cloaksmith = cloaksmith.cli:cli', + ], + }, +)