"""Enhanced services module v2 with PBS, VM/LXC, storage pools, events.""" import asyncio from typing import Dict, Any, Optional, List import httpx from dataclasses import dataclass, field from datetime import datetime from collections import deque @dataclass class HealthStatus: name: str status: str response_time_ms: Optional[float] = None error: Optional[str] = None @dataclass class NodeStatus: name: str ip: str status: str cpu_percent: Optional[float] = None memory_percent: Optional[float] = None memory_used_gb: Optional[float] = None memory_total_gb: Optional[float] = None disk_percent: Optional[float] = None uptime_hours: Optional[float] = None vms: List[Dict] = field(default_factory=list) containers: List[Dict] = field(default_factory=list) @dataclass class DockerContainer: name: str status: str state: str image: str host: str @dataclass class UptimeMonitor: id: int name: str status: int ping: Optional[int] = None heartbeats: Optional[List[Dict]] = None @dataclass class PBSStatus: status: str datastore_usage: List[Dict] = field(default_factory=list) last_backup: Optional[str] = None total_size_gb: float = 0 used_size_gb: float = 0 @dataclass class StoragePool: name: str node: str total_gb: float used_gb: float avail_gb: float percent_used: float pool_type: str @dataclass class StatusEvent: timestamp: datetime service: str old_status: str new_status: str # Recent events storage (in-memory, last 20) recent_events: deque = deque(maxlen=20) last_status_cache: Dict[str, str] = {} SERVICE_CHECK_OVERRIDES = { "OPNsense": ("https://192.168.1.1:8443/", 10.0), "Vaultwarden": ("https://vault.deathstar-home.one/", 5.0), "Immich": ("http://192.168.1.54:2283/", 5.0), } async def check_service(client: httpx.AsyncClient, service) -> HealthStatus: """Check if a service is reachable.""" global last_status_cache, recent_events if service.name in SERVICE_CHECK_OVERRIDES: check_url, timeout = SERVICE_CHECK_OVERRIDES[service.name] else: https_ports = [443, 8006, 8007, 8443, 9443] scheme = "https" if service.port in https_ports else "http" check_url = f"{scheme}://{service.ip}:{service.port}/" timeout = 5.0 start = asyncio.get_event_loop().time() try: response = await client.get(check_url, timeout=timeout, follow_redirects=True) elapsed = (asyncio.get_event_loop().time() - start) * 1000 new_status = "online" if response.status_code < 500 else "degraded" result = HealthStatus(name=service.name, status=new_status, response_time_ms=round(elapsed, 1)) except: new_status = "offline" result = HealthStatus(name=service.name, status="offline") # Track status changes old_status = last_status_cache.get(service.name) if old_status and old_status != new_status: recent_events.append(StatusEvent( timestamp=datetime.now(), service=service.name, old_status=old_status, new_status=new_status )) last_status_cache[service.name] = new_status return result async def check_all_services(services) -> Dict[str, HealthStatus]: """Check all services concurrently.""" async with httpx.AsyncClient(verify=False, timeout=10.0) as client: tasks = [check_service(client, s) for s in services] results = await asyncio.gather(*tasks) return {r.name: r for r in results} async def get_proxmox_node_metrics(client: httpx.AsyncClient, node: Dict, token: str, secret: str) -> NodeStatus: """Get Proxmox node metrics including VMs and containers.""" base_url = f"https://{node['ip']}:{node['port']}/api2/json" headers = {"Authorization": f"PVEAPIToken={token}={secret}"} result = NodeStatus(name=node["name"], ip=node["ip"], status="offline") try: # Get node status response = await client.get(f"{base_url}/nodes/{node['name']}/status", headers=headers, timeout=5.0) if response.status_code == 200: data = response.json()["data"] cpu = data.get("cpu", 0) * 100 mem_used = data.get("memory", {}).get("used", 0) mem_total = data.get("memory", {}).get("total", 1) mem_pct = (mem_used / mem_total) * 100 if mem_total else 0 disk_used = data.get("rootfs", {}).get("used", 0) disk_total = data.get("rootfs", {}).get("total", 1) disk_pct = (disk_used / disk_total) * 100 if disk_total else 0 uptime_sec = data.get("uptime", 0) result.status = "online" result.cpu_percent = round(cpu, 1) result.memory_percent = round(mem_pct, 1) result.memory_used_gb = round(mem_used / (1024**3), 1) result.memory_total_gb = round(mem_total / (1024**3), 1) result.disk_percent = round(disk_pct, 1) result.uptime_hours = round(uptime_sec / 3600, 1) # Get VMs vm_response = await client.get(f"{base_url}/nodes/{node['name']}/qemu", headers=headers, timeout=5.0) if vm_response.status_code == 200: for vm in vm_response.json().get("data", []): result.vms.append({ "vmid": vm.get("vmid"), "name": vm.get("name", f"VM {vm.get('vmid')}"), "status": vm.get("status"), "mem": round(vm.get("mem", 0) / (1024**3), 1) if vm.get("mem") else 0, "cpu": round(vm.get("cpu", 0) * 100, 1) if vm.get("cpu") else 0, }) # Get containers ct_response = await client.get(f"{base_url}/nodes/{node['name']}/lxc", headers=headers, timeout=5.0) if ct_response.status_code == 200: for ct in ct_response.json().get("data", []): result.containers.append({ "vmid": ct.get("vmid"), "name": ct.get("name", f"CT {ct.get('vmid')}"), "status": ct.get("status"), "mem": round(ct.get("mem", 0) / (1024**3), 1) if ct.get("mem") else 0, "cpu": round(ct.get("cpu", 0) * 100, 1) if ct.get("cpu") else 0, }) except: pass return result async def get_all_proxmox_metrics(nodes, token: str, secret: str) -> List[NodeStatus]: """Get metrics for all Proxmox nodes.""" async with httpx.AsyncClient(verify=False) as client: tasks = [get_proxmox_node_metrics(client, n, token, secret) for n in nodes] return await asyncio.gather(*tasks) async def get_pbs_status(url: str, token: str, secret: str) -> PBSStatus: """Get PBS backup server status.""" result = PBSStatus(status="offline") headers = {"Authorization": f"PBSAPIToken={token}:{secret}"} try: async with httpx.AsyncClient(verify=False, timeout=10.0) as client: # Get datastore status ds_response = await client.get(f"{url}/api2/json/status/datastore-usage", headers=headers) if ds_response.status_code == 200: result.status = "online" for ds in ds_response.json().get("data", []): total = ds.get("total", 0) used = ds.get("used", 0) result.datastore_usage.append({ "name": ds.get("store"), "total_gb": round(total / (1024**3), 1), "used_gb": round(used / (1024**3), 1), "percent": round((used / total) * 100, 1) if total else 0, }) result.total_size_gb += total / (1024**3) result.used_size_gb += used / (1024**3) # Try to get last backup task tasks_response = await client.get(f"{url}/api2/json/nodes/localhost/tasks", headers=headers) if tasks_response.status_code == 200: tasks = tasks_response.json().get("data", []) backup_tasks = [t for t in tasks if t.get("type") == "backup"] if backup_tasks: last = backup_tasks[0] result.last_backup = datetime.fromtimestamp(last.get("starttime", 0)).strftime("%Y-%m-%d %H:%M") except: pass return result async def get_storage_pools(nodes, token: str, secret: str) -> List[StoragePool]: """Get storage pool info from all Proxmox nodes.""" pools = [] headers = {"Authorization": f"PVEAPIToken={token}={secret}"} async with httpx.AsyncClient(verify=False, timeout=10.0) as client: for node in nodes: try: url = f"https://{node['ip']}:{node['port']}/api2/json/nodes/{node['name']}/storage" response = await client.get(url, headers=headers) if response.status_code == 200: for storage in response.json().get("data", []): if storage.get("enabled") and storage.get("total"): total = storage.get("total", 0) used = storage.get("used", 0) avail = storage.get("avail", 0) pools.append(StoragePool( name=storage.get("storage"), node=node["name"], total_gb=round(total / (1024**3), 1), used_gb=round(used / (1024**3), 1), avail_gb=round(avail / (1024**3), 1), percent_used=round((used / total) * 100, 1) if total else 0, pool_type=storage.get("type", "unknown"), )) except: pass return pools async def get_docker_containers(hosts: List[Dict]) -> List[DockerContainer]: """Get Docker containers via docker-socket-proxy.""" containers = [] async with httpx.AsyncClient(timeout=5.0) as client: for host in hosts: try: url = f"http://{host['ip']}:{host['port']}/containers/json?all=true" response = await client.get(url) if response.status_code == 200: for c in response.json(): name = c.get("Names", ["/unknown"])[0].lstrip("/") if name == "docker-socket-proxy": continue containers.append(DockerContainer( name=name, status=c.get("Status", ""), state=c.get("State", "unknown"), image=c.get("Image", "").split("/")[-1].split(":")[0], host=host["name"] )) except: pass return containers async def get_docker_container_counts(hosts: List[Dict]) -> Dict[str, int]: """Get container counts per host.""" counts = {} async with httpx.AsyncClient(timeout=5.0) as client: for host in hosts: try: url = f"http://{host['ip']}:{host['port']}/containers/json" response = await client.get(url) if response.status_code == 200: # Subtract 1 for docker-socket-proxy count = len([c for c in response.json() if "docker-socket-proxy" not in c.get("Names", [""])[0]]) counts[host["name"]] = count except: counts[host["name"]] = 0 return counts async def get_uptime_kuma_status(url: str, status_page: str = "uptime") -> Dict: """Get Uptime Kuma status.""" result = {"monitors": [], "summary": {"up": 0, "down": 0, "total": 0}} try: async with httpx.AsyncClient(timeout=5.0) as client: hb_response = await client.get(f"{url}/api/status-page/heartbeat/{status_page}") info_response = await client.get(f"{url}/api/status-page/{status_page}") if hb_response.status_code == 200 and info_response.status_code == 200: heartbeats = hb_response.json().get("heartbeatList", {}) info = info_response.json() for group in info.get("publicGroupList", []): for monitor in group.get("monitorList", []): monitor_id = str(monitor.get("id")) monitor_heartbeats = heartbeats.get(monitor_id, []) latest_status = 0 latest_ping = None if monitor_heartbeats: latest = monitor_heartbeats[-1] latest_status = latest.get("status", 0) latest_ping = latest.get("ping") recent_hb = monitor_heartbeats[-20:] if monitor_heartbeats else [] result["monitors"].append(UptimeMonitor( id=monitor.get("id"), name=monitor.get("name"), status=latest_status, ping=latest_ping, heartbeats=[{"status": h.get("status", 0), "ping": h.get("ping")} for h in recent_hb] )) if latest_status == 1: result["summary"]["up"] += 1 else: result["summary"]["down"] += 1 result["summary"]["total"] += 1 except: pass return result async def get_prometheus_metrics(url: str, queries: Dict[str, str]) -> Dict[str, Any]: """Query Prometheus for metrics.""" results = {} try: async with httpx.AsyncClient(timeout=5.0) as client: for name, query in queries.items(): response = await client.get(f"{url}/api/v1/query", params={"query": query}) if response.status_code == 200: data = response.json().get("data", {}).get("result", []) if data: results[name] = float(data[0].get("value", [0, 0])[1]) except: pass return results async def get_camera_list(go2rtc_url: str) -> List[str]: """Get camera list from go2rtc.""" try: async with httpx.AsyncClient(timeout=5.0) as client: response = await client.get(f"{go2rtc_url}/api/streams") if response.status_code == 200: return list(response.json().keys()) except: pass return [] async def get_sabnzbd_queue(url: str, api_key: str = "") -> Dict: """Get Sabnzbd download queue.""" try: async with httpx.AsyncClient(timeout=5.0) as client: params = {"mode": "queue", "output": "json"} if api_key: params["apikey"] = api_key response = await client.get(f"{url}/api", params=params) if response.status_code == 200: data = response.json().get("queue", {}) return { "speed": data.get("speed", "0 B/s"), "size_left": data.get("sizeleft", "0 B"), "eta": data.get("timeleft", "Unknown"), "downloading": len(data.get("slots", [])), "items": [ {"name": s.get("filename", "Unknown")[:40], "progress": float(s.get("percentage", 0))} for s in data.get("slots", [])[:3] ] } except: pass return {"speed": "N/A", "downloading": 0, "items": []} def get_recent_events() -> List[StatusEvent]: """Get recent status change events.""" return list(recent_events) def get_cluster_uptime(nodes: List[NodeStatus]) -> float: """Calculate total cluster uptime in hours.""" total = 0 for node in nodes: if node.uptime_hours: total += node.uptime_hours return round(total, 1)