Files
vatrog-vm-signaling/src/discovery/discovery.c
T
lirent 9bde398b6c vmsig: management daemon, runtime endpoint lifecycle, roster, discovery, in-tree drivers, packaging
- core: runtime attach/detach of a per-endpoint adapter trio (runtime-safe add_adapter + vmsig_core_detach_endpoint, deferred reap)
- roster: VMSIG_EV_ROSTER + CAP_ROSTER, retained per-endpoint and replayed to late subscribers
- discovery: inotify trigger dir, vmid/endpoint slot allocator, host probe; vmsigd daemon with config + per-uid admission
- input driver and vgpu perception built in-tree; vgpu perception as a separate library
- memctx: own the supplied ro_fd (closed at detach)
- deb packaging: install rules, systemd unit, tmpfiles, default config
2026-06-22 17:25:06 +03:00

364 lines
15 KiB
C

/* discovery.c — runtime VM discovery state machine (see discovery.h).
*
* Single-threaded on the loop thread (inotify + timer sources via core_add_source). On a
* "vm-<vmid>-ram" file appearing it corroborates the candidate (host-probe seam), assigns a
* stable endpoint slot, hot-plugs the trio (sink), and publishes the roster; on the file
* disappearing it tears the endpoint down and publishes a roster DETACH. QMP-not-up-yet is a
* transient retry driven by a timerfd (no busy-wait); config errors / stale files drop. */
#define _GNU_SOURCE
#include "discovery.h"
#include "slot.h"
#include "core_internal.h" /* core_roster_publish */
#include "memctx.h" /* vmsig_memctx_cfg */
#include "vmhost.h" /* vmsig_vmhost_cfg */
#include "input.h" /* vmsig_input_cfg */
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <time.h>
#include <dirent.h>
#include <sys/inotify.h>
#include <sys/timerfd.h>
/* Capacity (including the terminating NUL) of the watch_dir / slots_path buffers. */
#define DISC_PATH_MAX 256
/* QMP-not-up retries before a candidate is dropped. With backoff_ns() this is roughly 73 s of
 * cumulative backoff (less if further file events force earlier re-probes). */
#define DISC_RETRY_MAX 40 /* give up after ~tens of seconds of QMP-not-up */
#define DISC_BACKOFF_BASE 50000000ull /* 50 ms; doubled per attempt, exponent clamped at 6 */
#define DISC_BACKOFF_CAP 2000000000ull /* 2 s ceiling on any single retry delay */
/* Candidate lifecycle: FREE (unused table entry) -> PROBING (file seen; probe pending or being
 * retried) -> ATTACHED (endpoint hot-plugged). cand_drop returns any state to FREE. */
typedef enum { CAND_FREE = 0, CAND_PROBING, CAND_ATTACHED } cand_state;
/* One per-vmid discovery record; struct vmsig_discovery holds VMSIG_SLOT_COUNT of them. */
typedef struct {
cand_state state;
uint32_t vmid; /* parsed from "vm-<vmid>-ram"; cleared to 0 by cand_drop */
int endpoint; /* -1 until attached */
int attempts; /* QMP-not-up retries so far, compared against DISC_RETRY_MAX */
uint64_t next_probe_ns; /* monotonic deadline for the next retry */
vmsig_host_facts facts; /* probe working copy */
} cand_ent;
/* Discovery instance: inotify watch on watch_dir, a retry timerfd, the (optionally persisted)
 * vmid -> endpoint slot table, and the candidate table. Used only from the loop thread. */
