From 897360cd50cfc3ec3048de5c2b68e9622d1d5932 Mon Sep 17 00:00:00 2001 From: Oleksander Piskun Date: Fri, 5 Dec 2025 13:26:52 +0200 Subject: [PATCH] initial K8s support Signed-off-by: Oleksander Piskun --- README.md | 2 +- development/docs/appapi-emulation-k8s.md | 339 +++++++ development/redeploy_host_k8s.sh | 28 + haproxy_agent.py | 1112 +++++++++++++++++++++- 4 files changed, 1478 insertions(+), 3 deletions(-) create mode 100644 development/docs/appapi-emulation-k8s.md create mode 100755 development/redeploy_host_k8s.sh diff --git a/README.md b/README.md index d9e65f9..fb9e8b4 100644 --- a/README.md +++ b/README.md @@ -405,7 +405,7 @@ Use the Docker Engine `/_ping` endpoint via HaRP’s ExApps HTTP frontend to con curl -fsS \ -H "harp-shared-key: " \ -H "docker-engine-port: 24000" \ - http://127.0.0.1:8780/exapps/app_api/v1.41/_ping + http://127.0.0.1:8780/exapps/app_api/v1.44/_ping ``` * `24000` is the **default** FRP remote port used by the HaRP container for the **built‑in/local** Docker Engine (enabled when `/var/run/docker.sock` is mounted). diff --git a/development/docs/appapi-emulation-k8s.md b/development/docs/appapi-emulation-k8s.md new file mode 100644 index 0000000..a48c8fe --- /dev/null +++ b/development/docs/appapi-emulation-k8s.md @@ -0,0 +1,339 @@ +# AppAPI Emulation Guide (HaRP + Kubernetes backend) + +This guide documents the `curl` commands used to emulate AppAPI when testing HaRP’s Kubernetes backend. + +## Prerequisites + +* HaRP is reachable at: `http://nextcloud.local/exapps` +* HaRP was started with the same shared key as used below (`HP_SHARED_KEY`) +* HaRP has Kubernetes backend enabled (`HP_K8S_ENABLED=true`) and can access the K8s API +* `kubectl` is configured to point to the same cluster HaRP uses +* Optional: `jq` for parsing JSON responses + +## Environment variables + +```bash +export EXAPPS_URL="http://nextcloud.local/exapps" +export APPAPI_URL="${EXAPPS_URL}/app_api" +export HP_SHARED_KEY="some_very_secure_password" + +# Optional: Nextcloud base (only used by ExApp container env in this guide) +export NEXTCLOUD_URL="http://nextcloud.local" +``` + +> Notes: +> +> * All AppAPI-emulation calls go to `$APPAPI_URL/...` and require the header `harp-shared-key`. +> * You can also hit the agent directly on `http://127.0.0.1:8200/...` for debugging, but that bypasses the HAProxy/AppAPI path and may skip shared-key enforcement depending on your routing. 
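
Before running the steps below, it can help to sanity-check the cluster side first. A minimal check (a suggestion, assuming the default `nextcloud-exapps` namespace used throughout this guide):

```bash
# Confirm kubectl points at the cluster HaRP uses and the namespace exists
kubectl get namespace nextcloud-exapps

# List HaRP-managed ExApp Deployments (empty until an ExApp is created)
kubectl -n nextcloud-exapps get deployments
```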
+ +--- + +## 1) Check if ExApp is present (K8s Deployment exists) + +```bash +curl -sS \ + -H "harp-shared-key: $HP_SHARED_KEY" \ + -H "Content-Type: application/json" \ + -X POST \ + -d '{ + "name": "test-deploy", + "instance_id": "" + }' \ + "$APPAPI_URL/k8s/exapp/exists" +``` + +Expected output: + +```json +{"exists": true} +``` + +or + +```json +{"exists": false} +``` + +--- + +## 2) Create ExApp (PVC + Deployment with replicas=0) + +```bash +curl -sS \ + -H "harp-shared-key: $HP_SHARED_KEY" \ + -H "Content-Type: application/json" \ + -X POST \ + -d '{ + "name": "test-deploy", + "instance_id": "", + "image": "ghcr.io/nextcloud/test-deploy:latest", + "environment_variables": [ + "APP_ID=test-deploy", + "APP_DISPLAY_NAME=Test Deploy", + "APP_VERSION=1.2.1", + "APP_HOST=0.0.0.0", + "APP_PORT=23000", + "NEXTCLOUD_URL='"$NEXTCLOUD_URL"'", + "APP_SECRET=some-dev-secret", + "APP_PERSISTENT_STORAGE=/nc_app_test-deploy_data" + ], + "resource_limits": { "cpu": "500m", "memory": "512Mi" } + }' \ + "$APPAPI_URL/k8s/exapp/create" +``` + +Expected output (example): + +```json +{"name":"nc-app-test-deploy"} +``` + +--- + +## 3) Start ExApp (scale replicas to 1) + +```bash +curl -sS \ + -H "harp-shared-key: $HP_SHARED_KEY" \ + -H "Content-Type: application/json" \ + -X POST \ + -d '{ + "name": "test-deploy", + "instance_id": "" + }' \ + "$APPAPI_URL/k8s/exapp/start" +``` + +Expected: HTTP 204. + +--- + +## 4) Wait for ExApp to become Ready + +```bash +curl -sS \ + -H "harp-shared-key: $HP_SHARED_KEY" \ + -H "Content-Type: application/json" \ + -X POST \ + -d '{ + "name": "test-deploy", + "instance_id": "" + }' \ + "$APPAPI_URL/k8s/exapp/wait_for_start" +``` + +Expected output (example): + +```json +{ + "started": true, + "status": "running", + "health": "ready", + "reason": null, + "message": null +} +``` + +--- + +## 5) Expose + register in HaRP + +### 5.1 NodePort (default behavior) + +**Minimal (uses defaults, may auto-pick a node address):** + +```bash +EXPOSE_JSON=$( + curl -sS \ + -H "harp-shared-key: $HP_SHARED_KEY" \ + -H "Content-Type: application/json" \ + -X POST \ + -d '{ + "name": "test-deploy", + "instance_id": "", + "port": 23000, + "expose_type": "nodeport" + }' \ + "$APPAPI_URL/k8s/exapp/expose" +) + +echo "$EXPOSE_JSON" +``` + +**Recommended (provide a stable host reachable by HaRP):** + +```bash +# Example: edge node IP / VIP / L4 LB that forwards NodePort range +UPSTREAM_HOST="172.18.0.2" + +EXPOSE_JSON=$( + curl -sS \ + -H "harp-shared-key: $HP_SHARED_KEY" \ + -H "Content-Type: application/json" \ + -X POST \ + -d '{ + "name": "test-deploy", + "instance_id": "", + "port": 23000, + "expose_type": "nodeport", + "upstream_host": "'"$UPSTREAM_HOST"'" + }' \ + "$APPAPI_URL/k8s/exapp/expose" +) + +echo "$EXPOSE_JSON" +``` + +### 5.2 ClusterIP (only if HaRP can reach ClusterIP + resolve service DNS) + +```bash +EXPOSE_JSON=$( + curl -sS \ + -H "harp-shared-key: $HP_SHARED_KEY" \ + -H "Content-Type: application/json" \ + -X POST \ + -d '{ + "name": "test-deploy", + "instance_id": "", + "port": 23000, + "expose_type": "clusterip" + }' \ + "$APPAPI_URL/k8s/exapp/expose" +) + +echo "$EXPOSE_JSON" +``` + +### 5.3 Manual (HaRP does not create or inspect any Service) + +```bash +EXPOSE_JSON=$( + curl -sS \ + -H "harp-shared-key: $HP_SHARED_KEY" \ + -H "Content-Type: application/json" \ + -X POST \ + -d '{ + "name": "test-deploy", + "instance_id": "", + "port": 23000, + "expose_type": "manual", + "upstream_host": "exapp-test-deploy.internal", + "upstream_port": 23000 + }' \ + 
"$APPAPI_URL/k8s/exapp/expose" +) + +echo "$EXPOSE_JSON" +``` + +--- + +## 6) Extract exposed host/port for follow-up tests (requires `jq`) + +```bash +EXAPP_HOST=$(echo "$EXPOSE_JSON" | jq -r '.host') +EXAPP_PORT=$(echo "$EXPOSE_JSON" | jq -r '.port') + +echo "ExApp upstream endpoint: ${EXAPP_HOST}:${EXAPP_PORT}" +``` + +--- + +## 7) Check `/heartbeat` via HaRP routing (AppAPI-style direct routing headers) + +This checks HaRP’s ability to route to the ExApp given an explicit upstream host/port and AppAPI-style authorization header. + +### 7.1 Build `authorization-app-api` value + +HaRP typically expects this value to be the **base64 of `user_id:APP_SECRET`** (similar to HTTP Basic without the `Basic ` prefix). For an “anonymous” style request, use `:APP_SECRET`. + +```bash +# Option A: anonymous-style +AUTH_APP_API=$(printf '%s' ':some-dev-secret' | base64 | tr -d '\n') + +# Option B: user-scoped style (example user "admin") +# AUTH_APP_API=$(printf '%s' 'admin:some-dev-secret' | base64 | tr -d '\n') +``` + +### 7.2 Call heartbeat + +```bash +curl -sS \ + "http://nextcloud.local/exapps/test-deploy/heartbeat" \ + -H "harp-shared-key: $HP_SHARED_KEY" \ + -H "ex-app-version: 1.2.1" \ + -H "ex-app-id: test-deploy" \ + -H "ex-app-host: $EXAPP_HOST" \ + -H "ex-app-port: $EXAPP_PORT" \ + -H "authorization-app-api: $AUTH_APP_API" +``` + +If this fails with auth-related errors, verify: + +* `APP_SECRET` in the ExApp matches what you used here, +* your HaProxy config expectations for `authorization-app-api` (raw vs base64). + +--- + +## 8) Stop and remove (API-based cleanup) + +### Stop ExApp (scale replicas to 0) + +```bash +curl -sS \ + -H "harp-shared-key: $HP_SHARED_KEY" \ + -H "Content-Type: application/json" \ + -X POST \ + -d '{ + "name": "test-deploy", + "instance_id": "" + }' \ + "$APPAPI_URL/k8s/exapp/stop" +``` + +### Remove ExApp (Deployment + optional PVC; Service may be removed depending on HaRP version) + +```bash +curl -sS \ + -H "harp-shared-key: $HP_SHARED_KEY" \ + -H "Content-Type: application/json" \ + -X POST \ + -d '{ + "name": "test-deploy", + "instance_id": "", + "remove_data": true + }' \ + "$APPAPI_URL/k8s/exapp/remove" +``` + +--- + +## Useful `kubectl` commands (debug / manual cleanup) + +### Check resources + +```bash +kubectl get deploy,svc,pvc -n nextcloud-exapps -o wide | grep -E 'test-deploy|NAME' || true +kubectl get pods -n nextcloud-exapps -o wide +``` + +### Delete Service (if it was exposed and needs manual cleanup) + +```bash +kubectl delete svc nc-app-test-deploy -n nextcloud-exapps +``` + +### Delete Deployment + +```bash +kubectl delete deployment nc-app-test-deploy -n nextcloud-exapps +``` + +### Delete PVC (data) + +PVC name is derived from `nc_app_test-deploy_data` and sanitized for K8s, typically: +`nc-app-test-deploy-data` + +```bash +kubectl delete pvc nc-app-test-deploy-data -n nextcloud-exapps +``` diff --git a/development/redeploy_host_k8s.sh b/development/redeploy_host_k8s.sh new file mode 100755 index 0000000..67ef447 --- /dev/null +++ b/development/redeploy_host_k8s.sh @@ -0,0 +1,28 @@ +#!/bin/sh +# SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors +# SPDX-License-Identifier: AGPL-3.0-or-later + +# This file can be used for development for the "manual install" deployment type when FRP is disabled. +# For Julius Docker-Dev, you need to additionally edit the `data/nginx/vhost.d/nextcloud.local_location` file, +# changing `appapi-harp` to `172.17.0.1` and restart the "proxy" container. 
+ +docker container remove --force appapi-harp + +docker build -t nextcloud-appapi-harp:local . + +docker run \ + -e HP_SHARED_KEY="some_very_secure_password" \ + -e NC_INSTANCE_URL="http://nextcloud.local" \ + -e HP_LOG_LEVEL="debug" \ + -e HP_VERBOSE_START="1" \ + -e HP_K8S_ENABLED="true" \ + -e HP_K8S_API_SERVER="https://127.0.0.1:37151" \ + -e HP_K8S_BEARER_TOKEN="eyJhbGciOiJSUzI1NiIsImtpZCI6InJwSHRRN04wV0RwcHFiVEtJLVdHblpGTllKMGJwc3NZZ2tZYjRrREdhcEkifQ.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjLmNsdXN0ZXIubG9jYWwiXSwiZXhwIjoxNzczNTgwODI1LCJpYXQiOjE3NjQ5NDA4MjUsImlzcyI6Imh0dHBzOi8va3ViZXJuZXRlcy5kZWZhdWx0LnN2Yy5jbHVzdGVyLmxvY2FsIiwianRpIjoiOTBmOTE0MjUtMDYxMy00YzM4LTllNmQtN2U2Y2I1Njk4MDhhIiwia3ViZXJuZXRlcy5pbyI6eyJuYW1lc3BhY2UiOiJuZXh0Y2xvdWQtZXhhcHBzIiwic2VydmljZWFjY291bnQiOnsibmFtZSI6ImhhcnAtZXhhcHBzIiwidWlkIjoiMzI2ZTA5NzEtMGIyOC00NzBkLTlmZTUtMDRjMTc0YjE2ODQ2In19LCJuYmYiOjE3NjQ5NDA4MjUsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpuZXh0Y2xvdWQtZXhhcHBzOmhhcnAtZXhhcHBzIn0.TSDUSEe0NuFMhycrK1XHNgBV3-L70qqLfCR2-x0XSSXGSsms1ZzxKbSnsDDCstAGg6-ZtlJroWFZZiFeZ2E2j53z2-Tt4lXM-ZdH7qqhjsxSh5Ya7l3ncMSS0Tw1YPaEsOJmpXCiDH9KE4g-KyLeSJU5Rqonc5fuWJwDd68wpY8SB2qkgbtr250Srk4nYw28MyxhgXwHvOSIrDhqmGR-NPPmSeoa9u9etAD9qjfCPauF0BYDBcVHGKR2kJL5oGw9-tRs6FqBAt5U-y4Jx6y0Q1RPptdbpHGY9KmSGHIqsLrkJl7lgjlZh4mb2wofwypJvBd2hW_dgS1RTrzcTjoYsQ" \ + -e HP_K8S_NAMESPACE="nextcloud-exapps" \ + -e HP_K8S_VERIFY_SSL="false" \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v `pwd`/certs:/certs \ + --name appapi-harp -h appapi-harp \ + --restart unless-stopped \ + --network=host \ + -d nextcloud-appapi-harp:local diff --git a/haproxy_agent.py b/haproxy_agent.py index 8a32779..0cd16fc 100644 --- a/haproxy_agent.py +++ b/haproxy_agent.py @@ -12,6 +12,7 @@ import os import re import socket +import ssl import tarfile import time from base64 import b64encode @@ -28,8 +29,25 @@ APPID_PATTERN = re.compile(r"(?:^|/)exapps/([^/]+)") SHARED_KEY = os.environ.get("HP_SHARED_KEY") NC_INSTANCE_URL = os.environ.get("NC_INSTANCE_URL") +# Kubernetes environment variables +K8S_ENABLED = os.environ.get("HP_K8S_ENABLED", "false").lower() in {"1", "true", "yes"} +K8S_NAMESPACE = os.environ.get("HP_K8S_NAMESPACE", "nextcloud-exapps") +K8S_API_SERVER = os.environ.get("HP_K8S_API_SERVER") # e.g. https://kubernetes.default.svc +K8S_CA_FILE = os.environ.get("HP_K8S_CA_FILE", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") +K8S_TOKEN = os.environ.get("HP_K8S_BEARER_TOKEN") +K8S_TOKEN_FILE = os.environ.get("HP_K8S_BEARER_TOKEN_FILE", "/var/run/secrets/kubernetes.io/serviceaccount/token") +K8S_VERIFY_SSL = os.environ.get("HP_K8S_VERIFY_SSL", "true").lower() != "false" +K8S_STORAGE_CLASS = os.environ.get("HP_K8S_STORAGE_CLASS", "") +K8S_DEFAULT_STORAGE_SIZE = os.environ.get("HP_K8S_DEFAULT_STORAGE_SIZE", "10Gi") +if not K8S_API_SERVER and os.environ.get("KUBERNETES_SERVICE_HOST"): + host = os.environ["KUBERNETES_SERVICE_HOST"] + port = os.environ.get("KUBERNETES_SERVICE_PORT", "443") + K8S_API_SERVER = f"https://{host}:{port}" + +K8S_HTTP_TIMEOUT = aiohttp.ClientTimeout(total=60.0) +K8S_NAME_MAX_LENGTH = 63 # Set up the logging configuration -LOG_LEVEL = os.environ["HP_LOG_LEVEL"].upper() +LOG_LEVEL = os.environ.get("HP_LOG_LEVEL", "INFO").upper() logging.basicConfig(level=LOG_LEVEL) LOGGER = logging.getLogger(__name__) LOGGER.setLevel(level=LOG_LEVEL) @@ -56,7 +74,8 @@ LOGGER.error( "Invalid value for HP_TRUSTED_PROXY_IPS: %s. Client IP detection from headers is disabled. " "The X-Forwarded-For and X-Real-IP headers will not be respected. 
" - "This can lead to the outer proxy's IP being blocked during a bruteforce attempt instead of the actual client's IP.", + "This can lead to the outer proxy's IP being blocked " + "during a bruteforce attempt instead of the actual client's IP.", e, ) TRUSTED_PROXIES = [] @@ -102,6 +121,18 @@ class NcUser(BaseModel): access_level: AccessLevel = Field(..., description="ADMIN(2), USER(1), or PUBLIC(0)") +def _sanitize_k8s_name(raw: str) -> str: + """Convert an arbitrary string into a DNS-1123 compatible name for Kubernetes.""" + name = raw.lower().replace("_", "-") + name = re.sub(r"[^a-z0-9-]", "-", name) + name = re.sub(r"-+", "-", name).strip("-") + if not name: + name = "exapp" + if len(name) > K8S_NAME_MAX_LENGTH: + name = name[:K8S_NAME_MAX_LENGTH].rstrip("-") + return name + + class ExAppName(BaseModel): name: str = Field(..., description="ExApp name.") instance_id: str = Field("", description="Nextcloud instance ID.") @@ -116,6 +147,21 @@ def exapp_container_name(self) -> str: def exapp_container_volume(self) -> str: return f"{self.exapp_container_name}_data" + @computed_field + @property + def exapp_k8s_name(self) -> str: + """Name used for Deployment / Pods.""" + return _sanitize_k8s_name(self.exapp_container_name) + + @computed_field + @property + def exapp_k8s_volume_name(self) -> str: + """PVC name for ExApp's data volume.""" + base = _sanitize_k8s_name(self.exapp_container_volume) + if len(base) > K8S_NAME_MAX_LENGTH: + base = base[:K8S_NAME_MAX_LENGTH].rstrip("-") + return base + class CreateExAppMounts(BaseModel): source: str = Field(...) @@ -134,6 +180,24 @@ class CreateExAppPayload(ExAppName): mount_points: list[CreateExAppMounts] = Field([], description="List of mount points for the container.") resource_limits: dict[str, Any] = Field({}, description="Resource limits for the container.") + @model_validator(mode="before") + @classmethod + def accept_k8s_friendly_payload(cls, data: Any) -> Any: + """Allow K8s-style payloads like: + + { + "image": "ghcr.io/nextcloud/test-deploy:release", + "resource_limits": {"cpu": "500m", "memory": "512Mi"} + } + by mapping 'image' -> 'image_id' and defaulting network_mode. + """ + if isinstance(data, dict): + if "image_id" not in data and "image" in data: + data = {**data, "image_id": data["image"]} # Allow 'image' instead of 'image_id' + if "network_mode" not in data: + data = {**data, "network_mode": "bridge"} # Default network_mode (used only for Docker) + return data + class RemoveExAppPayload(ExAppName): remove_data: bool = Field(False, description="Flag indicating whether the Docker ExApp volume should be deleted.") @@ -144,6 +208,90 @@ class InstallCertificatesPayload(ExAppName): install_frp_certs: bool = Field(True, description="Flag to control installation of FRP certificates.") +class ExposeExAppPayload(ExAppName): + port: int = Field(..., ge=1, le=65535, description="Port on which the ExApp listens inside the Pod/container.") + expose_type: Literal["nodeport", "clusterip", "loadbalancer", "manual"] = Field( + "nodeport", + description="How HaRP should make the ExApp reachable (and which endpoint it registers).", + ) + upstream_host: str | None = Field( + None, + description=( + "Override the host that HaRP should use to reach the ExApp. " + "For expose_type=manual this is required. " + "For nodeport it is strongly recommended (stable VIP/LB/edge-node)." + ), + ) + upstream_port: int | None = Field( + None, + ge=1, + le=65535, + description=( + "Override the port that HaRP should use to reach the ExApp. 
" + "Only used for expose_type=manual (otherwise computed from Service)." + ), + ) + service_port: int | None = Field( + None, + ge=1, + le=65535, + description="Service 'port' value (defaults to payload.port). targetPort always equals payload.port.", + ) + node_port: int | None = Field( + None, + ge=30000, + le=32767, + description="Requested nodePort when expose_type=nodeport (optional).", + ) + external_traffic_policy: Literal["Cluster", "Local"] | None = Field( + None, + description="Service spec.externalTrafficPolicy (NodePort/LoadBalancer only).", + ) + load_balancer_ip: str | None = Field( + None, + description="Optional spec.loadBalancerIP when expose_type=loadbalancer (provider-specific).", + ) + service_annotations: dict[str, str] = Field( + default_factory=dict, + description="Annotations applied to the generated Service.", + ) + service_labels: dict[str, str] = Field( + default_factory=dict, + description="Extra labels applied to the generated Service.", + ) + wait_timeout_seconds: float = Field( + 60.0, + ge=0, + le=600, + description="How long to wait for a LoadBalancer ingress hostname/IP.", + ) + wait_interval_seconds: float = Field( + 1.0, + ge=0.1, + le=10.0, + description="Polling interval when waiting for a LoadBalancer address.", + ) + # Node auto-selection (only used if expose_type=nodeport AND upstream_host is not provided) + node_address_type: Literal["InternalIP", "ExternalIP"] = Field( + "InternalIP", + description="Which node address type to prefer when auto-picking a node address for NodePort.", + ) + node_name: str | None = Field( + None, + description="If set, pick this exact node by metadata.name when auto-picking node address.", + ) + node_label_selector: str | None = Field( + None, + description="If set, list nodes with this labelSelector when auto-picking node address.", + ) + + @model_validator(mode="after") + def validate_expose_payload(self) -> Self: + if self.expose_type == "manual" and not self.upstream_host: + raise ValueError("upstream_host is required when expose_type='manual'") + return self + + ############################################################################### # In-memory caches ############################################################################### @@ -1660,6 +1808,956 @@ def _get_certificate_update_command(os_info_content: str | None) -> list[str] | return None +############################################################################### +# Kubernetes helpers functions +############################################################################### + + +def _get_k8s_token() -> str | None: + """Get (and cache) the Kubernetes bearer token.""" + global K8S_TOKEN + if K8S_TOKEN: + return K8S_TOKEN.strip() + if K8S_TOKEN_FILE and os.path.exists(K8S_TOKEN_FILE): + try: + with open(K8S_TOKEN_FILE, encoding="utf-8") as f: + token = f.read().strip() + if token: + K8S_TOKEN = token + return token + except Exception as e: + LOGGER.error("Failed to read Kubernetes token file '%s': %s", K8S_TOKEN_FILE, e) + LOGGER.error( + "Kubernetes bearer token not found. " + "Set HP_K8S_BEARER_TOKEN or HP_K8S_BEARER_TOKEN_FILE when HP_K8S_ENABLED=true." 
+ ) + return None + + +def _get_k8s_ssl_context() -> ssl.SSLContext | bool: + """Return SSL context (or False to disable verification) for K8s API.""" + if not K8S_API_SERVER or not K8S_API_SERVER.startswith("https"): + return False + if not K8S_VERIFY_SSL: + return False + try: + cafile = K8S_CA_FILE if K8S_CA_FILE and os.path.exists(K8S_CA_FILE) else None + return ssl.create_default_context(cafile=cafile) + except Exception as e: + LOGGER.warning("Failed to create SSL context for Kubernetes API: %s", e) + return ssl.create_default_context() + + +def _ensure_k8s_configured() -> None: + if not K8S_ENABLED: + LOGGER.error("Kubernetes backend requested but HP_K8S_ENABLED is not true.") + raise web.HTTPServiceUnavailable(text="Kubernetes backend is disabled in HaRP.") + if not K8S_API_SERVER: + LOGGER.error("Kubernetes backend requested but HP_K8S_API_SERVER is not configured.") + raise web.HTTPServiceUnavailable(text="Kubernetes API server is not configured.") + if not _get_k8s_token(): + raise web.HTTPServiceUnavailable(text="Kubernetes token is not configured.") + + +async def _k8s_request( + method: str, + path: str, + *, + query: dict[str, str] | None = None, + json_body: Any | None = None, + content_type: str | None = None, +) -> tuple[int, dict[str, Any] | None, str]: + """Low-level helper for talking to the Kubernetes API.""" + _ensure_k8s_configured() + token = _get_k8s_token() + assert token # ensured by _ensure_k8s_configured # noqa TO-DO + + headers: dict[str, str] = { + "Authorization": f"Bearer {token}", + "Accept": "application/json", + } + if json_body is not None: + headers["Content-Type"] = content_type or "application/json" + + url = f"{K8S_API_SERVER}{path}" + ssl_ctx = _get_k8s_ssl_context() + connector = aiohttp.TCPConnector(ssl=ssl_ctx) + + async with aiohttp.ClientSession(timeout=K8S_HTTP_TIMEOUT, connector=connector) as session: + try: + async with session.request(method.upper(), url, headers=headers, params=query, json=json_body) as resp: + text = await resp.text() + data: dict[str, Any] | None = None + if "application/json" in resp.headers.get("Content-Type", "") and text: + try: + data = json.loads(text) + except json.JSONDecodeError: + LOGGER.warning("Failed to parse JSON from Kubernetes API %s %s: %s", method, url, text[:200]) + return resp.status, data, text + except aiohttp.ClientError as e: + LOGGER.error("Error communicating with Kubernetes API (%s %s): %s", method, url, e) + raise web.HTTPServiceUnavailable(text="Error communicating with Kubernetes API") from e + + +def _k8s_parse_env(env_list: list[str]) -> list[dict[str, str]]: + """Convert ['KEY=VALUE', ...] to Kubernetes env entries.""" + result: list[dict[str, str]] = [] + for raw in env_list: + if not raw: + continue + if "=" in raw: + name, value = raw.split("=", 1) + result.append({"name": name, "value": value}) + else: + result.append({"name": raw, "value": ""}) # No '=', keep name and use empty value + return result + + +def _k8s_build_resources(resource_limits: dict[str, Any]) -> dict[str, Any]: + """Convert limits to Kubernetes resources. 
+
+    Supports both:
+      - Docker-style: {"memory": <bytes>, "nanoCPUs": <nanoCPUs>}
+      - K8s-style: {"memory": "512Mi", "cpu": "500m"}
+    """
+    if not resource_limits:
+        return {}
+    limits: dict[str, str] = {}
+    requests: dict[str, str] = {}
+
+    # Memory
+    mem_val = resource_limits.get("memory")
+    mem_str: str | None = None
+    if isinstance(mem_val, int) and mem_val > 0:
+        # bytes -> Mi (ceil)
+        mem_mi = (mem_val + (1024 * 1024 - 1)) // (1024 * 1024)
+        mem_str = f"{mem_mi}Mi"
+    elif isinstance(mem_val, str) and mem_val:
+        mem_str = mem_val  # Already in K8s units, e.g. "512Mi"
+
+    if mem_str:
+        limits["memory"] = mem_str
+        requests["memory"] = mem_str  # conservative: same as limit
+
+    # CPU
+    cpu_str: str | None = None
+    nano_cpus = resource_limits.get("nanoCPUs")
+    if isinstance(nano_cpus, int) and nano_cpus > 0:
+        milli = (nano_cpus * 1000 + 1_000_000_000 - 1) // 1_000_000_000  # 1e9 nanoCPUs = 1 CPU => millicores
+        milli = max(1, milli)
+        cpu_str = f"{milli}m"
+    else:
+        cpu_val = resource_limits.get("cpu")
+        if isinstance(cpu_val, str) and cpu_val:
+            cpu_str = cpu_val  # Already in K8s units, e.g. "500m"
+
+    if cpu_str:
+        limits["cpu"] = cpu_str
+        requests["cpu"] = cpu_str
+
+    res: dict[str, Any] = {}
+    if limits:
+        res["limits"] = limits
+    if requests:
+        res["requests"] = requests
+    return res
+
+
+def _k8s_build_deployment_manifest(payload: CreateExAppPayload, replicas: int) -> dict[str, Any]:
+    """Build a Deployment manifest from CreateExAppPayload."""
+    deployment_name = payload.exapp_k8s_name
+    pvc_name = payload.exapp_k8s_volume_name
+
+    labels = {
+        "app": deployment_name,
+        "app.kubernetes.io/name": deployment_name,
+        "app.kubernetes.io/component": "exapp",
+    }
+    if payload.instance_id:
+        labels["app.kubernetes.io/instance"] = payload.instance_id
+
+    container: dict[str, Any] = {
+        "name": "app",
+        "image": payload.image_id,
+        "imagePullPolicy": "IfNotPresent",
+        "env": _k8s_parse_env(payload.environment_variables),
+    }
+
+    resources = _k8s_build_resources(payload.resource_limits)
+    if resources:
+        container["resources"] = resources
+
+    # Main data volume
+    volumes = [
+        {
+            "name": "data",
+            "persistentVolumeClaim": {"claimName": pvc_name},
+        }
+    ]
+    volume_mounts = [
+        {
+            "name": "data",
+            "mountPath": f"/{payload.exapp_container_volume}",
+        }
+    ]
+
+    if payload.mount_points:
+        LOGGER.warning(
+            "Kubernetes backend currently ignores additional mount_points for ExApp '%s'.",
+            deployment_name,
+        )
+
+    container["volumeMounts"] = volume_mounts
+
+    manifest: dict[str, Any] = {
+        "apiVersion": "apps/v1",
+        "kind": "Deployment",
+        "metadata": {"name": deployment_name, "labels": labels},
+        "spec": {
+            "replicas": replicas,
+            "selector": {"matchLabels": {"app": deployment_name}},
+            "template": {"metadata": {"labels": labels}, "spec": {"containers": [container], "volumes": volumes}},
+        },
+    }
+    return manifest
+
+
+def _k8s_build_service_manifest(
+    payload: ExposeExAppPayload, service_type: Literal["NodePort", "ClusterIP", "LoadBalancer"]
+) -> dict[str, Any]:
+    service_name = payload.exapp_k8s_name
+    labels = {
+        "app": service_name,
+        "app.kubernetes.io/name": service_name,
+        "app.kubernetes.io/component": "exapp",
+        **(payload.service_labels or {}),
+    }
+    if payload.instance_id:
+        labels.setdefault("app.kubernetes.io/instance", payload.instance_id)
+
+    metadata: dict[str, Any] = {"name": service_name, "labels": labels}
+    if payload.service_annotations:
+        metadata["annotations"] = payload.service_annotations
+
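+    # The Service "port" may differ from the container port; targetPort always
+    # points at the port the ExApp listens on inside the Pod (payload.port).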
svc_port = payload.service_port or payload.port + port_entry: dict[str, Any] = { + "name": "http", + "port": svc_port, + "targetPort": payload.port, + } + if service_type == "NodePort" and payload.node_port: + port_entry["nodePort"] = payload.node_port + + spec: dict[str, Any] = { + "type": service_type, + "selector": {"app": service_name}, + "ports": [port_entry], + } + + if payload.external_traffic_policy and service_type in ("NodePort", "LoadBalancer"): + spec["externalTrafficPolicy"] = payload.external_traffic_policy + + if service_type == "LoadBalancer" and payload.load_balancer_ip: + spec["loadBalancerIP"] = payload.load_balancer_ip + + return { + "apiVersion": "v1", + "kind": "Service", + "metadata": metadata, + "spec": spec, + } + + +def _k8s_service_dns_name(service_name: str, namespace: str) -> str: + # Cluster domain suffix is typically .svc.cluster.local, but .svc is enough inside most resolvers. + return f"{service_name}.{namespace}.svc" + + +async def _k8s_pick_node_address( + *, + preferred_type: Literal["InternalIP", "ExternalIP"], + node_name: str | None = None, + label_selector: str | None = None, +) -> str: + query = {"labelSelector": label_selector} if label_selector else None + status, nodes_data, text = await _k8s_request("GET", "/api/v1/nodes", query=query) + if status != 200 or not isinstance(nodes_data, dict): + msg = (nodes_data or {}).get("message") if isinstance(nodes_data, dict) else text + raise web.HTTPServiceUnavailable(text=f"Failed to list Kubernetes nodes: Status {status}, {msg}") + + items = nodes_data.get("items", []) + if node_name: + items = [n for n in items if n.get("metadata", {}).get("name") == node_name] + + if not items: + raise web.HTTPServiceUnavailable(text="No Kubernetes nodes found (after filtering).") + + def is_ready(node: dict[str, Any]) -> bool: + for cond in node.get("status", {}).get("conditions", []) or []: + if cond.get("type") == "Ready" and cond.get("status") == "True": + return True + return False + + ready_nodes = [n for n in items if is_ready(n)] + nodes = ready_nodes or items + + fallback_type = "ExternalIP" if preferred_type == "InternalIP" else "InternalIP" + address_type_order = [preferred_type, fallback_type, "Hostname"] + + for node in nodes: + for t in address_type_order: + for addr in node.get("status", {}).get("addresses", []) or []: + if addr.get("type") == t and addr.get("address"): + return str(addr["address"]) + + raise web.HTTPServiceUnavailable(text="Could not determine a node address (no InternalIP/ExternalIP/Hostname).") + + +def _k8s_extract_nodeport(service: dict[str, Any]) -> int: + ports = (service.get("spec") or {}).get("ports") or [] + if not ports or "nodePort" not in ports[0]: + raise web.HTTPServiceUnavailable(text="Service has no nodePort assigned.") + return int(ports[0]["nodePort"]) + + +def _k8s_extract_service_port(service: dict[str, Any]) -> int: + ports = (service.get("spec") or {}).get("ports") or [] + if not ports or "port" not in ports[0]: + raise web.HTTPServiceUnavailable(text="Service has no port defined.") + return int(ports[0]["port"]) + + +def _k8s_extract_loadbalancer_host(service: dict[str, Any]) -> str | None: + ingress = ((service.get("status") or {}).get("loadBalancer") or {}).get("ingress") or [] + if not ingress: + return None + first = ingress[0] or {} + return first.get("ip") or first.get("hostname") + + +async def _k8s_wait_for_loadbalancer_host(service_name: str, timeout_s: float, interval_s: float) -> str: + deadline = time.time() + max(0.0, timeout_s) + while True: + 
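+        # Poll the Service until the cloud controller publishes an ingress
+        # IP or hostname in status.loadBalancer.ingress.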
status, svc, text = await _k8s_request( + "GET", + f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{service_name}", + ) + if status != 200 or not isinstance(svc, dict): + msg = (svc or {}).get("message") if isinstance(svc, dict) else text + raise web.HTTPServiceUnavailable(text=f"Failed to read Service '{service_name}': Status {status}, {msg}") + + host = _k8s_extract_loadbalancer_host(svc) + if host: + return host + + if time.time() >= deadline: + raise web.HTTPServiceUnavailable( + text=f"Timed out waiting for LoadBalancer address for Service '{service_name}'" + ) + + await asyncio.sleep(interval_s) + + +############################################################################### +# Endpoints for AppAPI to work with the Kubernetes API +############################################################################### + + +async def k8s_exapp_exists(request: web.Request): + try: + payload_dict = await request.json() + except json.JSONDecodeError: + raise web.HTTPBadRequest(text="Invalid JSON body") from None + try: + payload = ExAppName.model_validate(payload_dict) + except ValidationError as e: + raise web.HTTPBadRequest(text=f"Payload validation error: {e}") from None + + deployment_name = payload.exapp_k8s_name + LOGGER.debug( + "Checking for Kubernetes deployment '%s' in namespace '%s'.", + deployment_name, + K8S_NAMESPACE, + ) + status, data, _ = await _k8s_request( + "GET", + f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deployment_name}", + ) + if status == 200: + LOGGER.info("Kubernetes deployment '%s' exists.", deployment_name) + return web.json_response({"exists": True}) + if status == 404: + LOGGER.info("Kubernetes deployment '%s' does not exist.", deployment_name) + return web.json_response({"exists": False}) + msg = (data or {}).get("message", "") + LOGGER.error( + "Error checking Kubernetes deployment '%s' (status %s): %s", + deployment_name, + status, + msg, + ) + raise web.HTTPServiceUnavailable(text=f"Error checking Kubernetes deployment '{deployment_name}': Status {status}") + + +async def k8s_exapp_create(request: web.Request): + try: + payload_dict = await request.json() + except json.JSONDecodeError: + LOGGER.warning("Invalid JSON body received for /k8s/exapp/create") + raise web.HTTPBadRequest(text="Invalid JSON body") from None + try: + payload = CreateExAppPayload.model_validate(payload_dict) + except ValidationError as e: + LOGGER.warning("Payload validation error for /k8s/exapp/create: %s", e) + raise web.HTTPBadRequest(text=f"Payload validation error: {e}") from None + + deployment_name = payload.exapp_k8s_name + pvc_name = payload.exapp_k8s_volume_name + + LOGGER.info( + "Creating Kubernetes resources for ExApp '%s' (Deployment=%s, PVC=%s, namespace=%s).", + payload.name, + deployment_name, + pvc_name, + K8S_NAMESPACE, + ) + + # 1) PVC for data + pvc_manifest: dict[str, Any] = { + "apiVersion": "v1", + "kind": "PersistentVolumeClaim", + "metadata": { + "name": pvc_name, + "labels": { + "app": deployment_name, + "app.kubernetes.io/name": deployment_name, + "app.kubernetes.io/component": "exapp", + }, + }, + "spec": { + "accessModes": ["ReadWriteOnce"], + "resources": {"requests": {"storage": K8S_DEFAULT_STORAGE_SIZE}}, + }, + } + if K8S_STORAGE_CLASS: + pvc_manifest["spec"]["storageClassName"] = K8S_STORAGE_CLASS + + status, data, text = await _k8s_request( + "POST", + f"/api/v1/namespaces/{K8S_NAMESPACE}/persistentvolumeclaims", + json_body=pvc_manifest, + ) + if status in (200, 201): + LOGGER.info("PVC '%s' created for ExApp '%s'.", pvc_name, 
deployment_name) + elif status == 409: + LOGGER.info("PVC '%s' already exists for ExApp '%s'.", pvc_name, deployment_name) + else: + msg = (data or {}).get("message") if isinstance(data, dict) else text + LOGGER.error( + "Failed to create PVC '%s' for exapp '%s' (status %s): %s", + pvc_name, + deployment_name, + status, + msg, + ) + raise web.HTTPServiceUnavailable(text=f"Failed to create PVC '{pvc_name}': Status {status}") + + # 2) Deployment with replicas=0 (start/stop handled separately) + deployment_manifest = _k8s_build_deployment_manifest(payload, replicas=0) + status, data, text = await _k8s_request( + "POST", + f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments", + json_body=deployment_manifest, + ) + if status in (200, 201): + LOGGER.info("Kubernetes deployment '%s' created.", deployment_name) + # Docker endpoint returns {"id": ..., "name": ...}; we don't have Deployment UID here, only name. + return web.json_response({"name": deployment_name}, status=201) + if status == 409: + LOGGER.warning("Kubernetes deployment '%s' already exists.", deployment_name) + raise web.HTTPConflict(text=f"Deployment '{deployment_name}' already exists.") + msg = (data or {}).get("message") if isinstance(data, dict) else text + LOGGER.error( + "Error creating Kubernetes deployment '%s' (status %s): %s", + deployment_name, + status, + msg, + ) + raise web.HTTPServiceUnavailable(text=f"Error creating deployment '{deployment_name}': Status {status}") + + +async def k8s_exapp_start(request: web.Request): + try: + payload_dict = await request.json() + except json.JSONDecodeError: + LOGGER.warning("Invalid JSON body received for /k8s/exapp/start") + raise web.HTTPBadRequest(text="Invalid JSON body") from None + try: + payload = ExAppName.model_validate(payload_dict) + except ValidationError as e: + LOGGER.warning("Payload validation error for /k8s/exapp/start: %s", e) + raise web.HTTPBadRequest(text=f"Payload validation error: {e}") from None + + deployment_name = payload.exapp_k8s_name + patch_body = {"spec": {"replicas": 1}} + + LOGGER.info( + "Scaling Kubernetes deployment '%s' to 1 replica in namespace '%s'.", + deployment_name, + K8S_NAMESPACE, + ) + + status, data, text = await _k8s_request( + "PATCH", + f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deployment_name}", + json_body=patch_body, + content_type="application/strategic-merge-patch+json", + ) + if status in (200, 201): + LOGGER.info("Deployment '%s' scaled to 1 replica.", deployment_name) + return web.HTTPNoContent() + if status == 404: + LOGGER.warning("Deployment '%s' not found when trying to start.", deployment_name) + raise web.HTTPNotFound(text=f"Deployment '{deployment_name}' not found.") + msg = (data or {}).get("message") if isinstance(data, dict) else text + LOGGER.error( + "Error scaling deployment '%s' to 1 replica (status %s): %s", + deployment_name, + status, + msg, + ) + raise web.HTTPServiceUnavailable(text=f"Error starting deployment '{deployment_name}': Status {status}") + + +async def k8s_exapp_stop(request: web.Request): + try: + payload_dict = await request.json() + except json.JSONDecodeError: + LOGGER.warning("Invalid JSON body received for /k8s/exapp/stop") + raise web.HTTPBadRequest(text="Invalid JSON body") from None + try: + payload = ExAppName.model_validate(payload_dict) + except ValidationError as e: + LOGGER.warning("Payload validation error for /k8s/exapp/stop: %s", e) + raise web.HTTPBadRequest(text=f"Payload validation error: {e}") from None + + deployment_name = payload.exapp_k8s_name + 
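+    # Stopping scales the Deployment to 0 replicas instead of deleting it,
+    # so the Deployment spec and the data PVC are kept until /k8s/exapp/remove.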
patch_body = {"spec": {"replicas": 0}} + + LOGGER.info( + "Scaling Kubernetes deployment '%s' to 0 replicas in namespace '%s'.", + deployment_name, + K8S_NAMESPACE, + ) + + status, data, text = await _k8s_request( + "PATCH", + f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deployment_name}", + json_body=patch_body, + content_type="application/strategic-merge-patch+json", + ) + if status in (200, 201): + LOGGER.info("Deployment '%s' scaled to 0 replicas.", deployment_name) + return web.HTTPNoContent() + if status == 404: + LOGGER.warning("Deployment '%s' not found when trying to stop.", deployment_name) + raise web.HTTPNotFound(text=f"Deployment '{deployment_name}' not found.") + msg = (data or {}).get("message") if isinstance(data, dict) else text + LOGGER.error( + "Error scaling deployment '%s' to 0 replicas (status %s): %s", + deployment_name, + status, + msg, + ) + raise web.HTTPServiceUnavailable(text=f"Error stopping deployment '{deployment_name}': Status {status}") + + +async def k8s_exapp_wait_for_start(request: web.Request): + try: + payload_dict = await request.json() + except json.JSONDecodeError: + LOGGER.warning("Invalid JSON body received for /k8s/exapp/wait_for_start") + raise web.HTTPBadRequest(text="Invalid JSON body") from None + try: + payload = ExAppName.model_validate(payload_dict) + except ValidationError as e: + LOGGER.warning("Payload validation error for /k8s/exapp/wait_for_start: %s", e) + raise web.HTTPBadRequest(text=f"Payload validation error: {e}") from None + + deployment_name = payload.exapp_k8s_name + label_selector = f"app={deployment_name}" + + max_tries = 180 + sleep_interval = 0.5 + + LOGGER.info( + "Waiting for Kubernetes pod(s) of deployment '%s' to become Ready " + "(namespace=%s, max_tries=%d, interval=%.1fs).", + deployment_name, + K8S_NAMESPACE, + max_tries, + sleep_interval, + ) + + last_phase: str | None = None + last_reason: str | None = None + last_message: str | None = None + + for attempt in range(max_tries): + status, data, text = await _k8s_request( + "GET", + f"/api/v1/namespaces/{K8S_NAMESPACE}/pods", + query={"labelSelector": label_selector}, + ) + if status != 200: + msg = (data or {}).get("message") if isinstance(data, dict) else text + LOGGER.error( + "Error listing pods for deployment '%s' (status %s, attempt %d): %s", + deployment_name, + status, + attempt + 1, + msg, + ) + raise web.HTTPServiceUnavailable( + text=f"Error listing pods for deployment '{deployment_name}': Status {status}" + ) + + items = (data or {}).get("items", []) if isinstance(data, dict) else [] + if not items: + LOGGER.debug( + "No pods yet for deployment '%s' (attempt %d/%d).", + deployment_name, + attempt + 1, + max_tries, + ) + last_phase = "Pending" + else: + # Take the first pod; for single-replica deployments this is enough. 
+ pod = items[0] + pod_status = pod.get("status", {}) + phase = pod_status.get("phase", "Unknown") + last_phase = phase + conditions = pod_status.get("conditions", []) + ready = any(c.get("type") == "Ready" and c.get("status") == "True" for c in conditions) + last_reason = pod_status.get("reason") + last_message = pod_status.get("message") + + LOGGER.debug( + "Pod status for '%s' (attempt %d/%d): phase=%s, ready=%s, reason=%s, message=%s", + deployment_name, + attempt + 1, + max_tries, + phase, + ready, + last_reason, + last_message, + ) + + if phase == "Running" and ready: + LOGGER.info("Deployment '%s' pod is Running and Ready.", deployment_name) + return web.json_response( + { + "started": True, + "status": "running", + "health": "ready", + "reason": last_reason, + "message": last_message, + } + ) + + if phase in ("Failed", "Unknown", "Succeeded"): + LOGGER.warning( + "Deployment '%s' pod is in phase '%s', treating as not successfully started.", + deployment_name, + phase, + ) + return web.json_response( + { + "started": False, + "status": phase, + "health": "not_ready", + "reason": last_reason, + "message": last_message, + } + ) + + if attempt < max_tries - 1: + await asyncio.sleep(sleep_interval) + + LOGGER.warning( + "Deployment '%s' did not become Ready within %d attempts.", + deployment_name, + max_tries, + ) + return web.json_response( + { + "started": False, + "status": last_phase or "unknown", + "health": "timeout", + "reason": last_reason, + "message": last_message, + } + ) + + +async def k8s_exapp_remove(request: web.Request): + try: + payload_dict = await request.json() + except json.JSONDecodeError: + LOGGER.warning("Invalid JSON body received for /k8s/exapp/remove") + raise web.HTTPBadRequest(text="Invalid JSON body") from None + try: + payload = RemoveExAppPayload.model_validate(payload_dict) + except ValidationError as e: + LOGGER.warning("Payload validation error for /k8s/exapp/remove: %s", e) + raise web.HTTPBadRequest(text=f"Payload validation error: {e}") from None + + deployment_name = payload.exapp_k8s_name + pvc_name = payload.exapp_k8s_volume_name + service_name = payload.exapp_k8s_name + + LOGGER.info( + "Removing Kubernetes deployment '%s' (namespace=%s, remove_data=%s).", + deployment_name, + K8S_NAMESPACE, + payload.remove_data, + ) + + # Delete Deployment + status, data, text = await _k8s_request( + "DELETE", + f"/apis/apps/v1/namespaces/{K8S_NAMESPACE}/deployments/{deployment_name}", + ) + if status in (200, 202, 404): + LOGGER.info("Deployment '%s' removed or did not exist (status=%s).", deployment_name, status) + else: + msg = (data or {}).get("message") if isinstance(data, dict) else text + LOGGER.error( + "Error removing deployment '%s' (status %s): %s", + deployment_name, + status, + msg, + ) + raise web.HTTPServiceUnavailable(text=f"Error removing deployment '{deployment_name}': Status {status}") + + # Optionally delete PVC (data) + if payload.remove_data: + LOGGER.info("Removing PVC '%s' for deployment '%s'.", pvc_name, deployment_name) + status, data, text = await _k8s_request( + "DELETE", + f"/api/v1/namespaces/{K8S_NAMESPACE}/persistentvolumeclaims/{pvc_name}", + ) + if status in (200, 202, 404): + LOGGER.info("PVC '%s' removed or did not exist (status=%s).", pvc_name, status) + else: + msg = (data or {}).get("message") if isinstance(data, dict) else text + LOGGER.error( + "Error removing PVC '%s' (status %s): %s", + pvc_name, + status, + msg, + ) + raise web.HTTPServiceUnavailable(text=f"Error removing PVC '{pvc_name}': Status {status}") + + # 
Always try to delete Service (if ExApp was exposed) + LOGGER.info("Removing Service '%s' for deployment '%s'.", service_name, deployment_name) + status, data, text = await _k8s_request( + "DELETE", + f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{service_name}", + ) + + if status in (200, 202, 404): + LOGGER.info("Service '%s' removed or did not exist (status=%s).", service_name, status) + return web.HTTPNoContent() + msg = (data or {}).get("message") if isinstance(data, dict) else text + LOGGER.error("Error removing Service '%s' (status %s): %s", service_name, status, msg) + raise web.HTTPServiceUnavailable(text=f"Error removing Service '{service_name}': Status {status}") + + +async def k8s_exapp_install_certificates(request: web.Request): + """Kubernetes backend: install_certificates is currently a no-op. + + For Kubernetes, we recommend handling system and FRP certificates via Secrets + and volume mounts in the Deployment spec rather than exec'ing into containers. + """ + try: + _ = InstallCertificatesPayload.model_validate(await request.json()) + except json.JSONDecodeError: + LOGGER.warning("Invalid JSON body received for /k8s/exapp/install_certificates") + raise web.HTTPBadRequest(text="Invalid JSON body") from None + except ValidationError as e: + LOGGER.warning("Payload validation error for /k8s/exapp/install_certificates: %s", e) + raise web.HTTPBadRequest(text=f"Payload validation error: {e}") from None + + LOGGER.info( + "Kubernetes backend: /k8s/exapp/install_certificates is a no-op. " + "Use Kubernetes Secrets + volume mounts instead." + ) + return web.HTTPNoContent() + + +async def k8s_exapp_expose(request: web.Request): + try: + payload_dict = await request.json() + except json.JSONDecodeError: + LOGGER.warning("Invalid JSON body received for /k8s/exapp/expose") + raise web.HTTPBadRequest(text="Invalid JSON body") from None + + try: + payload = ExposeExAppPayload.model_validate(payload_dict) + except ValidationError as e: + LOGGER.warning("Payload validation error for /k8s/exapp/expose: %s", e) + raise web.HTTPBadRequest(text=f"Payload validation error: {e}") from None + + app_id = payload.name.lower() + service_name = payload.exapp_k8s_name + + # 0) Manual mode: no Kubernetes calls, just register upstream endpoint + if payload.expose_type == "manual": + upstream_host = payload.upstream_host # validated non-empty + upstream_port = int(payload.upstream_port or payload.port) + LOGGER.info( + "Expose ExApp '%s' (manual): registering upstream %s:%d", + app_id, + upstream_host, + upstream_port, + ) + else: + _ensure_k8s_configured() + + # 1) Ensure Service exists with desired type + if payload.expose_type == "nodeport": + desired_type: Literal["NodePort", "ClusterIP", "LoadBalancer"] = "NodePort" + elif payload.expose_type == "clusterip": + desired_type = "ClusterIP" + elif payload.expose_type == "loadbalancer": + desired_type = "LoadBalancer" + else: + raise web.HTTPBadRequest(text=f"Unknown expose_type '{payload.expose_type}'") + + service_manifest = _k8s_build_service_manifest(payload, desired_type) + LOGGER.info( + "Ensuring Service for ExApp '%s' (service=%s, type=%s, namespace=%s).", + app_id, + service_name, + desired_type, + K8S_NAMESPACE, + ) + + status, data, text = await _k8s_request( + "POST", + f"/api/v1/namespaces/{K8S_NAMESPACE}/services", + json_body=service_manifest, + ) + if status in (200, 201): + LOGGER.info("Service '%s' created.", service_name) + elif status == 409: + LOGGER.info("Service '%s' already exists, will re-use it.", service_name) + else: + msg = 
(data or {}).get("message") if isinstance(data, dict) else text + LOGGER.error("Failed to create Service '%s' (status %s): %s", service_name, status, msg) + raise web.HTTPServiceUnavailable(text=f"Failed to create Service '{service_name}': Status {status}") + + # 2) Read Service back + status, svc, text = await _k8s_request( + "GET", + f"/api/v1/namespaces/{K8S_NAMESPACE}/services/{service_name}", + ) + if status != 200 or not isinstance(svc, dict): + msg = (svc or {}).get("message") if isinstance(svc, dict) else text + LOGGER.error("Failed to read Service '%s' (status %s): %s", service_name, status, msg) + raise web.HTTPServiceUnavailable(text=f"Failed to read Service '{service_name}': Status {status}") + + # 3) Determine upstream endpoint HaRP should use + if payload.expose_type == "nodeport": + node_port = _k8s_extract_nodeport(svc) + upstream_port = node_port + + # Prefer explicit upstream_host (recommended); else auto-pick a node address + if payload.upstream_host: + upstream_host = payload.upstream_host + else: + upstream_host = await _k8s_pick_node_address( + preferred_type=payload.node_address_type, + node_name=payload.node_name, + label_selector=payload.node_label_selector, + ) + + LOGGER.info( + "Expose ExApp '%s' (nodeport): upstream %s:%d (service=%s).", + app_id, + upstream_host, + upstream_port, + service_name, + ) + + elif payload.expose_type == "clusterip": + upstream_port = _k8s_extract_service_port(svc) + upstream_host = payload.upstream_host or _k8s_service_dns_name(service_name, K8S_NAMESPACE) + + LOGGER.info( + "Expose ExApp '%s' (clusterip): upstream %s:%d (service=%s).", + app_id, + upstream_host, + upstream_port, + service_name, + ) + + else: # loadbalancer + upstream_port = _k8s_extract_service_port(svc) + + if payload.upstream_host: + upstream_host = payload.upstream_host + else: + upstream_host = _k8s_extract_loadbalancer_host(svc) + if not upstream_host: + upstream_host = await _k8s_wait_for_loadbalancer_host( + service_name, + timeout_s=payload.wait_timeout_seconds, + interval_s=payload.wait_interval_seconds, + ) + + LOGGER.info( + "Expose ExApp '%s' (loadbalancer): upstream %s:%d (service=%s).", + app_id, + upstream_host, + upstream_port, + service_name, + ) + + # 4) Fetch ExApp metadata from Nextcloud and override host/port registered in HaRP cache + try: + exapp_meta = await nc_get_exapp(app_id) + if not exapp_meta: + LOGGER.error("No ExApp metadata for '%s' in Nextcloud.", app_id) + raise web.HTTPNotFound(text=f"No ExApp metadata for '{app_id}'") + except web.HTTPException: + raise + except Exception as e: + LOGGER.exception("Failed to fetch ExApp metadata for '%s'", app_id) + raise web.HTTPServiceUnavailable(text=f"Failed to fetch metadata for '{app_id}'") from e + + exapp_meta.host = upstream_host + exapp_meta.port = int(upstream_port) + exapp_meta.resolved_host = "" # force resolve_ip again + + async with EXAPP_CACHE_LOCK: + EXAPP_CACHE[app_id] = exapp_meta + + # Keep old response fields, add useful extras + return web.json_response( + { + "appId": app_id, + "host": upstream_host, + "port": int(upstream_port), + "exposeType": payload.expose_type, + "serviceName": service_name, + "namespace": K8S_NAMESPACE, + } + ) + + ############################################################################### # HTTP Server Setup ############################################################################### @@ -1685,6 +2783,16 @@ def create_web_app() -> web.Application: app.router.add_post("/docker/exapp/wait_for_start", docker_exapp_wait_for_start) 
app.router.add_post("/docker/exapp/remove", docker_exapp_remove) app.router.add_post("/docker/exapp/install_certificates", docker_exapp_install_certificates) + + # Kubernetes APIs wrappers + app.router.add_post("/k8s/exapp/exists", k8s_exapp_exists) + app.router.add_post("/k8s/exapp/create", k8s_exapp_create) + app.router.add_post("/k8s/exapp/start", k8s_exapp_start) + app.router.add_post("/k8s/exapp/stop", k8s_exapp_stop) + app.router.add_post("/k8s/exapp/wait_for_start", k8s_exapp_wait_for_start) + app.router.add_post("/k8s/exapp/remove", k8s_exapp_remove) + app.router.add_post("/k8s/exapp/install_certificates", k8s_exapp_install_certificates) + app.router.add_post("/k8s/exapp/expose", k8s_exapp_expose) return app