Working with Run:ai
Overview
I've recently been building a demo/training environment that includes Run:ai. I'm making some notes here in case I forget how I set it up.
Environment setup
I couldn't find a Terraform module/provider or Ansible role to configure the base settings after installing Run:ai, so I had ChatGPT generate a Python script for me. This is a shared environment with multiple teams, so each team gets a quota of a single GPU but is allowed to go over quota (this requires the preemptibility label shown in the Preemption section below).
"""
Run:ai v2.24 Environment Configuration Script
=============================================
Modes:
    DISCOVER_MODE = True  → Lists all clusters and their IDs, then exits.
                            Use this first to find your CLUSTER_ID.
    DISCOVER_MODE = False → Runs the full department/project configuration
                            using the CLUSTER_ID you supply.
Requirements:
    pip install runapy
"""
from runai.configuration import Configuration
from runai.api_client import ApiClient
from runai.api import DepartmentsApi, ProjectsApi, ClustersApi, NodePoolsApi
from runai import models
import logging
import sys
# ──────────────────────────────────────────────
# Logging Configuration
# ──────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
log = logging.getLogger(__name__)
# ──────────────────────────────────────────────
# Script Mode
# ──────────────────────────────────────────────
DISCOVER_MODE = False
# ──────────────────────────────────────────────
# Environment Configuration
# ──────────────────────────────────────────────
RUNAI_BASE_URL = "https://runai.apps.openshift.mydomain.com"
CLIENT_ID = ""
CLIENT_SECRET = ""
CLUSTER_ID = ""
# ──────────────────────────────────────────────
# Resource Definitions
# ──────────────────────────────────────────────
NODEPOOL_NAME = "default"
GPU_QUOTA = 1
GPU_LIMIT = None # null = unlimited
DEPARTMENTS = [
    "demo",  # trailing comma matters: without it Python joins "demo" and "team-01"
    "team-01",
    "team-02",
    "team-03",
    "team-04",
    "team-05",
    "team-06",
]
# ──────────────────────────────────────────────
# Pre-flight: Fetch Existing Departments
# ──────────────────────────────────────────────
def get_existing_departments(dept_api: DepartmentsApi) -> dict[str, str]:
    """
    Fetches all existing departments on the cluster and returns
    a mapping of department name → department ID.
    Args:
        dept_api: Instantiated DepartmentsApi client
    Returns:
        dict of {department_name: department_id}
    """
    existing = {}
    try:
        # Run:ai filter format requires == not =
        response = dept_api.get_departments(
            filter_by=[f"clusterId=={CLUSTER_ID}"]
        )
        departments = []
        if hasattr(response, "data"):
            data = response.data
            if isinstance(data, list):
                departments = data
            elif isinstance(data, dict):
                departments = data.get("departments", [])
        elif isinstance(response, list):
            departments = response
        for dept in departments:
            if isinstance(dept, dict):
                name = dept.get("name")
                did = dept.get("id")
            else:
                name = getattr(dept, "name", None)
                did = getattr(dept, "id", None)
            if name and did:
                existing[name] = str(did)
        log.info(f" Found {len(existing)} existing department(s) on cluster.")
    except Exception as e:
        log.warning(f" Could not fetch existing departments: {e}")
        log.warning(" Will handle conflicts during creation.")
    return existing
# ──────────────────────────────────────────────
# Pre-flight: Fetch Existing Projects
# ──────────────────────────────────────────────
def get_existing_projects(proj_api: ProjectsApi) -> dict[str, str]:
    """
    Fetches all existing projects on the cluster and returns
    a mapping of project name → project ID.
    Args:
        proj_api: Instantiated ProjectsApi client
    Returns:
        dict of {project_name: project_id}
    """
    existing = {}
    try:
        # Run:ai filter format requires == not =
        response = proj_api.get_projects(
            filter_by=[f"clusterId=={CLUSTER_ID}"]
        )
        projects = []
        if hasattr(response, "data"):
            data = response.data
            if isinstance(data, list):
                projects = data
            elif isinstance(data, dict):
                projects = data.get("projects", [])
        elif isinstance(response, list):
            projects = response
        for proj in projects:
            if isinstance(proj, dict):
                name = proj.get("name")
                pid = proj.get("id")
            else:
                name = getattr(proj, "name", None)
                pid = getattr(proj, "id", None)
            if name and pid:
                existing[name] = str(pid)
        log.info(f" Found {len(existing)} existing project(s) on cluster.")
    except Exception as e:
        log.warning(f" Could not fetch existing projects: {e}")
        log.warning(" Will handle conflicts during creation.")
    return existing
# ──────────────────────────────────────────────
# Helper: Check if exception is a 409 conflict
# ──────────────────────────────────────────────
def is_conflict(e: Exception) -> bool:
    """
    Checks whether an exception represents a 409 Conflict
    response from the Run:ai API.
    Args:
        e: The caught exception
    Returns:
        True if the exception is a 409 Conflict, False otherwise
    """
    return "409" in str(e) or "Conflict" in str(e) or "already exists" in str(e)
# ──────────────────────────────────────────────
# Helper: Extract field from API response
# ──────────────────────────────────────────────
def extract_field(response, field: str):
    """
    Safely extracts a field from an API response.
    The runapy client wraps responses in an object with a
    .data attribute containing the actual response dict.
    Args:
        response: Raw API response object
        field: The field name to extract
    Returns:
        The extracted value, or None if not found
    """
    if hasattr(response, "data"):
        data = response.data
        if isinstance(data, dict):
            return data.get(field)
    if isinstance(response, dict):
        return response.get(field)
    return getattr(response, field, None)
# ──────────────────────────────────────────────
# Helper: Build resource model
# ──────────────────────────────────────────────
def build_resource(nodepool_id: str) -> models.ResourcesNullable:
    """
    Constructs a ResourcesNullable model as required by both
    DepartmentCreationRequest and ProjectCreationRequest.
    Note: over_quota_weight is omitted — the over-quota
    feature is disabled on this cluster and the API rejects
    the field entirely when the feature is off.
    Args:
        nodepool_id: The node pool UUID
    Returns:
        models.ResourcesNullable instance
    """
    return models.ResourcesNullable(
        node_pool=models.ResourcesNodePool(
            id=nodepool_id,
            name=NODEPOOL_NAME
        ),
        gpu=models.NonNullResource(
            deserved=GPU_QUOTA,
            limit=GPU_LIMIT,
            # over_quota_weight intentionally omitted —
            # feature is disabled on this cluster
        )
    )
# ──────────────────────────────────────────────
# Cluster Discovery
# ──────────────────────────────────────────────
def discover_clusters(api_client: ApiClient) -> None:
    """
    Queries the Run:ai API for all clusters and prints
    a formatted summary.
    Args:
        api_client: Authenticated ApiClient instance
    """
    log.info("Fetching cluster list from Run:ai tenant...")
    log.info(f"Tenant: {RUNAI_BASE_URL}\n")
    try:
        clusters_api = ClustersApi(api_client)
        response = clusters_api.get_clusters(verbosity="full")
        if hasattr(response, "data"):
            clusters = response.data
        elif isinstance(response, list):
            clusters = response
        else:
            clusters = [response]
        if not clusters:
            log.warning("No clusters found for this tenant.")
            return
        log.info("=" * 60)
        log.info(f" {'CLUSTER NAME':<25} {'CLUSTER ID':<36}")
        log.info("=" * 60)
        for cluster in clusters:
            if isinstance(cluster, dict):
                cluster_name = cluster.get("name", "N/A")
                cluster_id = cluster.get("uuid") or cluster.get("id", "N/A")
                cluster_status = cluster.get("status", {}).get("state", "N/A")
                cluster_ver = cluster.get("version", "N/A")
            else:
                cluster_name = getattr(cluster, "name", "N/A")
                cluster_id = getattr(cluster, "uuid", None) \
                    or getattr(cluster, "id", "N/A")
                cluster_status = getattr(cluster, "state", "N/A")
                cluster_ver = getattr(cluster, "version", "N/A")
            log.info(f" {cluster_name:<25} {str(cluster_id):<36}")
            log.info(f" {'Version:':<25} {cluster_ver}")
            log.info(f" {'Status:':<25} {cluster_status}")
            log.info("-" * 60)
        log.info(
            "\n ► Copy your CLUSTER_ID from above, paste it into "
            "the script,\n then set DISCOVER_MODE = False to run "
            "the full configuration."
        )
    except Exception as e:
        log.error(f"Failed to retrieve clusters: {e}")
        raise
# ──────────────────────────────────────────────
# Node Pool ID Lookup
# ──────────────────────────────────────────────
def get_nodepool_id(api_client: ApiClient) -> str:
    """
    Looks up the node pool ID for NODEPOOL_NAME on the
    configured cluster.
    Args:
        api_client: Authenticated ApiClient instance
    Returns:
        The node pool ID string
    Raises:
        ValueError: If the node pool cannot be found
    """
    log.info(f"Looking up node pool ID for '{NODEPOOL_NAME}'...")
    try:
        nodepool_api = NodePoolsApi(api_client)
        response = nodepool_api.get_node_pools(cluster_id=CLUSTER_ID)
        if hasattr(response, "data"):
            nodepools = response.data
        elif isinstance(response, list):
            nodepools = response
        else:
            nodepools = [response]
        log.info(" Available node pools:")
        for np in nodepools:
            if isinstance(np, dict):
                np_name = np.get("name", "N/A")
                np_id = np.get("id", "N/A")
            else:
                np_name = getattr(np, "name", "N/A")
                np_id = getattr(np, "id", "N/A")
            log.info(f" • {np_name} (ID: {np_id})")
            if np_name == NODEPOOL_NAME:
                log.info(
                    f" ✔ Found node pool '{NODEPOOL_NAME}' "
                    f"with ID: {np_id}"
                )
                return str(np_id)
        raise ValueError(
            f"Node pool '{NODEPOOL_NAME}' not found on cluster "
            f"'{CLUSTER_ID}'. Check NODEPOOL_NAME in the script."
        )
    except ValueError:
        raise
    except Exception as e:
        log.error(f"Failed to retrieve node pools: {e}")
        raise
# ──────────────────────────────────────────────
# Step 1: Create Departments (idempotent)
# ──────────────────────────────────────────────
def create_departments(
    dept_api: DepartmentsApi,
    nodepool_id: str
) -> dict[str, str]:
    """
    Creates all departments and returns a mapping of
    department name → department ID.
    Idempotent — skips creation if the department already exists,
    either detected via pre-flight check or caught as a 409 conflict.
    On 409, fetches the existing department ID via a name search
    so the script can continue.
    Args:
        dept_api: Instantiated DepartmentsApi client
        nodepool_id: Resolved node pool UUID
    Returns:
        dict of {department_name: department_id}
    """
    department_ids = {}
    log.info(" Checking for existing departments...")
    existing_departments = get_existing_departments(dept_api)
    for dept_name in DEPARTMENTS:
        # ── Skip if pre-flight found it ────────
        if dept_name in existing_departments:
            dept_id = existing_departments[dept_name]
            log.info(
                f" ↷ Department '{dept_name}' already exists "
                f"(ID: {dept_id}) — skipping creation."
            )
            department_ids[dept_name] = dept_id
            continue
        # ── Attempt creation ───────────────────
        log.info(f" Creating department: '{dept_name}'")
        try:
            response = dept_api.create_department(
                department_creation_request=models.DepartmentCreationRequest(
                    name=dept_name,
                    cluster_id=CLUSTER_ID,
                    resources=[build_resource(nodepool_id)]
                )
            )
            dept_id = extract_field(response, "id")
            if not dept_id:
                raise ValueError(
                    f"Department '{dept_name}' created but no ID returned. "
                    f"Response: {response}"
                )
            department_ids[dept_name] = dept_id
            log.info(
                f" ✔ Department '{dept_name}' created successfully "
                f"(ID: {dept_id})"
            )
        except Exception as e:
            # ── Handle 409 — already exists ───
            if is_conflict(e):
                log.warning(
                    f" ↷ Department '{dept_name}' already exists "
                    f"(409 conflict) — fetching existing ID."
                )
                # Re-fetch all departments to get the ID
                refreshed = get_existing_departments(dept_api)
                if dept_name in refreshed:
                    department_ids[dept_name] = refreshed[dept_name]
                    log.info(
                        f" ✔ Resolved existing department '{dept_name}' "
                        f"(ID: {refreshed[dept_name]})"
                    )
                else:
                    log.error(
                        f" ✘ Could not resolve existing department "
                        f"'{dept_name}' after 409 conflict."
                    )
                    raise
            else:
                log.error(
                    f" ✘ Failed to create department '{dept_name}': {e}"
                )
                raise
    return department_ids
# ──────────────────────────────────────────────
# Step 2: Create Projects (idempotent)
# ──────────────────────────────────────────────
def create_projects(
    proj_api: ProjectsApi,
    department_ids: dict[str, str],
    nodepool_id: str
) -> None:
    """
    Creates one project per department, matching the department name.
    Idempotent — skips creation if the project already exists,
    either detected via pre-flight check or caught as a 409 conflict.
    Department assignment uses parent_id (alias: parentId) confirmed
    from ProjectCreationRequest source inspection.
    Namespace is auto-generated by Run:ai as runai-<project-name>.
    Args:
        proj_api: Instantiated ProjectsApi client
        department_ids: Mapping of department name → department ID
        nodepool_id: Resolved node pool UUID
    """
    log.info(" Checking for existing projects...")
    existing_projects = get_existing_projects(proj_api)
    for dept_name, dept_id in department_ids.items():
        project_name = dept_name
        # ── Skip if pre-flight found it ────────
        if project_name in existing_projects:
            proj_id = existing_projects[project_name]
            log.info(
                f" ↷ Project '{project_name}' already exists "
                f"(ID: {proj_id}) — skipping creation."
            )
            continue
        # ── Attempt creation ───────────────────
        log.info(
            f" Creating project: '{project_name}' "
            f"under department '{dept_name}' (parent_id: {dept_id})"
        )
        try:
            response = proj_api.create_project(
                project_creation_request=models.ProjectCreationRequest(
                    name=project_name,
                    cluster_id=CLUSTER_ID,
                    parent_id=dept_id,
                    default_node_pools=[NODEPOOL_NAME],
                    resources=[build_resource(nodepool_id)]
                    # requested_namespace omitted —
                    # Run:ai auto-generates as runai-<project-name>
                )
            )
            proj_id = extract_field(response, "id")
            # Read back auto-generated namespace from response
            namespace = "pending"
            if hasattr(response, "data") and isinstance(response.data, dict):
                namespace = response.data.get(
                    "status", {}
                ).get("namespace", "pending")
            log.info(
                f" ✔ Project '{project_name}' created successfully "
                f"(ID: {proj_id}, Namespace: '{namespace}')"
            )
        except Exception as e:
            # ── Handle 409 — already exists ───
            if is_conflict(e):
                log.warning(
                    f" ↷ Project '{project_name}' already exists "
                    f"(409 conflict) — skipping."
                )
            else:
                log.error(
                    f" ✘ Failed to create project '{project_name}' "
                    f"under department '{dept_name}': {e}"
                )
                raise
# ──────────────────────────────────────────────
# Main Entry Point
# ──────────────────────────────────────────────
def main():
    # ── Validate credentials are set ──────────
    # Reject empty values as well as leftover "YOUR_..." placeholders;
    # the defaults above are empty strings, so a "YOUR_" check alone never fires.
    credentials = (RUNAI_BASE_URL, CLIENT_ID, CLIENT_SECRET)
    if any(not value or "YOUR_" in value for value in credentials):
        log.error(
            "Please update RUNAI_BASE_URL, CLIENT_ID, and "
            "CLIENT_SECRET before running the script."
        )
        sys.exit(1)
    # ── Initialise API client ──────────────────
    configuration = Configuration(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        runai_base_url=RUNAI_BASE_URL,
    )
    api_client = ApiClient(configuration)
    # ── Mode: Discover clusters ────────────────
    if DISCOVER_MODE:
        log.info("Running in DISCOVER MODE")
        log.info("─" * 55)
        discover_clusters(api_client)
        log.info("\nDiscovery complete. Exiting.")
        sys.exit(0)
    # ── Mode: Full configuration ───────────────
    if not CLUSTER_ID or "YOUR_" in CLUSTER_ID:
        log.error(
            "CLUSTER_ID is not set. Run with DISCOVER_MODE = True "
            "first to find your cluster ID."
        )
        sys.exit(1)
    log.info("=" * 55)
    log.info(" Run:ai Environment Configuration")
    log.info(f" Target cluster : {CLUSTER_ID}")
    log.info(f" Node pool : {NODEPOOL_NAME}")
    log.info(f" GPU quota : {GPU_QUOTA}")
    log.info(f" GPU limit : {GPU_LIMIT} (None = unlimited)")
    log.info(f" Departments : {', '.join(DEPARTMENTS)}")
    log.info("=" * 55)
    dept_api = DepartmentsApi(api_client)
    proj_api = ProjectsApi(api_client)
    try:
        # ── Resolve node pool ID ───────────────
        log.info("\n[Step 0/2] Resolving node pool ID...")
        nodepool_id = get_nodepool_id(api_client)
        # ── Create departments ─────────────────
        log.info("\n[Step 1/2] Creating departments...")
        department_ids = create_departments(dept_api, nodepool_id)
        # ── Create projects ────────────────────
        log.info("\n[Step 2/2] Creating projects...")
        create_projects(proj_api, department_ids, nodepool_id)
        log.info("\n✔ Configuration completed successfully.")
        log.info("Summary:")
        for name, dept_id in department_ids.items():
            log.info(
                f" • Department '{name}' (ID: {dept_id}) "
                f"→ Project '{name}' (namespace: auto-generated by Run:ai)"
            )
    except Exception as e:
        log.error(f"\n✘ Configuration failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
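The script above keeps RUNAI_BASE_URL, CLIENT_ID and CLIENT_SECRET as constants in the file. Since these are notes for a shared environment, one option is to read them from environment variables instead. The following is a minimal sketch using only the standard library; the variable names (RUNAI_BASE_URL, RUNAI_CLIENT_ID, RUNAI_CLIENT_SECRET) and the load_settings helper are my own assumptions for illustration, not something runapy defines.

```python
import os

# Hypothetical environment variable names, chosen for this sketch only.
REQUIRED_VARS = ("RUNAI_BASE_URL", "RUNAI_CLIENT_ID", "RUNAI_CLIENT_SECRET")


def load_settings(env=None) -> dict[str, str]:
    """Return the required settings, raising if any is missing or blank."""
    env = os.environ if env is None else env
    settings = {name: env.get(name, "").strip() for name in REQUIRED_VARS}
    missing = [name for name, value in settings.items() if not value]
    if missing:
        raise RuntimeError(
            f"Missing environment variables: {', '.join(missing)}"
        )
    return settings
```

The returned values could then be passed to Configuration(...) in place of the module constants, which keeps the client secret out of the notes and out of version control.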
Preemption
I'm also using OpenShift AI in this environment, and I saw the following error when deploying some pods.
Events:
  Type     Reason         Age                     From             Message
  ----     ------         ----                    ----             -------
  Warning  Unschedulable  <invalid> (x3 over 1s)  runai-scheduler  NonPreemptibleOverQuota: Non-preemptible workload is over quota. Workload requested 1 GPUs, but demo-fraud-detection quota is 1 GPUs, while 1 GPUs are already allocated for non-preemptible pods. Use a preemptible workload to go over quota..
I was missing the kai.scheduler/preemptibility: preemptible label when deploying the notebook, which I found in this document.
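The rule in the event message can be read as simple arithmetic: a non-preemptible workload is rejected when its request plus what is already allocated to non-preemptible pods exceeds the project quota. The sketch below is only a toy model of that wording, not the scheduler's actual logic, and the function name is made up for illustration.

```python
def non_preemptible_over_quota(
    requested: float,
    quota: float,
    allocated_non_preemptible: float,
    preemptible: bool,
) -> bool:
    """Toy model of the NonPreemptibleOverQuota condition in the event above."""
    if preemptible:
        # Preemptible workloads are allowed to go over quota,
        # so this particular rule does not apply to them.
        return False
    return allocated_non_preemptible + requested > quota


# The numbers from the event: 1 GPU requested, quota 1, 1 already allocated.
print(non_preemptible_over_quota(1, 1, 1, preemptible=False))
print(non_preemptible_over_quota(1, 1, 1, preemptible=True))
```

With the numbers from the event, the first call reports the workload as over quota and the second does not, which matches what I saw once the label was added.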
To schedule a fractional GPU in Run:ai I set the gpu-fraction and gpu-fraction-num-devices annotations on the workload.
Here's a simplified example of how I have deployed some of the workloads.
apiVersion: kubeflow.org/v1
kind: Notebook
metadata:
  name: demo-fraud-detection-workbench-train
  namespace: runai-demo-fraud-detection
  labels:
    app: demo-fraud-detection-workbench-train
    opendatahub.io/dashboard: "true"
    opendatahub.io/odh-managed: "true"
    opendatahub.io/workbenches: "true"
    kai.scheduler/preemptibility: preemptible
  annotations:
    notebooks.opendatahub.io/inject-oauth: "true"
    notebooks.opendatahub.io/last-image-selection: "minimal-gpu:2025.1"
    opendatahub.io/image-display-name: "CUDA"
    opendatahub.io/accelerator-name: "migrated-gpu"
    gpu-fraction: "0.5"
    gpu-fraction-num-devices: "2"
    openshift.io/display-name: "02-demo-fraud-detection-workbench-train"
    openshift.io/description: "XGBoost GPU fraud detection - RunAI 0.50 GPU"
spec:
  template:
    spec:
      serviceAccountName: demo-fraud-detection-workbench-train
      containers:
        - name: demo-fraud-detection-workbench-train
          image: image-registry.openshift-image-registry.svc:5000/runai-demo-fraud-detection/fraud-detection-image:latest
          imagePullPolicy: Always
          resources:
            requests:
              cpu: "2"
              memory: 12Gi
            limits:
              cpu: "4"
              memory: 24Gi
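Because I have several workloads that need the same label and annotations, it can help to apply them in one place before the manifest is submitted. This is a minimal sketch that works on a manifest already loaded as a plain Python dict; the helper name with_scheduling_hints is hypothetical, and only the label and annotation keys are taken from the example above.

```python
import copy

PREEMPT_LABEL = "kai.scheduler/preemptibility"


def with_scheduling_hints(
    manifest: dict, gpu_fraction: float, num_devices: int = 1
) -> dict:
    """Return a copy of the manifest with the preemptible label and
    the GPU-fraction annotations set, leaving the input untouched."""
    patched = copy.deepcopy(manifest)
    metadata = patched.setdefault("metadata", {})
    metadata.setdefault("labels", {})[PREEMPT_LABEL] = "preemptible"
    annotations = metadata.setdefault("annotations", {})
    # Kubernetes annotation values must be strings, hence the str() calls.
    annotations["gpu-fraction"] = str(gpu_fraction)
    annotations["gpu-fraction-num-devices"] = str(num_devices)
    return patched


notebook = {"kind": "Notebook", "metadata": {"name": "demo"}}
patched = with_scheduling_hints(notebook, 0.5, 2)
print(patched["metadata"]["labels"])
print(patched["metadata"]["annotations"])
```

Returning a deep copy rather than editing in place means the same base manifest can be reused for workloads with different fractions.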