Working with Run:ai

Overview

I've recently been building a demo/training environment that includes Run:ai. These are my notes in case I forget how I set it up.

Environment setup

I couldn't find a Terraform provider or an Ansible role to configure the base settings after installing Run:ai, so I had ChatGPT generate a Python script for me. This is a shared environment with multiple teams, so each team gets a quota of a single GPU but is allowed to go over quota (this requires the preemptibility label shown in the Preemption section below).

"""
Run:ai v2.24 Environment Configuration Script
=============================================
Modes:
  DISCOVER_MODE = True  → Lists all clusters and their IDs, then exits.
                          Use this first to find your CLUSTER_ID.

  DISCOVER_MODE = False → Runs the full department/project configuration
                          using the CLUSTER_ID you supply.

Requirements:
  pip install runapy
"""

from runai.configuration import Configuration
from runai.api_client import ApiClient
from runai.api import DepartmentsApi, ProjectsApi, ClustersApi, NodePoolsApi
from runai import models
import logging
import sys

# ──────────────────────────────────────────────
# Logging Configuration
# ──────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)]
)
log = logging.getLogger(__name__)

# ──────────────────────────────────────────────
# Script Mode
# ──────────────────────────────────────────────
DISCOVER_MODE = False

# ──────────────────────────────────────────────
# Environment Configuration
# ──────────────────────────────────────────────
RUNAI_BASE_URL  = "https://runai.apps.openshift.mydomain.com"
CLIENT_ID       = ""
CLIENT_SECRET   = ""
CLUSTER_ID      = ""

# ──────────────────────────────────────────────
# Resource Definitions
# ──────────────────────────────────────────────
NODEPOOL_NAME   = "default"
GPU_QUOTA       = 1
GPU_LIMIT       = None                          # null = unlimited

DEPARTMENTS = [
    "demo",                                     # trailing comma matters: without it Python joins "demo" and "team-01"
    "team-01",
    "team-02",
    "team-03",
    "team-04",
    "team-05",
    "team-06",
]


# ──────────────────────────────────────────────
# Pre-flight: Fetch Existing Departments
# ──────────────────────────────────────────────
def get_existing_departments(dept_api: DepartmentsApi) -> dict[str, str]:
    """
    Fetches all existing departments on the cluster and returns
    a mapping of department name → department ID.

    Args:
        dept_api: Instantiated DepartmentsApi client

    Returns:
        dict of {department_name: department_id}
    """
    existing = {}
    try:
        # Run:ai filter format requires == not =
        response = dept_api.get_departments(
            filter_by=[f"clusterId=={CLUSTER_ID}"]
        )

        departments = []
        if hasattr(response, "data"):
            data = response.data
            if isinstance(data, list):
                departments = data
            elif isinstance(data, dict):
                departments = data.get("departments", [])
        elif isinstance(response, list):
            departments = response

        for dept in departments:
            if isinstance(dept, dict):
                name = dept.get("name")
                did  = dept.get("id")
            else:
                name = getattr(dept, "name", None)
                did  = getattr(dept, "id", None)

            if name and did:
                existing[name] = str(did)

        log.info(f"  Found {len(existing)} existing department(s) on cluster.")

    except Exception as e:
        log.warning(f"  Could not fetch existing departments: {e}")
        log.warning("  Will handle conflicts during creation.")

    return existing

# ──────────────────────────────────────────────
# Pre-flight: Fetch Existing Projects
# ──────────────────────────────────────────────
def get_existing_projects(proj_api: ProjectsApi) -> dict[str, str]:
    """
    Fetches all existing projects on the cluster and returns
    a mapping of project name → project ID.

    Args:
        proj_api: Instantiated ProjectsApi client

    Returns:
        dict of {project_name: project_id}
    """
    existing = {}
    try:
        # Run:ai filter format requires == not =
        response = proj_api.get_projects(
            filter_by=[f"clusterId=={CLUSTER_ID}"]
        )

        projects = []
        if hasattr(response, "data"):
            data = response.data
            if isinstance(data, list):
                projects = data
            elif isinstance(data, dict):
                projects = data.get("projects", [])
        elif isinstance(response, list):
            projects = response

        for proj in projects:
            if isinstance(proj, dict):
                name = proj.get("name")
                pid  = proj.get("id")
            else:
                name = getattr(proj, "name", None)
                pid  = getattr(proj, "id", None)

            if name and pid:
                existing[name] = str(pid)

        log.info(f"  Found {len(existing)} existing project(s) on cluster.")

    except Exception as e:
        log.warning(f"  Could not fetch existing projects: {e}")
        log.warning("  Will handle conflicts during creation.")

    return existing

# ──────────────────────────────────────────────
# Helper: Check if exception is a 409 conflict
# ──────────────────────────────────────────────
def is_conflict(e: Exception) -> bool:
    """
    Checks whether an exception represents a 409 Conflict
    response from the Run:ai API.

    Args:
        e: The caught exception

    Returns:
        True if the exception is a 409 Conflict, False otherwise
    """
    return "409" in str(e) or "Conflict" in str(e) or "already exists" in str(e)


# ──────────────────────────────────────────────
# Helper: Extract field from API response
# ──────────────────────────────────────────────
def extract_field(response, field: str):
    """
    Safely extracts a field from an API response.
    The runapy client wraps responses in an object with a
    .data attribute containing the actual response dict.

    Args:
        response: Raw API response object
        field:    The field name to extract

    Returns:
        The extracted value, or None if not found
    """
    if hasattr(response, "data"):
        data = response.data
        if isinstance(data, dict):
            return data.get(field)
    if isinstance(response, dict):
        return response.get(field)
    return getattr(response, field, None)


# ──────────────────────────────────────────────
# Helper: Build resource model
# ──────────────────────────────────────────────
def build_resource(nodepool_id: str) -> models.ResourcesNullable:
    """
    Constructs a ResourcesNullable model as required by both
    DepartmentCreationRequest and ProjectCreationRequest.

    Note: over_quota_weight is omitted because the over-quota
    weight feature is disabled on this cluster and the API rejects
    the field entirely when the feature is off.

    Args:
        nodepool_id: The node pool UUID

    Returns:
        models.ResourcesNullable instance
    """
    return models.ResourcesNullable(
        node_pool=models.ResourcesNodePool(
            id=nodepool_id,
            name=NODEPOOL_NAME
        ),
        gpu=models.NonNullResource(
            deserved=GPU_QUOTA,
            limit=GPU_LIMIT,
            # over_quota_weight intentionally omitted —
            # feature is disabled on this cluster
        )
    )


# ──────────────────────────────────────────────
# Cluster Discovery
# ──────────────────────────────────────────────
def discover_clusters(api_client: ApiClient) -> None:
    """
    Queries the Run:ai API for all clusters and prints
    a formatted summary.

    Args:
        api_client: Authenticated ApiClient instance
    """
    log.info("Fetching cluster list from Run:ai tenant...")
    log.info(f"Tenant: {RUNAI_BASE_URL}\n")

    try:
        clusters_api = ClustersApi(api_client)
        response = clusters_api.get_clusters(verbosity="full")

        if hasattr(response, "data"):
            clusters = response.data
        elif isinstance(response, list):
            clusters = response
        else:
            clusters = [response]

        if not clusters:
            log.warning("No clusters found for this tenant.")
            return

        log.info("=" * 60)
        log.info(f"  {'CLUSTER NAME':<25} {'CLUSTER ID':<36}")
        log.info("=" * 60)

        for cluster in clusters:
            if isinstance(cluster, dict):
                cluster_name   = cluster.get("name", "N/A")
                cluster_id     = cluster.get("uuid") or cluster.get("id", "N/A")
                cluster_status = cluster.get("status", {}).get("state", "N/A")
                cluster_ver    = cluster.get("version", "N/A")
            else:
                cluster_name   = getattr(cluster, "name", "N/A")
                cluster_id     = getattr(cluster, "uuid", None) \
                              or getattr(cluster, "id", "N/A")
                cluster_status = getattr(cluster, "state", "N/A")
                cluster_ver    = getattr(cluster, "version", "N/A")

            log.info(f"  {cluster_name:<25} {str(cluster_id):<36}")
            log.info(f"  {'Version:':<25} {cluster_ver}")
            log.info(f"  {'Status:':<25} {cluster_status}")
            log.info("-" * 60)

        log.info(
            "\n  ► Copy your CLUSTER_ID from above, paste it into "
            "the script,\n    then set DISCOVER_MODE = False to run "
            "the full configuration."
        )

    except Exception as e:
        log.error(f"Failed to retrieve clusters: {e}")
        raise


# ──────────────────────────────────────────────
# Node Pool ID Lookup
# ──────────────────────────────────────────────
def get_nodepool_id(api_client: ApiClient) -> str:
    """
    Looks up the node pool ID for NODEPOOL_NAME on the
    configured cluster.

    Args:
        api_client: Authenticated ApiClient instance

    Returns:
        The node pool ID string

    Raises:
        ValueError: If the node pool cannot be found
    """
    log.info(f"Looking up node pool ID for '{NODEPOOL_NAME}'...")

    try:
        nodepool_api = NodePoolsApi(api_client)
        response = nodepool_api.get_node_pools(cluster_id=CLUSTER_ID)

        if hasattr(response, "data"):
            nodepools = response.data
        elif isinstance(response, list):
            nodepools = response
        else:
            nodepools = [response]

        log.info("  Available node pools:")
        for np in nodepools:
            if isinstance(np, dict):
                np_name = np.get("name", "N/A")
                np_id   = np.get("id", "N/A")
            else:
                np_name = getattr(np, "name", "N/A")
                np_id   = getattr(np, "id", "N/A")

            log.info(f"    • {np_name} (ID: {np_id})")

            if np_name == NODEPOOL_NAME:
                log.info(
                    f"  ✔ Found node pool '{NODEPOOL_NAME}' "
                    f"with ID: {np_id}"
                )
                return str(np_id)

        raise ValueError(
            f"Node pool '{NODEPOOL_NAME}' not found on cluster "
            f"'{CLUSTER_ID}'. Check NODEPOOL_NAME in the script."
        )

    except ValueError:
        raise
    except Exception as e:
        log.error(f"Failed to retrieve node pools: {e}")
        raise


# ──────────────────────────────────────────────
# Step 1: Create Departments (idempotent)
# ──────────────────────────────────────────────
def create_departments(
    dept_api: DepartmentsApi,
    nodepool_id: str
) -> dict[str, str]:
    """
    Creates all departments and returns a mapping of
    department name → department ID.

    Idempotent — skips creation if the department already exists,
    either detected via pre-flight check or caught as a 409 conflict.
    On 409, re-fetches the department list to resolve the existing
    department ID so the script can continue.

    Args:
        dept_api:    Instantiated DepartmentsApi client
        nodepool_id: Resolved node pool UUID

    Returns:
        dict of {department_name: department_id}
    """
    department_ids = {}

    log.info("  Checking for existing departments...")
    existing_departments = get_existing_departments(dept_api)

    for dept_name in DEPARTMENTS:

        # ── Skip if pre-flight found it ────────
        if dept_name in existing_departments:
            dept_id = existing_departments[dept_name]
            log.info(
                f"  ↷ Department '{dept_name}' already exists "
                f"(ID: {dept_id}) — skipping creation."
            )
            department_ids[dept_name] = dept_id
            continue

        # ── Attempt creation ───────────────────
        log.info(f"  Creating department: '{dept_name}'")
        try:
            response = dept_api.create_department(
                department_creation_request=models.DepartmentCreationRequest(
                    name=dept_name,
                    cluster_id=CLUSTER_ID,
                    resources=[build_resource(nodepool_id)]
                )
            )

            dept_id = extract_field(response, "id")

            if not dept_id:
                raise ValueError(
                    f"Department '{dept_name}' created but no ID returned. "
                    f"Response: {response}"
                )

            department_ids[dept_name] = dept_id
            log.info(
                f"  ✔ Department '{dept_name}' created successfully "
                f"(ID: {dept_id})"
            )

        except Exception as e:
            # ── Handle 409 — already exists ───
            if is_conflict(e):
                log.warning(
                    f"  ↷ Department '{dept_name}' already exists "
                    f"(409 conflict) — fetching existing ID."
                )
                # Re-fetch all departments to get the ID
                refreshed = get_existing_departments(dept_api)
                if dept_name in refreshed:
                    department_ids[dept_name] = refreshed[dept_name]
                    log.info(
                        f"  ✔ Resolved existing department '{dept_name}' "
                        f"(ID: {refreshed[dept_name]})"
                    )
                else:
                    log.error(
                        f"  ✘ Could not resolve existing department "
                        f"'{dept_name}' after 409 conflict."
                    )
                    raise
            else:
                log.error(
                    f"  ✘ Failed to create department '{dept_name}': {e}"
                )
                raise

    return department_ids


# ──────────────────────────────────────────────
# Step 2: Create Projects (idempotent)
# ──────────────────────────────────────────────
def create_projects(
    proj_api: ProjectsApi,
    department_ids: dict[str, str],
    nodepool_id: str
) -> None:
    """
    Creates one project per department, matching the department name.

    Idempotent — skips creation if the project already exists,
    either detected via pre-flight check or caught as a 409 conflict.

    Department assignment uses parent_id (alias: parentId) confirmed
    from ProjectCreationRequest source inspection.
    Namespace is auto-generated by Run:ai as runai-<project-name>.

    Args:
        proj_api:       Instantiated ProjectsApi client
        department_ids: Mapping of department name → department ID
        nodepool_id:    Resolved node pool UUID
    """
    log.info("  Checking for existing projects...")
    existing_projects = get_existing_projects(proj_api)

    for dept_name, dept_id in department_ids.items():
        project_name = dept_name

        # ── Skip if pre-flight found it ────────
        if project_name in existing_projects:
            proj_id = existing_projects[project_name]
            log.info(
                f"  ↷ Project '{project_name}' already exists "
                f"(ID: {proj_id}) — skipping creation."
            )
            continue

        # ── Attempt creation ───────────────────
        log.info(
            f"  Creating project: '{project_name}' "
            f"under department '{dept_name}' (parent_id: {dept_id})"
        )
        try:
            response = proj_api.create_project(
                project_creation_request=models.ProjectCreationRequest(
                    name=project_name,
                    cluster_id=CLUSTER_ID,
                    parent_id=dept_id,
                    default_node_pools=[NODEPOOL_NAME],
                    resources=[build_resource(nodepool_id)]
                    # requested_namespace omitted —
                    # Run:ai auto-generates as runai-<project-name>
                )
            )

            proj_id = extract_field(response, "id")

            # Read back auto-generated namespace from response
            namespace = "pending"
            if hasattr(response, "data") and isinstance(response.data, dict):
                namespace = response.data.get(
                    "status", {}
                ).get("namespace", "pending")

            log.info(
                f"  ✔ Project '{project_name}' created successfully "
                f"(ID: {proj_id}, Namespace: '{namespace}')"
            )

        except Exception as e:
            # ── Handle 409 — already exists ───
            if is_conflict(e):
                log.warning(
                    f"  ↷ Project '{project_name}' already exists "
                    f"(409 conflict) — skipping."
                )
            else:
                log.error(
                    f"  ✘ Failed to create project '{project_name}' "
                    f"under department '{dept_name}': {e}"
                )
                raise

# ──────────────────────────────────────────────
# Main Entry Point
# ──────────────────────────────────────────────
def main():

    # ── Validate credentials are set ──────────
    # Empty strings (the defaults above) must fail too, not only "YOUR_..." placeholders
    if any(not v or "YOUR_" in v for v in (RUNAI_BASE_URL, CLIENT_ID, CLIENT_SECRET)):
        log.error(
            "Please update RUNAI_BASE_URL, CLIENT_ID, and "
            "CLIENT_SECRET before running the script."
        )
        sys.exit(1)

    # ── Initialise API client ──────────────────
    configuration = Configuration(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        runai_base_url=RUNAI_BASE_URL,
    )
    api_client = ApiClient(configuration)

    # ── Mode: Discover clusters ────────────────
    if DISCOVER_MODE:
        log.info("Running in DISCOVER MODE")
        log.info("─" * 55)
        discover_clusters(api_client)
        log.info("\nDiscovery complete. Exiting.")
        sys.exit(0)

    # ── Mode: Full configuration ───────────────
    if not CLUSTER_ID or "YOUR_" in CLUSTER_ID:
        log.error(
            "CLUSTER_ID is not set. Run with DISCOVER_MODE = True "
            "first to find your cluster ID."
        )
        sys.exit(1)

    log.info("=" * 55)
    log.info("  Run:ai Environment Configuration")
    log.info(f"  Target cluster : {CLUSTER_ID}")
    log.info(f"  Node pool      : {NODEPOOL_NAME}")
    log.info(f"  GPU quota      : {GPU_QUOTA}")
    log.info(f"  GPU limit      : {GPU_LIMIT} (None = unlimited)")
    log.info(f"  Departments    : {', '.join(DEPARTMENTS)}")
    log.info("=" * 55)

    dept_api = DepartmentsApi(api_client)
    proj_api = ProjectsApi(api_client)

    try:
        # ── Resolve node pool ID ───────────────
        log.info("\n[Step 0/2] Resolving node pool ID...")
        nodepool_id = get_nodepool_id(api_client)

        # ── Create departments ─────────────────
        log.info("\n[Step 1/2] Creating departments...")
        department_ids = create_departments(dept_api, nodepool_id)

        # ── Create projects ────────────────────
        log.info("\n[Step 2/2] Creating projects...")
        create_projects(proj_api, department_ids, nodepool_id)

        log.info("\n✔ Configuration completed successfully.")
        log.info("Summary:")
        for name, dept_id in department_ids.items():
            log.info(
                f"  • Department '{name}' (ID: {dept_id}) "
                f"→ Project '{name}' (namespace: auto-generated by Run:ai)"
            )

    except Exception as e:
        log.error(f"\n✘ Configuration failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
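
The script repeats the same response-unwrapping logic in several functions. One possible simplification is sketched below. `unwrap_items` and `name_to_id` are hypothetical helper names, not part of runapy, and the example runs against a stand-in object, so it makes no claim about the shape of the real client's responses.

```python
from types import SimpleNamespace
from typing import Any, Optional


def unwrap_items(response: Any, key: Optional[str] = None) -> list:
    """Return the list of items carried by a response-like object.

    Covers the shapes the script guards against: an object with a
    .data attribute, a bare list, or a dict nesting the list under `key`.
    """
    # Fall back to the response itself when there is no .data attribute
    data = getattr(response, "data", response)
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        if key is not None:
            return list(data.get(key, []))
        return [data]
    return [] if data is None else [data]


def name_to_id(items: list) -> dict:
    """Map name -> id for items that are dicts or attribute objects."""
    result = {}
    for item in items:
        if isinstance(item, dict):
            name, ident = item.get("name"), item.get("id")
        else:
            name = getattr(item, "name", None)
            ident = getattr(item, "id", None)
        if name and ident:
            result[name] = str(ident)
    return result


# Stand-in for a wrapped response (assumption: the real one comes from runapy)
fake = SimpleNamespace(data={"departments": [{"name": "demo", "id": 7}]})
print(name_to_id(unwrap_items(fake, "departments")))  # {'demo': '7'}
```

With helpers like these, get_existing_departments and get_existing_projects would differ only in the API call and the key name.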

Preemption

I'm also using OpenShift AI in this environment, and I saw the following error when deploying some pods.

Events:
  Type     Reason             Age                     From             Message
  ----     ------             ----                    ----             -------
  Warning  Unschedulable      <invalid> (x3 over 1s)  runai-scheduler  NonPreemptibleOverQuota: Non-preemptible workload is over quota. Workload requested 1 GPUs, but demo-fraud-detection quota is 1 GPUs, while 1 GPUs are already allocated for non-preemptible pods. Use a preemptible workload to go over quota..

The notebook was missing the kai.scheduler/preemptibility: preemptible label, which I found described in this document:

https://run-ai-docs.nvidia.com/self-hosted/platform-management/runai-scheduler/scheduling/workload-priority-control
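
Reading the event message literally, the pod was refused because it was non-preemptible and would have pushed the project's non-preemptible allocation past its quota. The sketch below is a simplified model of that single rule, not the scheduler's actual logic. `admits` and its parameters are hypothetical names, and the model ignores whether spare GPUs actually exist in the cluster.

```python
def admits(requested: float, allocated_non_preemptible: float,
           quota: float, preemptible: bool) -> bool:
    """Simplified model of the NonPreemptibleOverQuota check.

    Non-preemptible workloads must fit inside the project quota.
    Preemptible workloads may exceed it (and may be evicted later).
    """
    if preemptible:
        return True
    return allocated_non_preemptible + requested <= quota


# Values from the event above: 1 GPU requested, quota 1, 1 already allocated
print(admits(1, 1, 1, preemptible=False))  # False
print(admits(1, 1, 1, preemptible=True))   # True
```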

To schedule a fractional GPU in Run:ai, I use the following annotations.

annotations:
  gpu-fraction: "0.5"
  gpu-fraction-num-devices: "2"
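
When manifests are generated from code, it is easy to emit these values as numbers rather than strings. The sketch below builds the two annotations as strings. `fraction_annotations` is a hypothetical helper, and the bounds it checks are my assumption (a fraction in (0, 1] and at least one device), not something taken from the Run:ai documentation.

```python
def fraction_annotations(fraction: float, num_devices: int = 1) -> dict:
    """Build the fractional-GPU annotations with string values.

    Kubernetes annotation values must be strings, so the numbers
    are converted instead of being passed through as float/int.
    """
    # Assumed bounds; adjust if the scheduler accepts a different range
    if not 0 < fraction <= 1:
        raise ValueError("fraction must be in the range (0, 1]")
    if num_devices < 1:
        raise ValueError("num_devices must be at least 1")

    annotations = {"gpu-fraction": str(fraction)}
    if num_devices > 1:
        annotations["gpu-fraction-num-devices"] = str(num_devices)
    return annotations


print(fraction_annotations(0.5, 2))
# {'gpu-fraction': '0.5', 'gpu-fraction-num-devices': '2'}
```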

Here's a simplified example of how I have deployed some of the workloads.

apiVersion: kubeflow.org/v1
kind: Notebook
metadata:
  name: demo-fraud-detection-workbench-train
  namespace: runai-demo-fraud-detection
  labels:
    app: demo-fraud-detection-workbench-train
    opendatahub.io/dashboard: "true"
    opendatahub.io/odh-managed: "true"
    opendatahub.io/workbenches: "true"
    kai.scheduler/preemptibility: preemptible
  annotations:
    notebooks.opendatahub.io/inject-oauth: "true"
    notebooks.opendatahub.io/last-image-selection: "minimal-gpu:2025.1"
    opendatahub.io/image-display-name: "CUDA"
    opendatahub.io/accelerator-name: "migrated-gpu"
    gpu-fraction: "0.5"
    gpu-fraction-num-devices: "2"
    openshift.io/display-name: "02-demo-fraud-detection-workbench-train"
    openshift.io/description: "XGBoost GPU fraud detection - RunAI 0.50 GPU"
spec:
  template:
    spec:
      serviceAccountName: demo-fraud-detection-workbench-train
      containers:
        - name: demo-fraud-detection-workbench-train
          image: image-registry.openshift-image-registry.svc:5000/runai-demo-fraud-detection/fraud-detection-image:latest
          imagePullPolicy: Always
          resources:
            requests:
              cpu: "2"
              memory: 12Gi
            limits:
              cpu: "4"
              memory: 24Gi
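
Since a missing label was what caused the scheduling error, a small pre-deployment check can catch it early. This is a minimal sketch that works on a manifest already loaded into a Python dict (for example with a YAML parser). `missing_scheduling_metadata` is a hypothetical helper, and the list of required keys simply mirrors the manifest above.

```python
REQUIRED_LABEL = ("kai.scheduler/preemptibility", "preemptible")
REQUIRED_ANNOTATIONS = ("gpu-fraction", "gpu-fraction-num-devices")


def missing_scheduling_metadata(manifest: dict) -> list:
    """Return a list of problems; an empty list means the checks passed."""
    metadata = manifest.get("metadata") or {}
    labels = metadata.get("labels") or {}
    annotations = metadata.get("annotations") or {}

    problems = []
    key, value = REQUIRED_LABEL
    if labels.get(key) != value:
        problems.append(f"label {key} should be {value!r}")
    for name in REQUIRED_ANNOTATIONS:
        if name not in annotations:
            problems.append(f"annotation {name} is missing")
        elif not isinstance(annotations[name], str):
            # Unquoted YAML numbers load as float/int, which Kubernetes rejects
            problems.append(f"annotation {name} must be a string")
    return problems


notebook = {
    "metadata": {
        "labels": {"kai.scheduler/preemptibility": "preemptible"},
        "annotations": {"gpu-fraction": "0.5", "gpu-fraction-num-devices": "2"},
    }
}
print(missing_scheduling_metadata(notebook))  # []
```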