re-initialised repo to correct author

This commit is contained in:
Chris Ellrich 2025-04-18 16:27:37 +02:00
commit 31a5cce1bc
Signed by: c.ellrich
GPG Key ID: D1850E1719D845AE
11 changed files with 718 additions and 0 deletions

3
.env.example Normal file
View File

@ -0,0 +1,3 @@
KEYCLOAK_URL=https://your-keycloak/
KEYCLOAK_REALM=your-realm
KEYCLOAK_CLIENT_ID=your-app-client-id

178
.gitignore vendored Normal file
View File

@ -0,0 +1,178 @@
.env
.keycloak_token.json
role_mappings.csv
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) [2025] [Chris Ellrich]
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

84
README.md Normal file
View File

@ -0,0 +1,84 @@
# Cloaksmith
Cloaksmith is a CLI tool designed to interact with a Keycloak server using OAuth 2.0 Device Authorization Grant. It allows you to import roles from a CSV file and create role mappings, with a focus on simplicity and extensibility.
## Prerequisites
- Python 3.7 or higher
- Keycloak server
- Keycloak client with OAuth 2.0 Device Authorization Grant enabled
## Installation
1. Clone the repository:
```shell
git clone https://git.ellri.ch/c.ellrich/cloaksmith
```
2. Change to the project directory:
```shell
cd cloaksmith
```
3. Install Cloaksmith using `pip`:
```shell
pip install .
```
## Configuration
1. Create a Keycloak client with OAuth 2.0 Device Authorization Grant enabled. No other features are required. Refer to the Keycloak documentation for detailed instructions.
2. Initialize the configuration by running:
```shell
cloaksmith init-env
```
This will prompt you to enter the following values:
- `KEYCLOAK_URL` (e.g. `https://your-keycloak/`)
- `KEYCLOAK_REALM` (e.g. `master`)
- `KEYCLOAK_CLIENT_ID` (e.g. `your-app-client-id`)
The `.env` file will be saved to the appropriate config directory:
- **Linux/macOS:** `~/.config/cloaksmith/.env`
- **Windows:** `%APPDATA%\cloaksmith\.env`
3. Alternatively, you can specify a custom `.env` file using the `--env-file` option for any command:
```shell
cloaksmith import-roles --env-file /path/to/.env ...
```
## Usage
Once installed, you can use the `cloaksmith` command to interact with your Keycloak server.
### General Help
To see the available commands and options, run:
```shell
cloaksmith --help
```
### Import Roles and Role Mappings to a Client
Create a CSV file based on the `role_mappings.csv.example` file provided.
Run the following command to import roles and map them to groups:
```shell
cloaksmith import-roles --client-id <target_client_id> --realm <target_client_realm> <path_to_csv>
```
## Extensibility
Cloaksmith is designed to be easily extensible. You can add new commands or functionality by modifying the CLI or the underlying modules.
## License
This project is licensed under the terms specified in the LICENSE file.

1
cloaksmith/__init__.py Normal file
View File

@ -0,0 +1 @@
__version__ = "0.1.1"

157
cloaksmith/auth.py Normal file
View File

