vmsig: management daemon, runtime endpoint lifecycle, roster, discovery, in-tree drivers, packaging

- core: runtime attach/detach of a per-endpoint adapter trio (runtime-safe add_adapter + vmsig_core_detach_endpoint, deferred reap)
- roster: VMSIG_EV_ROSTER + CAP_ROSTER, retained per-endpoint and replayed to late subscribers
- discovery: inotify trigger dir, vmid/endpoint slot allocator, host probe; vmsigd daemon with config + per-uid admission
- input driver and vgpu perception built in-tree; vgpu perception as a separate library
- memctx: own the supplied ro_fd (closed at detach)
- deb packaging: install rules, systemd unit, tmpfiles, default config
This commit is contained in:
2026-06-22 17:25:06 +03:00
parent 0d387a4249
commit 9bde398b6c
55 changed files with 4703 additions and 61 deletions
+363
View File
@@ -0,0 +1,363 @@
/* discovery.c — runtime VM discovery state machine (see discovery.h).
*
* Single-threaded on the loop thread (inotify + timer sources via core_add_source). On a
* "vm-<vmid>-ram" file appearing it corroborates the candidate (host-probe seam), assigns a
* stable endpoint slot, hot-plugs the trio (sink), and publishes the roster; on the file
* disappearing it tears the endpoint down and publishes a roster DETACH. QMP-not-up-yet is a
* transient retry driven by a timerfd (no busy-wait); config errors / stale files drop. */
#define _GNU_SOURCE
#include "discovery.h"
#include "slot.h"
#include "core_internal.h" /* core_roster_publish */
#include "memctx.h" /* vmsig_memctx_cfg */
#include "vmhost.h" /* vmsig_vmhost_cfg */
#include "input.h" /* vmsig_input_cfg */
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <time.h>
#include <dirent.h>
#include <sys/inotify.h>
#include <sys/timerfd.h>
#define DISC_PATH_MAX 256
#define DISC_RETRY_MAX 40 /* give up after ~tens of seconds of QMP-not-up */
#define DISC_BACKOFF_BASE 50000000ull /* 50 ms */
#define DISC_BACKOFF_CAP 2000000000ull /* 2 s */
typedef enum { CAND_FREE = 0, CAND_PROBING, CAND_ATTACHED } cand_state;
typedef struct {
cand_state state;
uint32_t vmid;
int endpoint; /* -1 until attached */
int attempts;
uint64_t next_probe_ns; /* monotonic deadline for the next retry */
vmsig_host_facts facts; /* probe working copy */
} cand_ent;
struct vmsig_discovery {
vmsig_core* core;
char watch_dir[DISC_PATH_MAX];
char slots_path[DISC_PATH_MAX];
int persist;
vmsig_host_probe probe;
vmsig_discovery_sink sink;
int ifd; /* inotify */
int wd;
int tfd; /* retry timerfd */
slot_table slots;
cand_ent cand[VMSIG_SLOT_COUNT];
/* Stable per-endpoint home for the adapter cfg strings (ram_path/qmp_path): the adapters
* keep pointers, and detach is deferred, so this must outlive the candidate. Overwritten
* only on the NEXT attach to the endpoint, which never races a still-open prior adapter. */
vmsig_host_facts ep_facts[VMSIG_SLOT_COUNT];
};
static uint64_t now_ns(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec;
}
static uint64_t backoff_ns(int attempts) {
uint64_t b = DISC_BACKOFF_BASE << (attempts < 6 ? attempts : 6);
return b > DISC_BACKOFF_CAP ? DISC_BACKOFF_CAP : b;
}
/* Parse exactly "vm-<digits>-ram" -> vmid; 0 if it does not match. */
static uint32_t parse_vmid(const char* name) {
if (strncmp(name, "vm-", 3) != 0) return 0;
const char* p = name + 3;
if (*p < '0' || *p > '9') return 0;
uint64_t v = 0;
while (*p >= '0' && *p <= '9') { v = v * 10 + (uint64_t)(*p - '0'); p++; if (v > 0xFFFFFFFFull) return 0; }
if (strcmp(p, "-ram") != 0) return 0;
return (uint32_t)v;
}
static cand_ent* cand_find(vmsig_discovery* d, uint32_t vmid) {
for (int i = 0; i < VMSIG_SLOT_COUNT; i++)
if (d->cand[i].state != CAND_FREE && d->cand[i].vmid == vmid) return &d->cand[i];
return NULL;
}
static cand_ent* cand_alloc(vmsig_discovery* d, uint32_t vmid) {
cand_ent* e = cand_find(d, vmid);
if (e) return e;
for (int i = 0; i < VMSIG_SLOT_COUNT; i++)
if (d->cand[i].state == CAND_FREE) {
memset(&d->cand[i], 0, sizeof d->cand[i]);
d->cand[i].vmid = vmid; d->cand[i].endpoint = -1;
return &d->cand[i];
}
return NULL; /* 64-candidate ceiling */
}
/* Arm the retry timer to the soonest pending probe, or disarm if none pending. */
static void rearm_timer(vmsig_discovery* d) {
uint64_t soonest = 0; int any = 0;
for (int i = 0; i < VMSIG_SLOT_COUNT; i++)
if (d->cand[i].state == CAND_PROBING && d->cand[i].next_probe_ns) {
if (!any || d->cand[i].next_probe_ns < soonest) soonest = d->cand[i].next_probe_ns;
any = 1;
}
struct itimerspec its;
memset(&its, 0, sizeof its);
if (any) {
uint64_t now = now_ns();
uint64_t dt = soonest > now ? soonest - now : 1000000ull; /* >=1ms */
its.it_value.tv_sec = (time_t)(dt / 1000000000ull);
its.it_value.tv_nsec = (long)(dt % 1000000000ull);
}
timerfd_settime(d->tfd, 0, &its, NULL); /* it_value 0 => disarm */
}
static void publish_roster(vmsig_discovery* d, uint32_t ep, uint32_t vmid, uint32_t state,
uint32_t action, const char* name) {
vmsig_roster r;
memset(&r, 0, sizeof r);
r.vmid = vmid; r.state = state; r.action = action;
if (name) {
size_t n = strlen(name);
if (n >= VMSIG_ROSTER_NAME_MAX) { n = VMSIG_ROSTER_NAME_MAX - 1; r.flags |= VMSIG_ROSTER_NAME_TRUNC; }
memcpy(r.name, name, n);
}
core_roster_publish(d->core, ep, &r);
}
static void cand_drop(cand_ent* c) {
c->state = CAND_FREE; c->vmid = 0; c->endpoint = -1; c->attempts = 0; c->next_probe_ns = 0;
}
static void do_attach(vmsig_discovery* d, cand_ent* c) {
int ep = slot_alloc(&d->slots, c->vmid);
if (ep < 0) {
fprintf(stderr, "vmsig discovery: no free endpoint for vmid %u (64-VM ceiling)\n", c->vmid);
cand_drop(c);
return;
}
d->ep_facts[ep] = c->facts; /* stable home for cfg strings the adapters keep */
if (d->sink.attach(d->sink.ud, d->core, c->vmid, (uint32_t)ep, &d->ep_facts[ep]) != 0) {
slot_free(&d->slots, c->vmid);
fprintf(stderr, "vmsig discovery: attach failed for vmid %u\n", c->vmid);
cand_drop(c);
return;
}
c->state = CAND_ATTACHED; c->endpoint = ep;
publish_roster(d, (uint32_t)ep, c->vmid, (uint32_t)c->facts.vm_state, VMSIG_ROSTER_ATTACH,
c->facts.name);
if (d->persist) slot_save(&d->slots, d->slots_path);
}
static void do_detach(vmsig_discovery* d, cand_ent* c) {
int ep = c->endpoint;
if (ep >= 0) {
publish_roster(d, (uint32_t)ep, c->vmid, VMSIG_VM_SHUTDOWN, VMSIG_ROSTER_DETACH,
c->facts.name);
d->sink.detach(d->sink.ud, d->core, c->vmid, (uint32_t)ep); /* deferred teardown */
slot_free(&d->slots, c->vmid); /* bit vacated (ordered) */
if (d->persist) slot_save(&d->slots, d->slots_path);
/* ep_facts[ep] is intentionally NOT cleared: the deferred adapter reap still reads the
* cfg strings; it is overwritten on the next attach to this endpoint. */
}
cand_drop(c);
}
static void try_probe(vmsig_discovery* d, cand_ent* c) {
d->probe.config(&d->probe, c->vmid, &c->facts);
if (!c->facts.ok) { cand_drop(c); return; } /* not ours / no share=on */
d->probe.live(&d->probe, &c->facts);
if (c->facts.retry) {
if (++c->attempts > DISC_RETRY_MAX) {
fprintf(stderr, "vmsig discovery: vmid %u QMP never came up, giving up\n", c->vmid);
cand_drop(c);
return;
}
c->next_probe_ns = now_ns() + backoff_ns(c->attempts);
rearm_timer(d);
return;
}
if (!c->facts.ok) { cand_drop(c); return; } /* stale: file present, VM dead/unparsable */
do_attach(d, c);
}
static void on_file_appear(vmsig_discovery* d, uint32_t vmid) {
cand_ent* c = cand_alloc(d, vmid);
if (!c) { fprintf(stderr, "vmsig discovery: candidate table full, vmid %u ignored\n", vmid); return; }
if (c->state == CAND_ATTACHED) return; /* already live (duplicate event) */
if (c->state == CAND_FREE) { c->state = CAND_PROBING; c->attempts = 0; }
c->next_probe_ns = 0;
try_probe(d, c);
}
static void on_file_gone(vmsig_discovery* d, uint32_t vmid) {
cand_ent* c = cand_find(d, vmid);
if (!c) return;
if (c->state == CAND_ATTACHED) do_detach(d, c);
else cand_drop(c); /* was still probing */
}
/* ---- loop sources ------------------------------------------------------------ */
static void on_inotify(void* user, uint32_t events) {
(void)events;
vmsig_discovery* d = user;
char buf[4096] __attribute__((aligned(__alignof__(struct inotify_event))));
for (;;) {
ssize_t n = read(d->ifd, buf, sizeof buf);
if (n <= 0) { if (n < 0 && errno == EINTR) continue; break; }
for (char* p = buf; p < buf + n; ) {
struct inotify_event* ev = (struct inotify_event*)p;
if (ev->len) {
uint32_t vmid = parse_vmid(ev->name);
if (vmid) {
if (ev->mask & (IN_CREATE | IN_MOVED_TO | IN_CLOSE_WRITE)) on_file_appear(d, vmid);
else if (ev->mask & (IN_DELETE | IN_MOVED_FROM)) on_file_gone(d, vmid);
}
}
p += sizeof(struct inotify_event) + ev->len;
}
}
}
static void on_timer(void* user, uint32_t events) {
(void)events;
vmsig_discovery* d = user;
uint64_t v;
while (read(d->tfd, &v, sizeof v) == (ssize_t)sizeof v) { /* drain */ }
uint64_t now = now_ns();
for (int i = 0; i < VMSIG_SLOT_COUNT; i++) {
cand_ent* c = &d->cand[i];
if (c->state == CAND_PROBING && c->next_probe_ns && c->next_probe_ns <= now)
try_probe(d, c);
}
rearm_timer(d);
}
static void bootstrap_scan(vmsig_discovery* d) {
DIR* dir = opendir(d->watch_dir);
if (!dir) return;
struct dirent* de;
while ((de = readdir(dir)) != NULL) {
uint32_t vmid = parse_vmid(de->d_name);
if (vmid) on_file_appear(d, vmid);
}
closedir(dir);
/* GC persisted-but-not-live slots: a vmid bound in .slots with no live file (it died while
* the daemon was down) keeps its bit pinned; free it so the ceiling is not leaked. */
for (int e = 0; e < VMSIG_SLOT_COUNT; e++) {
uint32_t vmid = d->slots.ent[e].vmid;
if (!vmid) continue;
cand_ent* c = cand_find(d, vmid);
if (!c || c->state != CAND_ATTACHED) slot_free(&d->slots, vmid);
}
if (d->persist) slot_save(&d->slots, d->slots_path);
}
/* ---- default sink: wire the core adapter trio ------------------------------- */
static int default_attach(void* ud, vmsig_core* core, uint32_t vmid, uint32_t endpoint,
const vmsig_host_facts* f) {
(void)ud; (void)vmid;
vmsig_memctx_cfg mc; memset(&mc, 0, sizeof mc);
mc.stub = 0; mc.ram_path = f->ram_path; mc.low = f->low; mc.ro_fd = -1;
vmsig_vmhost_cfg vh; memset(&vh, 0, sizeof vh);
vh.stub = 0; vh.qmp_path = f->qmp_path;
vmsig_input_cfg in; memset(&in, 0, sizeof in);
in.stub = 0; in.qmp_path = NULL; /* input is uinput; power/lifecycle via the vmhost seam */
if (vmsig_core_add_adapter(core, vmsig_memctx_ops(), &mc, endpoint) < 0) goto fail;
if (vmsig_core_add_adapter(core, vmsig_vmhost_ops(), &vh, endpoint) < 0) goto fail;
if (vmsig_core_add_adapter(core, vmsig_input_ops(), &in, endpoint) < 0) goto fail;
return 0;
fail:
vmsig_core_detach_endpoint(core, endpoint); /* roll back any partial trio (deferred) */
return -1;
}
static void default_detach(void* ud, vmsig_core* core, uint32_t vmid, uint32_t endpoint) {
(void)ud; (void)vmid;
vmsig_core_detach_endpoint(core, endpoint);
}
/* ---- lifecycle --------------------------------------------------------------- */
void vmsig_discovery_free(void* user) {
vmsig_discovery* d = user;
if (!d) return;
if (d->ifd >= 0) close(d->ifd);
if (d->tfd >= 0) close(d->tfd);
free(d);
}
vmsig_discovery* vmsig_discovery_new(vmsig_core* core,
const char* watch_dir, const char* pve_conf,
const char* qmp_dir, const char* slots_path,
const vmsig_host_probe* probe,
const vmsig_discovery_sink* sink) {
if (!core || !watch_dir) return NULL;
vmsig_discovery* d = calloc(1, sizeof *d);
if (!d) return NULL;
d->core = core;
d->ifd = d->tfd = d->wd = -1;
snprintf(d->watch_dir, sizeof d->watch_dir, "%s", watch_dir);
if (slots_path && *slots_path) {
snprintf(d->slots_path, sizeof d->slots_path, "%s", slots_path);
d->persist = 1;
}
for (int i = 0; i < VMSIG_SLOT_COUNT; i++) d->cand[i].endpoint = -1;
if (probe) d->probe = *probe;
else d->probe = host_probe_proxmox(d->watch_dir,
pve_conf ? pve_conf : "/etc/pve/qemu-server",
qmp_dir ? qmp_dir : "/var/run/qemu-server");
if (sink) d->sink = *sink;
else { d->sink.attach = default_attach; d->sink.detach = default_detach; d->sink.ud = NULL; }
slot_load(&d->slots, d->persist ? d->slots_path : NULL);
d->ifd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
if (d->ifd < 0) { vmsig_discovery_free(d); return NULL; }
d->wd = inotify_add_watch(d->ifd, d->watch_dir,
IN_CREATE | IN_MOVED_TO | IN_DELETE | IN_MOVED_FROM | IN_CLOSE_WRITE | IN_ONLYDIR);
/* a missing watch dir is not fatal: the dir may be created later; bootstrap finds nothing. */
d->tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if (d->tfd < 0) { vmsig_discovery_free(d); return NULL; }
/* The inotify source owns the discovery lifetime (on_free frees ifd+tfd+d); the timer
* source shares the handle with on_free=NULL. */
if (core_add_source(core, d->ifd, on_inotify, d, vmsig_discovery_free) != 0) {
vmsig_discovery_free(d); return NULL;
}
if (core_add_source(core, d->tfd, on_timer, d, NULL) != 0) {
/* ifd already enrolled with the on_free; closing here would double-free at core_free.
* Leave it to core_free to reap. Return NULL to signal partial failure is not clean. */
return NULL;
}
bootstrap_scan(d);
rearm_timer(d);
return d;
}
int vmsig_discovery_slot_of_vmid(vmsig_discovery* d, uint32_t vmid) {
if (!d) return -1;
return slot_lookup(&d->slots, vmid);
}
/* ---- TEST-ONLY hooks: drive the state machine deterministically (no inotify/timer) ---- */
void vmsig_discovery_feed(vmsig_discovery* d, uint32_t vmid, int present) {
if (present) on_file_appear(d, vmid); else on_file_gone(d, vmid);
}
void vmsig_discovery_tick(vmsig_discovery* d) { /* force a re-probe of every probing candidate */
for (int i = 0; i < VMSIG_SLOT_COUNT; i++)
if (d->cand[i].state == CAND_PROBING) try_probe(d, &d->cand[i]);
}
+46
View File
@@ -0,0 +1,46 @@
#ifndef VMSIG_DISCOVERY_H
#define VMSIG_DISCOVERY_H
#include "vmsig_core.h"
#include "host_probe.h"
/* discovery.h — runtime VM discovery (private to the discovery module).
*
* Watches a tmpfs trigger dir for "vm-<vmid>-ram" files, corroborates each candidate via the
* host-probe seam, assigns a stable endpoint slot, hot-plugs the VM (sink), and publishes the
* roster. The state machine + slot allocation are decoupled from actuation by a sink seam, so
* the orchestration is unit-testable without armed adapters. */
typedef struct vmsig_discovery vmsig_discovery;
/* Actuation seam: bring a discovered VM up / tear it down. Default (NULL) wires the core
* adapter trio (memctx+vmhost+input via vmsig_core_add_adapter) and detach_endpoint. A test
* injects a recording sink to verify the state machine without real adapters. Roster publish
* is owned by discovery (not the sink): ATTACH after a successful attach, DETACH before tear-down. */
typedef struct {
int (*attach)(void* ud, vmsig_core* core, uint32_t vmid, uint32_t endpoint,
const vmsig_host_facts* f); /* 0 = up, -1 = failed (slot freed) */
void (*detach)(void* ud, vmsig_core* core, uint32_t vmid, uint32_t endpoint);
void* ud;
} vmsig_discovery_sink;
/* Create discovery over `core`. `watch_dir` (e.g. /dev/shm/vmsig) is scanned once and
* inotify-watched. `probe` NULL => default Proxmox probe over (watch_dir, pve_conf, qmp_dir);
* `sink` NULL => default core trio; `slots_path` NULL => no persistence. Registers the inotify
* + retry-timer loop sources and runs a bootstrap scan. The core owns the lifetime (freed at
* vmsig_core_free via the source on_free). NULL on error. */
vmsig_discovery* vmsig_discovery_new(vmsig_core* core,
const char* watch_dir, const char* pve_conf,
const char* qmp_dir, const char* slots_path,
const vmsig_host_probe* probe,
const vmsig_discovery_sink* sink);
/* Resolve vmid -> endpoint for the admission policy (WS4); -1 if not currently attached. */
int vmsig_discovery_slot_of_vmid(vmsig_discovery* d, uint32_t vmid);
/* TEST-ONLY: drive a file appear(present=1)/gone(present=0) directly, bypassing inotify; and
* force a re-probe of every probing candidate, bypassing the retry timer. Lets the state
* machine be unit-tested deterministically without threads/timers. */
void vmsig_discovery_feed(vmsig_discovery* d, uint32_t vmid, int present);
void vmsig_discovery_tick(vmsig_discovery* d);
#endif /* VMSIG_DISCOVERY_H */
+48
View File
@@ -0,0 +1,48 @@
#ifndef VMSIG_HOST_PROBE_H
#define VMSIG_HOST_PROBE_H
#include <stdint.h>
/* host_probe.h — the platform-coupled discovery seam (private to the discovery module).
*
* This is the ONLY surface that knows the host's config convention (/etc/pve/qemu-server),
* the QMP socket path convention, and the `info mtree` text. It produces a NEUTRAL facts
* struct; discovery.c consumes ONLY that and never names a path convention. A non-Proxmox
* host (or a unit test) injects its own vmsig_host_probe with the same two-stage contract. */
#define VMSIG_HF_NAME_MAX 32
#define VMSIG_HF_PATH_MAX 128
typedef struct {
uint32_t vmid;
char name[VMSIG_HF_NAME_MAX]; /* host VM name (truncated) */
char ram_path[VMSIG_HF_PATH_MAX]; /* guest-RAM backing file (the trigger) */
char qmp_path[VMSIG_HF_PATH_MAX]; /* QMP socket ('@' prefix => abstract) */
uint64_t cfg_ram_bytes; /* RAM size from host config (sanity) */
uint64_t low; /* below-4G split (memctx locator); 0=unknown */
int vm_state; /* VMSIG_VM_* from the liveness oracle */
int share_on; /* memory-backend share=on verified */
int ok; /* 1 => all fail-closed gates passed (attach) */
int retry; /* 1 => transient (QMP not up yet) — back off */
} vmsig_host_facts;
/* Two-stage probe. Stage 1 reads host config (cheap, local). Stage 2 corroborates liveness
* and derives `low` (QMP round-trip, bounded). Splitting them lets the state machine treat
* "config error" (permanent, drop) apart from "QMP not up yet" (transient, retry). */
typedef struct vmsig_host_probe {
/* Populate paths + name + cfg_ram_bytes + share_on from host config; stat the RAM file.
* Sets out->ok=0 on any permanent gate failure (no share=on, missing/oversized file).
* Returns 0 when `out` was populated, -1 on a usage error. */
int (*config)(const struct vmsig_host_probe* p, uint32_t vmid, vmsig_host_facts* out);
/* Corroborate liveness + derive `low` via QMP. Mutates `io`: sets vm_state, low, ok; or
* retry=1 (QMP not reachable yet) / ok=0 (stale: file present but VM dead / unparsable). */
int (*live)(const struct vmsig_host_probe* p, vmsig_host_facts* io);
void* ud; /* implementation-private */
} vmsig_host_probe;
/* The default Proxmox probe over (watch_dir, pve_conf). `qmp_dir` is the QMP socket dir
* (Proxmox: /var/run/qemu-server, socket "<qmp_dir>/<vmid>.qmp"). The returned struct
* references the path strings by pointer — the caller keeps them alive. */
vmsig_host_probe host_probe_proxmox(const char* watch_dir, const char* pve_conf,
const char* qmp_dir);
#endif /* VMSIG_HOST_PROBE_H */
+49
View File
@@ -0,0 +1,49 @@
#ifndef VMSIG_SLOT_H
#define VMSIG_SLOT_H
#include <stdint.h>
/* slot.h — vmid <-> endpoint allocator (private to the discovery module).
*
* The signaling core addresses VMs by an ENDPOINT bit in a 64-bit mask (endpoint < 64). A
* Proxmox vmid (100..1e9) does NOT fit 6 bits, so the binding is a PINNED table, not a pure
* function: a vmid keeps the SAME endpoint across VM restarts (so a control's endpoint_mask
* stays coherent), and the table is persisted so a daemon restart re-derives the same map.
*
* Bit reuse is a coherence event, not a silent alias: a freed bit is handed to a DIFFERENT
* vmid only AFTER the roster DETACH for the old occupant has been published. The discovery
* loop is single-threaded and publishes DETACH synchronously before any later attach, so the
* ordering itself enforces this — the allocator only needs to never double-assign a live bit. */
#define VMSIG_SLOT_COUNT 64
typedef struct {
uint32_t vmid; /* 0 => slot free */
} slot_ent;
typedef struct {
slot_ent ent[VMSIG_SLOT_COUNT];
uint64_t used_mask; /* mirror: bit e set <=> ent[e].vmid != 0 */
} slot_table;
/* Reset to all-free. */
void slot_init(slot_table* t);
/* Endpoint pinned to `vmid`, or -1 if `vmid` is not bound (or 0). */
int slot_lookup(const slot_table* t, uint32_t vmid);
/* Pin `vmid` to a stable endpoint. Idempotent: if `vmid` is already bound, returns its
* existing endpoint. Otherwise assigns the lowest free bit. Returns the endpoint [0,64),
* or -1 if `vmid`==0 or the table is full (the 64-VM ceiling). */
int slot_alloc(slot_table* t, uint32_t vmid);
/* Release the slot bound to `vmid` (no-op if not bound). */
void slot_free(slot_table* t, uint32_t vmid);
/* Persist the table to `path` atomically (tmp + rename), mode 0600. 0 / -1. */
int slot_save(const slot_table* t, const char* path);
/* Load the table from `path`. On a missing/corrupt file, initializes empty and returns 0
* (a fresh start is valid). -1 only on a hard error. */
int slot_load(slot_table* t, const char* path);
#endif /* VMSIG_SLOT_H */
+244
View File
@@ -0,0 +1,244 @@
/* host_probe.c — the default Proxmox host-probe (see host_probe.h). The ONLY TU that knows
* /etc/pve/qemu-server, the QMP socket path convention, and `info mtree`. Pure libc +
* AF_UNIX + files; no vmie/vmctl. config() is cheap+local; live() does a bounded blocking
* QMP round-trip (query-status + info mtree) and is fail-closed: anything it cannot confirm
* leaves ok=0 (the VM is not brought up rather than guessed). */
#define _GNU_SOURCE
#include "host_probe.h"
#include "vmsig_event.h" /* VMSIG_VM_* */
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <stddef.h>
#include <errno.h>
typedef struct {
const char* watch_dir; /* /dev/shm/vmsig */
const char* pve_conf; /* /etc/pve/qemu-server */
const char* qmp_dir; /* /var/run/qemu-server */
} hp_cfg;
/* ---- /etc/pve config (stage 1) ----------------------------------------------- */
/* Read a whole small file into a heap buffer (NUL-terminated). NULL on error/oversize. */
static char* read_file(const char* path, size_t cap) {
int fd = open(path, O_RDONLY | O_CLOEXEC);
if (fd < 0) return NULL;
char* buf = malloc(cap + 1);
if (!buf) { close(fd); return NULL; }
size_t got = 0;
for (;;) {
ssize_t n = read(fd, buf + got, cap - got);
if (n < 0) { if (errno == EINTR) continue; free(buf); close(fd); return NULL; }
if (n == 0) break;
got += (size_t)n;
if (got >= cap) break;
}
close(fd);
buf[got] = 0;
return buf;
}
/* Value of a top-level "key:" line (Proxmox ini), copied trimmed into out. 1 if found. */
static int conf_val(const char* conf, const char* key, char* out, size_t cap) {
size_t klen = strlen(key);
const char* p = conf;
while (p && *p) {
const char* line = p;
const char* nl = strchr(p, '\n');
size_t llen = nl ? (size_t)(nl - line) : strlen(line);
if (llen > klen && strncmp(line, key, klen) == 0 && line[klen] == ':') {
const char* v = line + klen + 1;
while (*v == ' ' || *v == '\t') v++;
size_t vlen = (size_t)((line + llen) - v);
while (vlen && (v[vlen-1] == ' ' || v[vlen-1] == '\t' || v[vlen-1] == '\r')) vlen--;
if (vlen >= cap) vlen = cap - 1;
memcpy(out, v, vlen); out[vlen] = 0;
return 1;
}
p = nl ? nl + 1 : NULL;
}
return 0;
}
static int hp_config(const struct vmsig_host_probe* p, uint32_t vmid, vmsig_host_facts* out) {
const hp_cfg* c = p->ud;
memset(out, 0, sizeof *out);
out->vmid = vmid;
snprintf(out->ram_path, sizeof out->ram_path, "%s/vm-%u-ram", c->watch_dir, vmid);
snprintf(out->qmp_path, sizeof out->qmp_path, "%s/%u.qmp", c->qmp_dir, vmid);
char path[VMSIG_HF_PATH_MAX + 32];
snprintf(path, sizeof path, "%s/%u.conf", c->pve_conf, vmid);
char* conf = read_file(path, 64 * 1024);
if (!conf) { out->ok = 0; return 0; } /* no host config => not a known VM */
char tmp[VMSIG_HF_NAME_MAX];
if (conf_val(conf, "name", out->name, sizeof out->name) == 0)
snprintf(out->name, sizeof out->name, "vm-%u", vmid);
if (conf_val(conf, "memory", tmp, sizeof tmp))
out->cfg_ram_bytes = (uint64_t)strtoull(tmp, NULL, 10) * 1024ull * 1024ull;
/* share=on is mandatory: without it the host mmap is a private copy, not guest RAM. */
out->share_on = (strstr(conf, "share=on") != NULL) ? 1 : 0;
free(conf);
out->ok = out->share_on ? 1 : 0; /* config-level pass; liveness is stage 2 */
return 0;
}
/* ---- QMP liveness + mtree low (stage 2) -------------------------------------- */
static int qmp_connect(const char* path) {
int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
if (fd < 0) return -1;
struct timeval tv = { .tv_sec = 0, .tv_usec = 250000 }; /* 250ms bound on each recv */
setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof tv);
setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof tv);
struct sockaddr_un a;
memset(&a, 0, sizeof a);
a.sun_family = AF_UNIX;
size_t n = strlen(path);
socklen_t alen;
if (path[0] == '@') { /* abstract namespace */
if (n > sizeof a.sun_path) { close(fd); return -1; }
a.sun_path[0] = 0;
memcpy(a.sun_path + 1, path + 1, n - 1);
alen = (socklen_t)(offsetof(struct sockaddr_un, sun_path) + n);
} else {
if (n >= sizeof a.sun_path) { close(fd); return -1; }
memcpy(a.sun_path, path, n);
alen = (socklen_t)(offsetof(struct sockaddr_un, sun_path) + n + 1);
}
if (connect(fd, (struct sockaddr*)&a, alen) < 0) { close(fd); return -1; }
return fd;
}
/* Read ONE '\n'-terminated QMP message into buf (QMP frames each JSON object on a line;
* an HMP string return keeps its newlines escaped, so it is still a single line). 1 / 0 / -1. */
static int qmp_read_line(int fd, char* buf, size_t cap, size_t* out_len) {
size_t got = 0;
while (got + 1 < cap) {
ssize_t r = read(fd, buf + got, cap - 1 - got);
if (r < 0) { if (errno == EINTR) continue; return -1; } /* timeout/error */
if (r == 0) return (got > 0) ? 1 : 0;
got += (size_t)r;
char* nl = memchr(buf, '\n', got);
if (nl) { *out_len = got; buf[got] = 0; return 1; }
}
*out_len = got; buf[got] = 0;
return 1; /* line longer than cap: truncated but usable for our scans */
}
/* Read messages until one carries "return"/"error", skipping async "event"s. 1 if a return,
* 0 if an error/closed, -1 on transport error. The matched message is left in buf. */
static int qmp_await_return(int fd, char* buf, size_t cap) {
for (int i = 0; i < 64; i++) {
size_t len = 0;
int r = qmp_read_line(fd, buf, cap, &len);
if (r <= 0) return r;
if (strstr(buf, "\"error\"")) return 0;
if (strstr(buf, "\"return\"")) return 1;
/* greeting {"QMP":...} or async {"event":...} -> keep reading */
}
return -1;
}
static int qmp_cmd(int fd, const char* json, char* buf, size_t cap) {
size_t n = strlen(json);
if (write(fd, json, n) != (ssize_t)n) return -1;
return qmp_await_return(fd, buf, cap);
}
/* Map a QEMU query-status "status" word to VMSIG_VM_*. Alive = running|paused. */
static int qmp_status_word(const char* buf) {
const char* s = strstr(buf, "\"status\"");
if (!s) return VMSIG_VM_UNKNOWN;
s = strchr(s, ':'); if (!s) return VMSIG_VM_UNKNOWN;
s = strchr(s, '"'); if (!s) return VMSIG_VM_UNKNOWN;
s++;
if (!strncmp(s, "running", 7)) return VMSIG_VM_RUNNING;
if (!strncmp(s, "paused", 6)) return VMSIG_VM_PAUSED;
if (!strncmp(s, "prelaunch", 9)) return VMSIG_VM_PAUSED;
if (!strncmp(s, "shutdown", 8)) return VMSIG_VM_SHUTDOWN;
if (!strncmp(s, "guest-panicked", 14) || !strncmp(s, "internal-error", 14))
return VMSIG_VM_CRASHED;
return VMSIG_VM_UNKNOWN;
}
/* Derive the below-4G split from `info mtree` text: the size of the RAM region whose guest
* physical range starts at address 0. Standard QEMU split-RAM layout puts low RAM at
* [0, low) and high RAM above 4G at file offset @low. FAIL-CLOSED: 0 if not found.
* NOTE: parses HMP text (not a stable QMP schema) — verify against real `info mtree` output. */
static uint64_t mtree_low(const char* ret) {
/* The return is a JSON string; lines inside are escaped "\n". Scan for the GPA-0 ram run:
* " 0000000000000000-<end16> (prio N, ram): ..." */
const char* p = ret;
while ((p = strstr(p, "0000000000000000-")) != NULL) {
const char* end_hex = p + 17; /* 16 zeros + '-' */
char* stop = NULL;
unsigned long long end = strtoull(end_hex, &stop, 16);
/* the descriptor after the range must mark it RAM (not the i/o "system" root) */
const char* tail = stop ? stop : end_hex;
const char* nl = strstr(tail, "\\n");
const char* lim = nl ? nl : (tail + 64);
int is_ram = 0;
for (const char* q = tail; q < lim && *q; q++)
if (!strncmp(q, "ram)", 4)) { is_ram = 1; break; }
if (is_ram && end > 0 && end != ~0ull) return end + 1ull; /* [0, end] => low=end+1 */
p = end_hex;
}
return 0;
}
static int hp_live(const struct vmsig_host_probe* p, vmsig_host_facts* io) {
(void)p;
io->retry = 0;
int fd = qmp_connect(io->qmp_path);
if (fd < 0) { io->retry = 1; io->ok = 0; return 0; } /* QMP not up yet => transient */
char* buf = malloc(256 * 1024);
if (!buf) { close(fd); io->retry = 1; io->ok = 0; return 0; }
int alive = 0;
if (qmp_cmd(fd, "{\"execute\":\"qmp_capabilities\"}\n", buf, 256 * 1024) == 1 &&
qmp_cmd(fd, "{\"execute\":\"query-status\"}\n", buf, 256 * 1024) == 1) {
io->vm_state = qmp_status_word(buf);
alive = (io->vm_state == VMSIG_VM_RUNNING || io->vm_state == VMSIG_VM_PAUSED);
} else {
io->retry = 1; /* handshake failed mid-way => transient */
}
if (alive) {
if (qmp_cmd(fd,
"{\"execute\":\"human-monitor-command\","
"\"arguments\":{\"command-line\":\"info mtree -f\"}}\n", buf, 256 * 1024) == 1) {
io->low = mtree_low(buf);
}
}
free(buf);
close(fd);
/* fail-closed: alive AND a parsed split => bring up; else not (stale / unparsable). */
io->ok = (alive && io->low != 0) ? 1 : 0;
return 0;
}
vmsig_host_probe host_probe_proxmox(const char* watch_dir, const char* pve_conf,
const char* qmp_dir) {
static hp_cfg cfg; /* single daemon-wide probe; paths are process-lifetime strings */
cfg.watch_dir = watch_dir;
cfg.pve_conf = pve_conf;
cfg.qmp_dir = qmp_dir;
vmsig_host_probe p = { hp_config, hp_live, &cfg };
return p;
}
+91
View File
@@ -0,0 +1,91 @@
/* slot.c — vmid <-> endpoint allocator (see slot.h). Pure logic + a tiny pointer-free
* on-disk format; no core dependency. */
#define _GNU_SOURCE
#include "slot.h"
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
void slot_init(slot_table* t) {
memset(t, 0, sizeof *t);
}
int slot_lookup(const slot_table* t, uint32_t vmid) {
if (!vmid) return -1;
for (int e = 0; e < VMSIG_SLOT_COUNT; e++)
if (t->ent[e].vmid == vmid) return e;
return -1;
}
int slot_alloc(slot_table* t, uint32_t vmid) {
if (!vmid) return -1;
int e = slot_lookup(t, vmid);
if (e >= 0) return e; /* idempotent pin */
/* lowest free bit: ffsll of the complement (1-based; 0 => none free) */
int b = __builtin_ffsll((long long)~t->used_mask);
if (b == 0) return -1; /* table full (64-VM ceiling) */
e = b - 1;
t->ent[e].vmid = vmid;
t->used_mask |= (1ull << e);
return e;
}
void slot_free(slot_table* t, uint32_t vmid) {
int e = slot_lookup(t, vmid);
if (e < 0) return;
t->ent[e].vmid = 0;
t->used_mask &= ~(1ull << e);
}
/* ---- persistence: magic + version + 64 * uint32 vmid (native byte order, tmpfs-local) ---- */
#define SLOT_MAGIC 0x534C4F54u /* "SLOT" */
#define SLOT_VERSION 1u
typedef struct {
uint32_t magic;
uint32_t version;
uint32_t vmid[VMSIG_SLOT_COUNT];
} slot_blob;
int slot_save(const slot_table* t, const char* path) {
if (!path) return -1;
slot_blob b;
memset(&b, 0, sizeof b);
b.magic = SLOT_MAGIC; b.version = SLOT_VERSION;
for (int e = 0; e < VMSIG_SLOT_COUNT; e++) b.vmid[e] = t->ent[e].vmid;
char tmp[512];
int n = snprintf(tmp, sizeof tmp, "%s.tmp", path);
if (n < 0 || (size_t)n >= sizeof tmp) return -1;
int fd = open(tmp, O_WRONLY | O_CREAT | O_TRUNC | O_CLOEXEC, 0600);
if (fd < 0) return -1;
ssize_t w = write(fd, &b, sizeof b);
int rc = (w == (ssize_t)sizeof b) ? 0 : -1;
if (close(fd) != 0) rc = -1;
if (rc == 0 && rename(tmp, path) != 0) rc = -1;
if (rc != 0) unlink(tmp);
return rc;
}
int slot_load(slot_table* t, const char* path) {
slot_init(t);
if (!path) return 0;
int fd = open(path, O_RDONLY | O_CLOEXEC);
if (fd < 0) return 0; /* no file => fresh start (valid) */
slot_blob b;
ssize_t r = read(fd, &b, sizeof b);
close(fd);
if (r != (ssize_t)sizeof b || b.magic != SLOT_MAGIC || b.version != SLOT_VERSION) {
slot_init(t); /* corrupt/old => fresh start */
return 0;
}
for (int e = 0; e < VMSIG_SLOT_COUNT; e++) {
t->ent[e].vmid = b.vmid[e];
if (b.vmid[e]) t->used_mask |= (1ull << e);
}
return 0;
}