struct vmsig_discovery {
vmsig_core* core; /* core that endpoints attach to and roster events are published on */
char watch_dir[DISC_PATH_MAX]; /* directory holding the "vm-<vmid>-ram" files */
char slots_path[DISC_PATH_MAX]; /* slot-table file; meaningful only when persist != 0 */
int persist; /* nonzero when a slots_path was supplied: slot changes are saved to it */
vmsig_host_probe probe; /* config/live probe seam (Proxmox probe unless one is supplied) */
vmsig_discovery_sink sink; /* attach/detach hooks (core adapter trio unless supplied) */
int ifd; /* inotify */
int wd; /* watch descriptor for watch_dir; -1 if the watch could not be added */
int tfd; /* retry timerfd */
slot_table slots; /* vmid -> endpoint bindings */
cand_ent cand[VMSIG_SLOT_COUNT];
/* Stable per-endpoint home for the adapter cfg strings (ram_path/qmp_path): the adapters
 * keep pointers, and detach is deferred, so this must outlive the candidate. Overwritten
 * only on the NEXT attach to the endpoint, which never races a still-open prior adapter.
 * NOTE(review): do_detach frees the slot immediately, so an attach handled in the same loop
 * iteration could reuse this endpoint before the deferred reap runs. Confirm the core never
 * reads the old cfg strings after vmsig_core_detach_endpoint returns. */
vmsig_host_facts ep_facts[VMSIG_SLOT_COUNT];
};
/* Read CLOCK_MONOTONIC and flatten it to a single nanosecond count. The clock is unaffected
 * by wall-clock steps, so deadlines built from it stay valid. */
static uint64_t now_ns(void) {
    struct timespec mono;
    clock_gettime(CLOCK_MONOTONIC, &mono);
    const uint64_t secs = (uint64_t)mono.tv_sec;
    const uint64_t nanos = (uint64_t)mono.tv_nsec;
    return secs * 1000000000ull + nanos;
}
/* Retry delay for the given attempt count: DISC_BACKOFF_BASE shifted left once per attempt
 * (the shift is clamped at 6), never exceeding DISC_BACKOFF_CAP. */
static uint64_t backoff_ns(int attempts) {
    const int shift = attempts < 6 ? attempts : 6;
    const uint64_t delay = DISC_BACKOFF_BASE << shift;
    if (delay > DISC_BACKOFF_CAP) return DISC_BACKOFF_CAP;
    return delay;
}
/* Map a directory-entry name of the exact form "vm-<digits>-ram" to its vmid.
 * Returns 0 for any other name, and for a value that does not fit in 32 bits. Note that
 * "vm-0-ram" also yields 0, so it is indistinguishable from a non-match. */
static uint32_t parse_vmid(const char* name) {
    static const char prefix[] = "vm-";
    if (strncmp(name, prefix, sizeof prefix - 1) != 0) return 0;
    const char* cur = name + (sizeof prefix - 1);
    if (!(*cur >= '0' && *cur <= '9')) return 0; /* at least one digit is required */
    uint64_t acc = 0;
    for (; *cur >= '0' && *cur <= '9'; ++cur) {
        acc = acc * 10 + (uint64_t)(*cur - '0');
        if (acc > 0xFFFFFFFFull) return 0; /* reject before it can grow further */
    }
    return strcmp(cur, "-ram") == 0 ? (uint32_t)acc : 0;
}
/* Return the in-use (non-FREE) candidate tracking vmid, or NULL if there is none. */
static cand_ent* cand_find(vmsig_discovery* d, uint32_t vmid) {
    cand_ent* const end = d->cand + VMSIG_SLOT_COUNT;
    for (cand_ent* it = d->cand; it != end; ++it) {
        if (it->state == CAND_FREE) continue;
        if (it->vmid == vmid) return it;
    }
    return NULL;
}
/* Return the candidate for vmid. An existing in-use entry is reused; otherwise the first
 * FREE entry is claimed, zeroed, given the vmid and endpoint -1, and left in state
 * CAND_FREE for the caller to advance. Returns NULL when every entry is in use. */
static cand_ent* cand_alloc(vmsig_discovery* d, uint32_t vmid) {
    cand_ent* existing = cand_find(d, vmid);
    if (existing != NULL) return existing;
    int idx = 0;
    while (idx < VMSIG_SLOT_COUNT && d->cand[idx].state != CAND_FREE) idx++;
    if (idx == VMSIG_SLOT_COUNT) return NULL; /* 64-candidate ceiling */
    cand_ent* fresh = &d->cand[idx];
    memset(fresh, 0, sizeof *fresh);
    fresh->vmid = vmid;
    fresh->endpoint = -1;
    return fresh;
}
/* Point the retry timerfd at the earliest pending re-probe deadline among PROBING candidates.
 * An overdue deadline fires after 1 ms. With no deadline pending, the timer is disarmed
 * (an all-zero it_value). */