@ -0,0 +1,157 @@
import os
import json
import time
import requests
from pathlib import Path
from cloaksmith.log import get_logger
log = get_logger()
class AuthSession:
def __init__(self, base_url, realm, client_id, no_cache=False):
self.base_url = base_url
self.realm = realm
self.client_id = client_id
self.no_cache = no_cache
if os.name == "nt":
cache_dir = Path(os.getenv("LOCALAPPDATA")) / "cloaksmith"
else:
cache_dir = Path.home() / ".cache" / "cloaksmith"
cache_dir.mkdir(parents=True, exist_ok=True)
self.cache_path = cache_dir / "token.json"
self.token_set = None
if not self.no_cache:
self._load_cached_token()
def _cache_token(self):
"""
Cache the token to a file.
"""
if self.no_cache:
return
try:
self.cache_path.parent.mkdir(parents=True, exist_ok=True)
with self.cache_path.open("w") as f:
json.dump({
"realm": self.realm,
"client_id": self.client_id,
"token": self.token_set
}, f)
log.info("Token cached successfully.")
except Exception as e:
log.error(f"Failed to cache token: {e}")
def _load_cached_token(self):
"""
Load the cached token from a file.
"""
if not self.cache_path.exists():
return
try:
with self.cache_path.open() as f:
data = json.load(f)
if data["realm"] != self.realm or data["client_id"] != self.client_id:
return
token = data["token"]
issued_at = token.get("timestamp", 0)
expires_in = token.get("expires_in", 0)
if time.time() < issued_at + expires_in - 10:
self.token_set = token
log.info("Loaded cached access token.")
else:
self.token_set = token
log.info(
"Cached token expired. Will attempt refresh on first request.")
except Exception as e:
log.warning(f"Failed to load cached token: {e}")
def authenticate(self):
"""
Authenticate the user by obtaining an access token.
"""
try:
if self.token_set:
return
device_url = f"{self.base_url}/realms/{self.realm}/protocol/openid-connect/auth/device"
token_url = f"{self.base_url}/realms/{self.realm}/protocol/openid-connect/token"
res = requests.post(device_url, data={"client_id": self.client_id})
res.raise_for_status()
device = res.json()
direct_link = f"{device['verification_uri']}?user_code={device['user_code']}"
log.info(f"Go to the following URL to authenticate: {direct_link}")
while True:
poll = requests.post(token_url, data={
"client_id": self.client_id,
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
"device_code": device["device_code"]
})
if poll.status_code == 200:
self.token_set = poll.json()
self.token_set["timestamp"] = int(time.time())
self._cache_token()
log.info("Authentication successful.")
return
elif poll.status_code in (400, 428):
continue
else:
log.error(f"Authentication failed: {poll.text}")
raise Exception(f"Authentication failed: {poll.text}")
except Exception as e:
log.error(f"Error during authentication: {e}")
raise
def refresh_token(self):
"""
Refresh the access token using the refresh token.
"""
try:
token_url = f"{self.base_url}/realms/{self.realm}/protocol/openid-connect/token"
res = requests.post(token_url, data={
"grant_type": "refresh_token",
"client_id": self.client_id,
"refresh_token": self.token_set["refresh_token"],
})
res.raise_for_status()
self.token_set = res.json()
self.token_set["timestamp"] = int(time.time())
self._cache_token()
log.info("Token refreshed.")
except Exception as e:
log.error(f"Failed to refresh token: {e}")
raise
def request(self, method, url, **kwargs):
"""
Make an authenticated request to the specified URL.
"""
try:
if not self.token_set:
raise Exception("Authentication required.")
headers = kwargs.pop("headers", {})
headers["Authorization"] = f"Bearer {self.token_set['access_token']}"
headers["Content-Type"] = "application/json"
kwargs["headers"] = headers
res = requests.request(method, url, **kwargs)
if res.status_code == 401:
log.warning("Access token expired. Refreshing.")
self.refresh_token()
headers["Authorization"] = f"Bearer {self.token_set['access_token']}"
kwargs["headers"] = headers
res = requests.request(method, url, **kwargs)
return res
except Exception as e:
log.error(f"Request failed: {e}")
raise

109
cloaksmith/cli.py Normal file
View File