static void rearm_timer(vmsig_discovery* d) {
    struct itimerspec spec;
    memset(&spec, 0, sizeof spec); /* zero it_value == disarm */
    int have_deadline = 0;
    uint64_t earliest = 0;
    for (int i = 0; i < VMSIG_SLOT_COUNT; i++) {
        const cand_ent* c = &d->cand[i];
        if (c->state != CAND_PROBING || c->next_probe_ns == 0) continue;
        if (!have_deadline || c->next_probe_ns < earliest) earliest = c->next_probe_ns;
        have_deadline = 1;
    }
    if (have_deadline) {
        const uint64_t now = now_ns();
        const uint64_t delay = earliest > now ? earliest - now : 1000000ull; /* overdue: 1 ms */
        spec.it_value.tv_sec = (time_t)(delay / 1000000000ull);
        spec.it_value.tv_nsec = (long)(delay % 1000000000ull);
    }
    timerfd_settime(d->tfd, 0, &spec, NULL);
}
/* Build a zeroed roster record carrying vmid/state/action and an optional name, then hand it
 * to the core for endpoint ep. At most VMSIG_ROSTER_NAME_MAX - 1 name bytes are copied; a
 * longer name is cut and VMSIG_ROSTER_NAME_TRUNC is set. */
static void publish_roster(vmsig_discovery* d, uint32_t ep, uint32_t vmid, uint32_t state,
                           uint32_t action, const char* name) {
    vmsig_roster rec;
    memset(&rec, 0, sizeof rec);
    rec.vmid = vmid;
    rec.state = state;
    rec.action = action;
    if (name != NULL) {
        size_t len = strlen(name);
        const size_t room = VMSIG_ROSTER_NAME_MAX - 1;
        if (len > room) {
            len = room;
            rec.flags |= VMSIG_ROSTER_NAME_TRUNC;
        }
        memcpy(rec.name, name, len);
    }
    core_roster_publish(d->core, ep, &rec);
}
/* Return a candidate to the FREE pool. Its facts are left untouched; cand_alloc zeroes the
 * entry when it is reused. */
static void cand_drop(cand_ent* c) {
    c->next_probe_ns = 0;
    c->attempts = 0;
    c->endpoint = -1;
    c->vmid = 0;
    c->state = CAND_FREE;
}
/* Promote a verified candidate:
 *  1. bind its vmid to an endpoint slot;
 *  2. copy its facts into that endpoint's stable ep_facts home;
 *  3. hot-plug through the sink;
 *  4. publish a roster ATTACH and persist the slot table.
 * Each failure logs, releases what it took, and drops the candidate. */
static void do_attach(vmsig_discovery* d, cand_ent* c) {
    const int slot = slot_alloc(&d->slots, c->vmid);
    if (slot < 0) {
        fprintf(stderr, "vmsig discovery: no free endpoint for vmid %u (64-VM ceiling)\n", c->vmid);
        cand_drop(c);
        return;
    }
    vmsig_host_facts* home = &d->ep_facts[slot];
    *home = c->facts; /* adapters keep pointers into this copy; it outlives the candidate */
    const int rc = d->sink.attach(d->sink.ud, d->core, c->vmid, (uint32_t)slot, home);
    if (rc != 0) {
        slot_free(&d->slots, c->vmid);
        fprintf(stderr, "vmsig discovery: attach failed for vmid %u\n", c->vmid);
        cand_drop(c);
        return;
    }
    c->state = CAND_ATTACHED;
    c->endpoint = slot;
    publish_roster(d, (uint32_t)slot, c->vmid, (uint32_t)c->facts.vm_state,
                   VMSIG_ROSTER_ATTACH, c->facts.name);
    if (d->persist) slot_save(&d->slots, d->slots_path);
}
/* Tear down an attached candidate:
 *  1. publish a roster DETACH;
 *  2. ask the sink to detach (teardown is deferred);
 *  3. release the slot and persist the table;
 *  4. drop the candidate.
 * A candidate with no endpoint is simply dropped. */
static void do_detach(vmsig_discovery* d, cand_ent* c) {
    const int slot = c->endpoint;
    if (slot < 0) {
        cand_drop(c); /* never reached an endpoint: nothing to tear down */
        return;
    }
    const uint32_t ep = (uint32_t)slot;
    publish_roster(d, ep, c->vmid, VMSIG_VM_SHUTDOWN, VMSIG_ROSTER_DETACH, c->facts.name);
    d->sink.detach(d->sink.ud, d->core, c->vmid, ep); /* deferred teardown */
    slot_free(&d->slots, c->vmid);                    /* bit vacated (ordered) */
    if (d->persist) slot_save(&d->slots, d->slots_path);
    /* ep_facts[slot] is deliberately left intact: the deferred adapter reap may still read
     * the cfg strings; the next attach to this endpoint overwrites it. */
    cand_drop(c);
}
/* Run one probe round for a candidate: a static config check, then a live check.
 * Outcomes:
 *  - config not ok (not ours) -> drop;
 *  - live check asks for a retry -> schedule a backed-off re-probe, or drop after
 *    DISC_RETRY_MAX retries;
 *  - live check not ok (stale) -> drop;
 *  - otherwise -> attach. */
static void try_probe(vmsig_discovery* d, cand_ent* c) {
    vmsig_host_probe* const hp = &d->probe;
    hp->config(hp, c->vmid, &c->facts);
    if (!c->facts.ok) { cand_drop(c); return; } /* not ours / no share=on */
    hp->live(hp, &c->facts);
    if (!c->facts.retry) {
        if (c->facts.ok) do_attach(d, c);
        else cand_drop(c); /* stale: file present, VM dead/unparsable */
        return;
    }
    c->attempts++;
    if (c->attempts > DISC_RETRY_MAX) {
        fprintf(stderr, "vmsig discovery: vmid %u QMP never came up, giving up\n", c->vmid);
        cand_drop(c);
        return;
    }
    c->next_probe_ns = now_ns() + backoff_ns(c->attempts);
    rearm_timer(d);
}
/* A "vm-<vmid>-ram" file appeared (or was rewritten).
 *  - Already attached: ignore.
 *  - Fresh candidate: start probing with a zero attempt count.
 *  - Already probing: keep the attempt count but re-probe right away. */
static void on_file_appear(vmsig_discovery* d, uint32_t vmid) {
    cand_ent* c = cand_alloc(d, vmid);
    if (c == NULL) {
        fprintf(stderr, "vmsig discovery: candidate table full, vmid %u ignored\n", vmid);
        return;
    }
    switch (c->state) {
    case CAND_ATTACHED:
        return;                  /* already live (duplicate event) */
    case CAND_FREE:
        c->state = CAND_PROBING; /* freshly claimed entry */
        c->attempts = 0;
        break;
    default:
        break;                   /* still probing: re-probe now */
    }
    c->next_probe_ns = 0;
    try_probe(d, c);
}
/* A "vm-<vmid>-ram" file disappeared. An attached candidate is torn down; one still probing
 * is just dropped. Unknown vmids are ignored. */
static void on_file_gone(vmsig_discovery* d, uint32_t vmid) {
    cand_ent* c = cand_find(d, vmid);
    if (c == NULL) return;
    if (c->state != CAND_ATTACHED) {
        cand_drop(c); /* was still probing */
        return;
    }
    do_detach(d, c);
}
/* ---- loop sources ------------------------------------------------------------ */
/* Defined further down; the overflow resync below reuses the startup scan. */
static void bootstrap_scan(vmsig_discovery* d);
/* Recover after IN_Q_OVERFLOW. The kernel dropped events, so creates and deletes may have
 * been missed.
 *  - Candidates whose "vm-<vmid>-ram" file is no longer listed go through the normal gone
 *    path.
 *  - bootstrap_scan then replays every file still present; this is a no-op for vmids that
 *    are already attached.
 * If the directory cannot be listed, the result would be inconclusive, so nothing is torn
 * down. */