@ -0,0 +1,109 @@
import click
import os
from pathlib import Path
import os
from dotenv import load_dotenv
from cloaksmith.log import setup_logging, get_logger
from cloaksmith.auth import AuthSession
from cloaksmith.keycloak_roles import KeycloakClientRoleManager
from cloaksmith import __version__
log = get_logger()
def load_default_env():
if os.name == "nt":
config_path = Path(os.getenv("APPDATA")) / "cloaksmith" / ".env"
else:
config_path = Path.home() / ".config" / "cloaksmith" / ".env"
if config_path.exists():
load_dotenv(dotenv_path=config_path)
else:
log.warning(
f"No .env file found at {config_path}. Please run 'cloaksmith init-env' to create one.")
exit(1)
def load_env(ctx):
env_file = ctx.obj.get("env_file")
if env_file:
load_dotenv(dotenv_path=env_file)
log.info("Loaded environment variables from provided .env file.")
else:
load_default_env()
log.info("Loaded environment variables from default .env file.")
@click.group()
@click.help_option("-h", "--help")
@click.version_option(__version__)
@click.option('--log-level', default="INFO", type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], case_sensitive=False), help="Set the log level")
@click.option('--env-file', type=click.Path(exists=True), help="Path to a .env file")
@click.pass_context
def cli(ctx, log_level, env_file):
"""
A command-line interface for performing various Keycloak administration tasks.
"""
setup_logging(log_level)
ctx.obj = {"env_file": env_file}
@cli.command()
@click.help_option("-h", "--help")
def init_env():
"""
Initialize a .env file in the appropriate config directory by prompting for values.
"""
url = click.prompt(
"KEYCLOAK_URL", prompt_suffix=" (e.g. https://your-keycloak/): ")
realm = click.prompt(
"KEYCLOAK_REALM", prompt_suffix=" (e.g. your-realm): ")
client_id = click.prompt("KEYCLOAK_CLIENT_ID",
prompt_suffix=" (e.g. your-app-client-id): ")
if os.name == "nt":
config_dir = Path(os.getenv("APPDATA")) / "cloaksmith"
else:
config_dir = Path.home() / ".config" / "cloaksmith"
config_dir.mkdir(parents=True, exist_ok=True)
env_path = config_dir / ".env"
with env_path.open("w") as f:
f.write(f"KEYCLOAK_URL={url}\n")
f.write(f"KEYCLOAK_REALM={realm}\n")
f.write(f"KEYCLOAK_CLIENT_ID={client_id}\n")
log.info(f".env file written to: {env_path}")
@cli.command()
@click.help_option("-h", "--help")
@click.argument("csv_path", type=click.Path(exists=True))
@click.option("--client-id", required=True, help="Target client ID")
@click.option("--realm", required=True, help="Target realm to modify")
@click.option("--no-cache", is_flag=True, help="Disable token caching")
@click.pass_context
def import_roles(ctx, csv_path, client_id, realm, no_cache):
"""
Import roles and map them to groups using a CSV file.
CSVPATH: Path to the CSV file containing role and group mappings.
"""
load_env(ctx)
base_url = os.getenv("KEYCLOAK_URL")
login_realm = os.getenv("KEYCLOAK_REALM")
client_id_env = os.getenv("KEYCLOAK_CLIENT_ID")
auth = AuthSession(base_url=base_url, realm=login_realm,
client_id=client_id_env, no_cache=no_cache)
auth.authenticate()
kc = KeycloakClientRoleManager(auth, target_realm=realm)
kc.import_roles_and_mappings(client_id, csv_path)
if __name__ == "__main__":
cli()

View File