static void resync_after_overflow(vmsig_discovery* d) {
    fprintf(stderr, "vmsig discovery: inotify queue overflow on %s, rescanning\n", d->watch_dir);
    DIR* dir = opendir(d->watch_dir);
    if (!dir) return;
    unsigned char seen[VMSIG_SLOT_COUNT];
    memset(seen, 0, sizeof seen);
    struct dirent* de;
    while ((de = readdir(dir)) != NULL) {
        uint32_t vmid = parse_vmid(de->d_name);
        if (!vmid) continue;
        cand_ent* c = cand_find(d, vmid);
        if (c) seen[c - d->cand] = 1;
    }
    closedir(dir);
    for (int i = 0; i < VMSIG_SLOT_COUNT; i++)
        if (d->cand[i].state != CAND_FREE && !seen[i]) on_file_gone(d, d->cand[i].vmid);
    bootstrap_scan(d);
    rearm_timer(d); /* forget deadlines of candidates torn down above */
}
/* inotify readiness callback. Drains every queued event:
 *  - create / move-in / close-write of a "vm-<vmid>-ram" name -> appear path;
 *  - delete / move-out of such a name -> gone path.
 * If an IN_Q_OVERFLOW marker was seen (events were lost), the directory is resynced once
 * the queue is drained. */
static void on_inotify(void* user, uint32_t events) {
    (void)events;
    vmsig_discovery* d = user;
    char buf[4096] __attribute__((aligned(__alignof__(struct inotify_event))));
    int overflowed = 0;
    for (;;) {
        ssize_t n = read(d->ifd, buf, sizeof buf);
        if (n <= 0) {
            if (n < 0 && errno == EINTR) continue;
            break; /* EAGAIN (drained) or a hard error: nothing more to read now */
        }
        for (char* p = buf; p < buf + n; ) {
            struct inotify_event* ev = (struct inotify_event*)p;
            if (ev->mask & IN_Q_OVERFLOW) {
                overflowed = 1; /* wd == -1, no name: handled after draining */
            } else if (ev->len) {
                uint32_t vmid = parse_vmid(ev->name);
                if (vmid) {
                    if (ev->mask & (IN_CREATE | IN_MOVED_TO | IN_CLOSE_WRITE)) on_file_appear(d, vmid);
                    else if (ev->mask & (IN_DELETE | IN_MOVED_FROM)) on_file_gone(d, vmid);
                }
            }
            p += sizeof(struct inotify_event) + ev->len;
        }
    }
    if (overflowed) resync_after_overflow(d);
}
/* Retry-timer callback. Consumes the timerfd expiration count, re-probes every PROBING
 * candidate whose deadline has passed, then re-arms for whatever is still pending. */
static void on_timer(void* user, uint32_t events) {
    vmsig_discovery* d = user;
    (void)events;
    uint64_t expirations;
    ssize_t got;
    do {
        got = read(d->tfd, &expirations, sizeof expirations);
    } while (got == (ssize_t)sizeof expirations); /* drain */
    const uint64_t now = now_ns();
    for (int i = 0; i < VMSIG_SLOT_COUNT; i++) {
        cand_ent* c = d->cand + i;
        const int due = c->state == CAND_PROBING && c->next_probe_ns != 0 && c->next_probe_ns <= now;
        if (due) try_probe(d, c);
    }
    rearm_timer(d);
}
/* Startup reconciliation:
 *  1. route every "vm-<vmid>-ram" already in the watch dir through the appear path;
 *  2. release slot bindings (e.g. loaded by slot_load) that no attached candidate owns;
 *  3. persist the table.
 *
 * The GC also runs when the watch dir does not exist (ENOENT). In that case no RAM file can
 * be live, so every persisted binding is stale and would otherwise stay pinned forever. Any
 * other opendir failure (EACCES, EMFILE, ...) is inconclusive, so bindings are kept. */
static void bootstrap_scan(vmsig_discovery* d) {
    DIR* dir = opendir(d->watch_dir);
    if (!dir) {
        int err = errno;
        if (err != ENOENT) {
            fprintf(stderr, "vmsig discovery: cannot scan %s: %s\n", d->watch_dir, strerror(err));
            return;
        }
    } else {
        struct dirent* de;
        while ((de = readdir(dir)) != NULL) {
            uint32_t vmid = parse_vmid(de->d_name);
            if (vmid) on_file_appear(d, vmid);
        }
        closedir(dir);
    }
    /* GC persisted-but-not-live slots: a vmid bound in .slots with no live file (it died while
     * the daemon was down) keeps its bit pinned; free it so the ceiling is not leaked. */
    for (int e = 0; e < VMSIG_SLOT_COUNT; e++) {
        uint32_t vmid = d->slots.ent[e].vmid;
        if (!vmid) continue;
        cand_ent* c = cand_find(d, vmid);
        if (!c || c->state != CAND_ATTACHED) slot_free(&d->slots, vmid);
    }
    if (d->persist) slot_save(&d->slots, d->slots_path);
}
/* ---- default sink: wire the core adapter trio ------------------------------- */
/* Default sink attach: add the memctx, vmhost and input adapters for endpoint, in that order.
 *  - memctx gets f->ram_path / f->low and no pre-supplied fd;
 *  - vmhost gets f->qmp_path;
 *  - input gets no QMP path.
 * The first failing add stops the sequence, and the whole endpoint is rolled back via
 * vmsig_core_detach_endpoint. Returns 0 on success, -1 on failure. */
static int default_attach(void* ud, vmsig_core* core, uint32_t vmid, uint32_t endpoint,
                          const vmsig_host_facts* f) {
    (void)vmid;
    (void)ud;
    vmsig_memctx_cfg mem;
    memset(&mem, 0, sizeof mem);
    mem.stub = 0;
    mem.ram_path = f->ram_path;
    mem.low = f->low;
    mem.ro_fd = -1;
    vmsig_vmhost_cfg host;
    memset(&host, 0, sizeof host);
    host.stub = 0;
    host.qmp_path = f->qmp_path;
    vmsig_input_cfg inp;
    memset(&inp, 0, sizeof inp);
    inp.stub = 0;
    inp.qmp_path = NULL; /* input is uinput; power/lifecycle via the vmhost seam */
    const int ok = vmsig_core_add_adapter(core, vmsig_memctx_ops(), &mem, endpoint) >= 0
                && vmsig_core_add_adapter(core, vmsig_vmhost_ops(), &host, endpoint) >= 0
                && vmsig_core_add_adapter(core, vmsig_input_ops(), &inp, endpoint) >= 0;
    if (ok) return 0;
    vmsig_core_detach_endpoint(core, endpoint); /* roll back any partial trio (deferred) */
    return -1;
}
/* Default sink detach: request teardown of everything on the endpoint. The adapter reap
 * itself is deferred by the core. */
static void default_detach(void* ud, vmsig_core* core, uint32_t vmid, uint32_t endpoint) {
    (void)vmid;
    (void)ud;
    vmsig_core_detach_endpoint(core, endpoint);
}
/* ---- lifecycle --------------------------------------------------------------- */
/* Release a discovery instance: close the inotify fd, then the timerfd (each only if
 * open), then free the struct. NULL is accepted. Also used as the inotify source's on_free. */
void vmsig_discovery_free(void* user) {
    vmsig_discovery* const d = user;
    if (d == NULL) return;
    const int fds[2] = { d->ifd, d->tfd };
    for (size_t i = 0; i < sizeof fds / sizeof fds[0]; i++)
        if (fds[i] >= 0) close(fds[i]);
    free(d);
}
/* Create a discovery instance and enrol it on core's loop.
 *
 * Arguments:
 *  - watch_dir is required; a path that would not fit DISC_PATH_MAX is rejected rather than
 *    silently truncated.
 *  - pve_conf / qmp_dir are used only by the built-in Proxmox probe (probe == NULL) and
 *    default to /etc/pve/qemu-server and /var/run/qemu-server.
 *  - A non-empty slots_path enables persistence of vmid -> endpoint bindings (also rejected
 *    if too long).
 *  - probe / sink are copied; NULL selects the built-in defaults.
 *
 * Returns NULL on failure. On success the inotify source owns the instance, which is freed
 * through vmsig_discovery_free when the core releases that source. */