@ -0,0 +1,89 @@
import csv
from cloaksmith.log import get_logger
log = get_logger()
class KeycloakClientRoleManager:
def __init__(self, auth_session, target_realm):
self.auth = auth_session
self.target_realm = target_realm
self.base_url = auth_session.base_url
def get_client_id(self, client_id):
url = f"{self.base_url}/admin/realms/{self.target_realm}/clients"
res = self.auth.request("GET", url)
clients = res.json()
client = next((c for c in clients if c["clientId"] == client_id), None)
if not client:
log.error(
f"Client '{client_id}' not found in realm '{self.target_realm}'")
raise ValueError(
f"Client '{client_id}' not found in realm '{self.target_realm}'")
log.info(f"Found client ID for '{client_id}': {client['id']}")
return client["id"]
def create_role(self, client_internal_id, role_name):
url = f"{self.base_url}/admin/realms/{self.target_realm}/clients/{client_internal_id}/roles"
res = self.auth.request("POST", url, json={"name": role_name})
if res.status_code not in (201, 409):
log.error(f"Failed to create role '{role_name}': {res.text}")
raise Exception(f"Failed to create role '{role_name}': {res.text}")
log.info(f"Role '{role_name}' created successfully or already exists.")
def get_role(self, client_internal_id, role_name):
url = f"{self.base_url}/admin/realms/{self.target_realm}/clients/{client_internal_id}/roles/{role_name}"
res = self.auth.request("GET", url)
if res.status_code == 404:
log.error(
f"Role '{role_name}' not found in client '{client_internal_id}'")
raise ValueError(f"Role '{role_name}' not found")
log.info(f"Found role '{role_name}' in client '{client_internal_id}'")
return res.json()
def get_group_id(self, group_name):
url = f"{self.base_url}/admin/realms/{self.target_realm}/groups"
res = self.auth.request("GET", url)
groups = res.json()
group = next((g for g in groups if g["name"] == group_name), None)
if not group:
log.error(
f"Group '{group_name}' not found in realm '{self.target_realm}'")
raise ValueError(
f"Group '{group_name}' not found in realm '{self.target_realm}'")
log.info(f"Found group ID for '{group_name}': {group['id']}")
return group["id"]
def map_role_to_group(self, group_id, client_internal_id, role_obj):
url = f"{self.base_url}/admin/realms/{self.target_realm}/groups/{group_id}/role-mappings/clients/{client_internal_id}"
res = self.auth.request("POST", url, json=[role_obj])
if res.status_code not in (204, 409):
log.error(f"Failed to map role to group '{group_id}': {res.text}")
raise Exception(f"Failed to map role to group: {res.text}")
log.info(f"Role mapped to group '{group_id}' successfully.")
def import_roles_and_mappings(self, client_id, csv_path):
client_internal_id = self.get_client_id(client_id)
failures = []
with open(csv_path, newline='') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
role = row["role_name"]
group = row["group_name"]
log.info(f"Processing: Role='{role}' -> Group='{group}'")
try:
self.create_role(client_internal_id, role)
group_id = self.get_group_id(group)
role_obj = self.get_role(client_internal_id, role)
self.map_role_to_group(
group_id, client_internal_id, role_obj)
except Exception as e:
msg = f"Failed to process role '{role}' for group '{group}': {e}"
log.error(msg)
failures.append(msg)
if failures:
log.warning(f"Completed with {len(failures)} error(s).")
else:
log.info("Role cloning and mapping completed successfully.")

48
cloaksmith/log.py Normal file
View File

@ -0,0 +1,48 @@
# log.py
import logging
class ColorFormatter(logging.Formatter):
COLORS = {
"DEBUG": "\033[36m", # Cyan
"INFO": "\033[32m", # Green
"WARNING": "\033[33m", # Yellow
"ERROR": "\033[31m", # Red
"CRITICAL": "\033[41m", # Red background
"GREY": "\033[90m", # Grau
"RESET": "\033[0m"
}
def format(self, record):
# Pad uncolored level name
raw_level = record.levelname
padded = f"{raw_level:<8}"
# Add color after padding
color = self.COLORS.get(raw_level, "")
reset = self.COLORS["RESET"]
record.levelname = f"{color}{padded}{reset}"
record.name = f"{self.COLORS.get('GREY')}{record.name}{reset}"
return super().format(record)
_logger = None
def setup_logging(level="INFO"):
global _logger
if _logger:
return _logger
logger = logging.getLogger("keycloak-scripts")
logger.setLevel(getattr(logging, level.upper(), logging.INFO))
if not logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(ColorFormatter("%(asctime)s [ %(levelname)s | %(name)s ] %(message)s"))
logger.addHandler(handler)
_logger = logger
return logger
def get_logger():
return _logger or setup_logging()

View File

@ -0,0 +1,5 @@
role_name,group_name
admin,admins_group
user,users_group
moderator,moderators_group
developer,developers_group

23
setup.py Normal file
View File

@ -0,0 +1,23 @@
from setuptools import setup, find_packages
# Read version from __init__.py
version = {}
with open("cloaksmith/__init__.py") as f:
exec(f.read(), version)
setup(
name='cloaksmith',
version=version["__version__"],
license='MIT',
packages=find_packages(),
install_requires=[
'Click',
'python-dotenv',
'requests',
],
entry_points={
'console_scripts': [
'cloaksmith = cloaksmith.cli:cli',
],
},
)