vmsig_discovery* vmsig_discovery_new(vmsig_core* core,
                                     const char* watch_dir, const char* pve_conf,
                                     const char* qmp_dir, const char* slots_path,
                                     const vmsig_host_probe* probe,
                                     const vmsig_discovery_sink* sink) {
    if (!core || !watch_dir) return NULL;
    vmsig_discovery* d = calloc(1, sizeof *d);
    if (!d) return NULL;
    d->core = core;
    d->ifd = d->tfd = d->wd = -1;
    int n = snprintf(d->watch_dir, sizeof d->watch_dir, "%s", watch_dir);
    if (n < 0 || (size_t)n >= sizeof d->watch_dir) {
        fprintf(stderr, "vmsig discovery: watch dir path too long: %s\n", watch_dir);
        vmsig_discovery_free(d);
        return NULL;
    }
    if (slots_path && *slots_path) {
        n = snprintf(d->slots_path, sizeof d->slots_path, "%s", slots_path);
        if (n < 0 || (size_t)n >= sizeof d->slots_path) {
            fprintf(stderr, "vmsig discovery: slots path too long: %s\n", slots_path);
            vmsig_discovery_free(d);
            return NULL;
        }
        d->persist = 1;
    }
    for (int i = 0; i < VMSIG_SLOT_COUNT; i++) d->cand[i].endpoint = -1;
    if (probe) d->probe = *probe;
    else d->probe = host_probe_proxmox(d->watch_dir,
                                       pve_conf ? pve_conf : "/etc/pve/qemu-server",
                                       qmp_dir ? qmp_dir : "/var/run/qemu-server");
    if (sink) d->sink = *sink;
    else { d->sink.attach = default_attach; d->sink.detach = default_detach; d->sink.ud = NULL; }
    slot_load(&d->slots, d->persist ? d->slots_path : NULL);
    d->ifd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
    if (d->ifd < 0) { vmsig_discovery_free(d); return NULL; }
    d->wd = inotify_add_watch(d->ifd, d->watch_dir,
                              IN_CREATE | IN_MOVED_TO | IN_DELETE | IN_MOVED_FROM | IN_CLOSE_WRITE | IN_ONLYDIR);
    /* A missing/unwatchable dir is not fatal (bootstrap then finds nothing), but the watch is
     * never re-added later, so hot-plug stays off until restart: say so loudly. */
    if (d->wd < 0)
        fprintf(stderr, "vmsig discovery: cannot watch %s: %s (hot-plug disabled until restart)\n",
                d->watch_dir, strerror(errno));
    d->tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (d->tfd < 0) { vmsig_discovery_free(d); return NULL; }
    /* The inotify source owns the discovery lifetime (on_free frees ifd+tfd+d); the timer
     * source shares the handle with on_free=NULL. */
    if (core_add_source(core, d->ifd, on_inotify, d, vmsig_discovery_free) != 0) {
        vmsig_discovery_free(d); return NULL;
    }
    if (core_add_source(core, d->tfd, on_timer, d, NULL) != 0) {
        /* ifd is already enrolled with the on_free, so d must not be freed here; core_free
         * reaps it. Remove the watch so the enrolled source gets no further directory events:
         * otherwise the instance the caller was told failed would keep attaching endpoints
         * with no retry timer behind it. */
        if (d->wd >= 0) {
            inotify_rm_watch(d->ifd, d->wd);
            d->wd = -1;
        }
        return NULL;
    }
    bootstrap_scan(d);
    rearm_timer(d);
    return d;
}
/* Endpoint slot currently bound to vmid, as reported by the slot table; -1 for a NULL
 * instance. */
int vmsig_discovery_slot_of_vmid(vmsig_discovery* d, uint32_t vmid) {
    return d != NULL ? slot_lookup(&d->slots, vmid) : -1;
}
/* ---- TEST-ONLY hooks: drive the state machine deterministically (no inotify/timer) ---- */
/* Test hook: inject a synthetic file event for vmid (present != 0 -> appeared, else gone). */
void vmsig_discovery_feed(vmsig_discovery* d, uint32_t vmid, int present) {
    if (!present) {
        on_file_gone(d, vmid);
        return;
    }
    on_file_appear(d, vmid);
}
/* Test hook: re-probe every PROBING candidate immediately, ignoring retry deadlines. */
void vmsig_discovery_tick(vmsig_discovery* d) {
    cand_ent* const end = d->cand + VMSIG_SLOT_COUNT;
    for (cand_ent* c = d->cand; c < end; ++c) {
        if (c->state != CAND_PROBING) continue;
        try_probe(d, c);
    }
}