mirror of
https://dev.lirent.ru/Vatrog/vm-automation-signaling.git
synced 2026-06-25 20:36:36 +03:00
9bde398b6c
- core: runtime attach/detach of a per-endpoint adapter trio (runtime-safe add_adapter + vmsig_core_detach_endpoint, deferred reap) - roster: VMSIG_EV_ROSTER + CAP_ROSTER, retained per-endpoint and replayed to late subscribers - discovery: inotify trigger dir, vmid/endpoint slot allocator, host probe; vmsigd daemon with config + per-uid admission - input driver and vgpu perception built in-tree; vgpu perception as a separate library - memctx: own the supplied ro_fd (closed at detach) - deb packaging: install rules, systemd unit, tmpfiles, default config
364 lines
15 KiB
C
364 lines
15 KiB
C
/* discovery.c — runtime VM discovery state machine (see discovery.h).
 *
 * Single-threaded on the loop thread (inotify + timer sources via core_add_source). On a
 * "vm-<vmid>-ram" file appearing it corroborates the candidate (host-probe seam), assigns a
 * stable endpoint slot, hot-plugs the trio (sink), and publishes the roster; on the file
 * disappearing it tears the endpoint down and publishes a roster DETACH. QMP-not-up-yet is a
 * transient retry driven by a timerfd (no busy-wait); config errors / stale files drop. */
|
|
#define _GNU_SOURCE
|
|
#include "discovery.h"
|
|
#include "slot.h"
|
|
#include "core_internal.h" /* core_roster_publish */
|
|
#include "memctx.h" /* vmsig_memctx_cfg */
|
|
#include "vmhost.h" /* vmsig_vmhost_cfg */
|
|
#include "input.h" /* vmsig_input_cfg */
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <stdio.h>
|
|
#include <stdint.h>
|
|
#include <unistd.h>
|
|
#include <fcntl.h>
|
|
#include <errno.h>
|
|
#include <time.h>
|
|
#include <dirent.h>
|
|
#include <sys/inotify.h>
|
|
#include <sys/timerfd.h>
|
|
|
|
/* Size in bytes (terminator included) of the fixed watch_dir / slots_path buffers. */
#define DISC_PATH_MAX 256
|
|
/* Retry budget per candidate: once the attempt counter exceeds this, try_probe gives up and
 * drops the candidate. */
#define DISC_RETRY_MAX 40 /* give up after ~tens of seconds of QMP-not-up */
|
|
/* Back-off unit in nanoseconds; backoff_ns() left-shifts it by the attempt count. */
#define DISC_BACKOFF_BASE 50000000ull /* 50 ms */
|
|
/* Upper bound in nanoseconds on any single retry delay returned by backoff_ns(). */
#define DISC_BACKOFF_CAP 2000000000ull /* 2 s */
|
|
|
|
/* Lifecycle of a candidate table entry. */
typedef enum {
    CAND_FREE = 0,  /* entry unused and available for allocation */
    CAND_PROBING,   /* trigger file seen; probe running or waiting for a retry */
    CAND_ATTACHED   /* endpoint assigned and sink attach succeeded */
} cand_state;
|
|
|
|
typedef struct {
|
|
cand_state state;
|
|
uint32_t vmid;
|
|
int endpoint; /* -1 until attached */
|
|
int attempts;
|
|
uint64_t next_probe_ns; /* monotonic deadline for the next retry */
|
|
vmsig_host_facts facts; /* probe working copy */
|
|
} cand_ent;
|
|
|
|
struct vmsig_discovery {
|
|
vmsig_core* core;
|
|
char watch_dir[DISC_PATH_MAX];
|
|
char slots_path[DISC_PATH_MAX];
|
|
int persist;
|
|
vmsig_host_probe probe;
|
|
vmsig_discovery_sink sink;
|
|
int ifd; /* inotify */
|
|
int wd;
|
|
int tfd; /* retry timerfd */
|
|
slot_table slots;
|
|
cand_ent cand[VMSIG_SLOT_COUNT];
|
|
/* Stable per-endpoint home for the adapter cfg strings (ram_path/qmp_path): the adapters
|
|
* keep pointers, and detach is deferred, so this must outlive the candidate. Overwritten
|
|
* only on the NEXT attach to the endpoint, which never races a still-open prior adapter. */
|
|
vmsig_host_facts ep_facts[VMSIG_SLOT_COUNT];
|
|
};
|
|
|
|
static uint64_t now_ns(void) {
|
|
struct timespec ts;
|
|
clock_gettime(CLOCK_MONOTONIC, &ts);
|
|
return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec;
|
|
}
|
|
|
|
static uint64_t backoff_ns(int attempts) {
|
|
uint64_t b = DISC_BACKOFF_BASE << (attempts < 6 ? attempts : 6);
|
|
return b > DISC_BACKOFF_CAP ? DISC_BACKOFF_CAP : b;
|
|
}
|
|
|
|
/* Parse exactly "vm-<digits>-ram" -> vmid; 0 if it does not match.
 *
 * The number must be canonical: no leading zero, and it must fit in 32 bits. Rejecting
 * leading zeros keeps the name -> vmid mapping one-to-one, so a stray "vm-0100-ram" cannot
 * alias to vmid 100 and trigger attach/detach for the real "vm-100-ram". A NULL name is
 * treated as "no match". */
static uint32_t parse_vmid(const char* name) {
    if (!name) return 0;
    if (strncmp(name, "vm-", 3) != 0) return 0;
    const char* p = name + 3;
    /* First digit 1-9: this rejects "no digits", vmid 0 and any leading-zero spelling. */
    if (*p < '1' || *p > '9') return 0;
    uint64_t v = 0;
    while (*p >= '0' && *p <= '9') {
        v = v * 10 + (uint64_t)(*p - '0');
        p++;
        /* v was <= UINT32_MAX before this step, so the 64-bit accumulator cannot wrap. */
        if (v > 0xFFFFFFFFull) return 0;
    }
    if (strcmp(p, "-ram") != 0) return 0;
    return (uint32_t)v;
}
|
|
|
|
/* Return the in-use (non-FREE) candidate entry tracking vmid, or NULL when there is none. */
static cand_ent* cand_find(vmsig_discovery* d, uint32_t vmid) {
    cand_ent* const end = d->cand + VMSIG_SLOT_COUNT;
    for (cand_ent* it = d->cand; it != end; ++it) {
        if (it->state == CAND_FREE) continue;
        if (it->vmid == vmid) return it;
    }
    return NULL;
}
|
|
|
|
/* Return the entry already tracking vmid, or claim the first FREE entry for it. A freshly
 * claimed entry is zeroed (so its state is still CAND_FREE) with vmid set and endpoint -1;
 * the caller moves it to CAND_PROBING. Returns NULL when every entry is in use. */
static cand_ent* cand_alloc(vmsig_discovery* d, uint32_t vmid) {
    cand_ent* existing = cand_find(d, vmid);
    if (existing != NULL) return existing;

    for (int idx = 0; idx < VMSIG_SLOT_COUNT; idx++) {
        cand_ent* slot = &d->cand[idx];
        if (slot->state != CAND_FREE) continue;
        memset(slot, 0, sizeof *slot);
        slot->vmid = vmid;
        slot->endpoint = -1;
        return slot;
    }
    return NULL; /* 64-candidate ceiling */
}
|
|
|
|
/* Arm the retry timer to the soonest pending probe, or disarm if none pending. */
|
|
static void rearm_timer(vmsig_discovery* d) {
|
|
uint64_t soonest = 0; int any = 0;
|
|
for (int i = 0; i < VMSIG_SLOT_COUNT; i++)
|
|
if (d->cand[i].state == CAND_PROBING && d->cand[i].next_probe_ns) {
|
|
if (!any || d->cand[i].next_probe_ns < soonest) soonest = d->cand[i].next_probe_ns;
|
|
any = 1;
|
|
}
|
|
struct itimerspec its;
|
|
memset(&its, 0, sizeof its);
|
|
if (any) {
|
|
uint64_t now = now_ns();
|
|
uint64_t dt = soonest > now ? soonest - now : 1000000ull; /* >=1ms */
|
|
its.it_value.tv_sec = (time_t)(dt / 1000000000ull);
|
|
its.it_value.tv_nsec = (long)(dt % 1000000000ull);
|
|
}
|
|
timerfd_settime(d->tfd, 0, &its, NULL); /* it_value 0 => disarm */
|
|
}
|
|
|
|
/* Build a zeroed vmsig_roster record for (vmid, state, action) and publish it on endpoint ep.
 * A non-NULL name is copied in; a name of VMSIG_ROSTER_NAME_MAX bytes or more is cut to
 * VMSIG_ROSTER_NAME_MAX - 1 bytes and flagged with VMSIG_ROSTER_NAME_TRUNC. */
static void publish_roster(vmsig_discovery* d, uint32_t ep, uint32_t vmid, uint32_t state,
                           uint32_t action, const char* name) {
    vmsig_roster rec;
    memset(&rec, 0, sizeof rec);
    rec.vmid = vmid;
    rec.state = state;
    rec.action = action;

    if (name != NULL) {
        size_t len = strlen(name);
        if (len >= VMSIG_ROSTER_NAME_MAX) {
            rec.flags |= VMSIG_ROSTER_NAME_TRUNC;
            len = VMSIG_ROSTER_NAME_MAX - 1;
        }
        /* Bytes after the copied prefix keep the zero written by the memset above. */
        memcpy(rec.name, name, len);
    }
    core_roster_publish(d->core, ep, &rec);
}
|
|
|
|
/* Return a candidate entry to the FREE pool and reset its bookkeeping fields. The facts
 * working copy is left as-is. */
static void cand_drop(cand_ent* c) {
    c->next_probe_ns = 0;
    c->attempts = 0;
    c->endpoint = -1;
    c->vmid = 0;
    c->state = CAND_FREE;
}
|
|
|
|
/* Attach a successfully probed candidate: reserve an endpoint slot for its vmid, copy the
 * facts into the per-endpoint storage, call the sink's attach callback, then publish a
 * roster ATTACH and persist the slot table. If no slot is available or the sink refuses,
 * the candidate is dropped (and the slot released in the latter case). */
static void do_attach(vmsig_discovery* d, cand_ent* c) {
    const int ep = slot_alloc(&d->slots, c->vmid);
    if (ep < 0) {
        fprintf(stderr, "vmsig discovery: no free endpoint for vmid %u (64-VM ceiling)\n", c->vmid);
        cand_drop(c);
        return;
    }

    /* stable home for cfg strings the adapters keep */
    vmsig_host_facts* home = &d->ep_facts[ep];
    *home = c->facts;

    int rc = d->sink.attach(d->sink.ud, d->core, c->vmid, (uint32_t)ep, home);
    if (rc != 0) {
        slot_free(&d->slots, c->vmid);
        fprintf(stderr, "vmsig discovery: attach failed for vmid %u\n", c->vmid);
        cand_drop(c);
        return;
    }

    c->state = CAND_ATTACHED;
    c->endpoint = ep;
    publish_roster(d, (uint32_t)ep, c->vmid, (uint32_t)c->facts.vm_state, VMSIG_ROSTER_ATTACH,
                   c->facts.name);
    if (d->persist) {
        slot_save(&d->slots, d->slots_path);
    }
}
|
|
|
|
/* Tear down an attached candidate: publish a roster DETACH (state VMSIG_VM_SHUTDOWN), call
 * the sink's detach callback, release the endpoint slot and persist the table. The candidate
 * entry is freed in every case, including when it never had an endpoint. */
static void do_detach(vmsig_discovery* d, cand_ent* c) {
    const int ep = c->endpoint;
    if (ep < 0) {
        cand_drop(c);
        return;
    }

    publish_roster(d, (uint32_t)ep, c->vmid, VMSIG_VM_SHUTDOWN, VMSIG_ROSTER_DETACH,
                   c->facts.name);
    d->sink.detach(d->sink.ud, d->core, c->vmid, (uint32_t)ep); /* deferred teardown */
    slot_free(&d->slots, c->vmid);                              /* bit vacated (ordered) */
    if (d->persist) {
        slot_save(&d->slots, d->slots_path);
    }
    /* ep_facts[ep] is intentionally NOT cleared: the deferred adapter reap still reads the
     * cfg strings; it is overwritten on the next attach to this endpoint. */
    cand_drop(c);
}
|
|
|
|
/* Run one probe round for a candidate.
 *
 * The config probe runs first; a not-ok verdict drops the candidate. The live probe then
 * either asks for a retry (attempt counter bumped, next deadline scheduled with back-off,
 * candidate dropped once DISC_RETRY_MAX is exceeded), reports not-ok (drop), or succeeds
 * (attach). */
static void try_probe(vmsig_discovery* d, cand_ent* c) {
    vmsig_host_facts* facts = &c->facts;

    d->probe.config(&d->probe, c->vmid, facts);
    if (!facts->ok) { /* not ours / no share=on */
        cand_drop(c);
        return;
    }

    d->probe.live(&d->probe, facts);
    if (!facts->retry) {
        if (facts->ok) {
            do_attach(d, c);
        } else {
            cand_drop(c); /* stale: file present, VM dead/unparsable */
        }
        return;
    }

    c->attempts++;
    if (c->attempts > DISC_RETRY_MAX) {
        fprintf(stderr, "vmsig discovery: vmid %u QMP never came up, giving up\n", c->vmid);
        cand_drop(c);
        return;
    }
    c->next_probe_ns = now_ns() + backoff_ns(c->attempts);
    rearm_timer(d);
}
|
|
|
|
/* A "vm-<vmid>-ram" file is present: find or create the candidate and probe it right away.
 * An already attached vmid is ignored; a candidate that is already probing keeps its attempt
 * counter and is simply probed again. */
static void on_file_appear(vmsig_discovery* d, uint32_t vmid) {
    cand_ent* c = cand_alloc(d, vmid);
    if (c == NULL) {
        fprintf(stderr, "vmsig discovery: candidate table full, vmid %u ignored\n", vmid);
        return;
    }

    switch (c->state) {
    case CAND_ATTACHED:
        return; /* already live (duplicate event) */
    case CAND_FREE:
        c->state = CAND_PROBING;
        c->attempts = 0;
        break;
    default:
        break;
    }

    c->next_probe_ns = 0;
    try_probe(d, c);
}
|
|
|
|
/* The trigger file for vmid disappeared: detach it if attached, otherwise just forget the
 * candidate. Unknown vmids are ignored. */
static void on_file_gone(vmsig_discovery* d, uint32_t vmid) {
    cand_ent* c = cand_find(d, vmid);
    if (c == NULL) return;

    if (c->state != CAND_ATTACHED) {
        cand_drop(c); /* was still probing */
        return;
    }
    do_detach(d, c);
}
|
|
|
|
/* ---- loop sources ------------------------------------------------------------ */
|
|
|
|
static void on_inotify(void* user, uint32_t events) {
|
|
(void)events;
|
|
vmsig_discovery* d = user;
|
|
char buf[4096] __attribute__((aligned(__alignof__(struct inotify_event))));
|
|
for (;;) {
|
|
ssize_t n = read(d->ifd, buf, sizeof buf);
|
|
if (n <= 0) { if (n < 0 && errno == EINTR) continue; break; }
|
|
for (char* p = buf; p < buf + n; ) {
|
|
struct inotify_event* ev = (struct inotify_event*)p;
|
|
if (ev->len) {
|
|
uint32_t vmid = parse_vmid(ev->name);
|
|
if (vmid) {
|
|
if (ev->mask & (IN_CREATE | IN_MOVED_TO | IN_CLOSE_WRITE)) on_file_appear(d, vmid);
|
|
else if (ev->mask & (IN_DELETE | IN_MOVED_FROM)) on_file_gone(d, vmid);
|
|
}
|
|
}
|
|
p += sizeof(struct inotify_event) + ev->len;
|
|
}
|
|
}
|
|
}
|
|
|
|
static void on_timer(void* user, uint32_t events) {
|
|
(void)events;
|
|
vmsig_discovery* d = user;
|
|
uint64_t v;
|
|
while (read(d->tfd, &v, sizeof v) == (ssize_t)sizeof v) { /* drain */ }
|
|
uint64_t now = now_ns();
|
|
for (int i = 0; i < VMSIG_SLOT_COUNT; i++) {
|
|
cand_ent* c = &d->cand[i];
|
|
if (c->state == CAND_PROBING && c->next_probe_ns && c->next_probe_ns <= now)
|
|
try_probe(d, c);
|
|
}
|
|
rearm_timer(d);
|
|
}
|
|
|
|
/* Startup pass over the watch directory: feed every existing "vm-<vmid>-ram" entry through
 * on_file_appear, then garbage-collect the slot table and persist it. Does nothing at all
 * (no GC either) when the directory cannot be opened. */
static void bootstrap_scan(vmsig_discovery* d) {
    DIR* dir = opendir(d->watch_dir);
    if (dir == NULL) return;

    for (struct dirent* de = readdir(dir); de != NULL; de = readdir(dir)) {
        uint32_t vmid = parse_vmid(de->d_name);
        if (vmid != 0) on_file_appear(d, vmid);
    }
    closedir(dir);

    /* GC persisted-but-not-live slots: a vmid bound in .slots with no live file (it died while
     * the daemon was down) keeps its bit pinned; free it so the ceiling is not leaked.
     * NOTE(review): the test below also frees the slot of a candidate that is still PROBING
     * (file present, awaiting retry) — confirm that this is intended. */
    for (int e = 0; e < VMSIG_SLOT_COUNT; e++) {
        uint32_t bound = d->slots.ent[e].vmid;
        if (bound == 0) continue;
        cand_ent* c = cand_find(d, bound);
        int attached = (c != NULL && c->state == CAND_ATTACHED);
        if (!attached) slot_free(&d->slots, bound);
    }
    if (d->persist) {
        slot_save(&d->slots, d->slots_path);
    }
}
|
|
|
|
/* ---- default sink: wire the core adapter trio ------------------------------- */
|
|
|
|
static int default_attach(void* ud, vmsig_core* core, uint32_t vmid, uint32_t endpoint,
|
|
const vmsig_host_facts* f) {
|
|
(void)ud; (void)vmid;
|
|
vmsig_memctx_cfg mc; memset(&mc, 0, sizeof mc);
|
|
mc.stub = 0; mc.ram_path = f->ram_path; mc.low = f->low; mc.ro_fd = -1;
|
|
vmsig_vmhost_cfg vh; memset(&vh, 0, sizeof vh);
|
|
vh.stub = 0; vh.qmp_path = f->qmp_path;
|
|
vmsig_input_cfg in; memset(&in, 0, sizeof in);
|
|
in.stub = 0; in.qmp_path = NULL; /* input is uinput; power/lifecycle via the vmhost seam */
|
|
|
|
if (vmsig_core_add_adapter(core, vmsig_memctx_ops(), &mc, endpoint) < 0) goto fail;
|
|
if (vmsig_core_add_adapter(core, vmsig_vmhost_ops(), &vh, endpoint) < 0) goto fail;
|
|
if (vmsig_core_add_adapter(core, vmsig_input_ops(), &in, endpoint) < 0) goto fail;
|
|
return 0;
|
|
fail:
|
|
vmsig_core_detach_endpoint(core, endpoint); /* roll back any partial trio (deferred) */
|
|
return -1;
|
|
}
|
|
|
|
/* Default sink detach: hand the endpoint to vmsig_core_detach_endpoint; ud and vmid are
 * not used. */
static void default_detach(void* ud, vmsig_core* core, uint32_t vmid, uint32_t endpoint) {
    (void)vmid;
    (void)ud;
    vmsig_core_detach_endpoint(core, endpoint);
}
|
|
|
|
/* ---- lifecycle --------------------------------------------------------------- */
|
|
|
|
void vmsig_discovery_free(void* user) {
|
|
vmsig_discovery* d = user;
|
|
if (!d) return;
|
|
if (d->ifd >= 0) close(d->ifd);
|
|
if (d->tfd >= 0) close(d->tfd);
|
|
free(d);
|
|
}
|
|
|
|
vmsig_discovery* vmsig_discovery_new(vmsig_core* core,
|
|
const char* watch_dir, const char* pve_conf,
|
|
const char* qmp_dir, const char* slots_path,
|
|
const vmsig_host_probe* probe,
|
|
const vmsig_discovery_sink* sink) {
|
|
if (!core || !watch_dir) return NULL;
|
|
vmsig_discovery* d = calloc(1, sizeof *d);
|
|
if (!d) return NULL;
|
|
d->core = core;
|
|
d->ifd = d->tfd = d->wd = -1;
|
|
snprintf(d->watch_dir, sizeof d->watch_dir, "%s", watch_dir);
|
|
if (slots_path && *slots_path) {
|
|
snprintf(d->slots_path, sizeof d->slots_path, "%s", slots_path);
|
|
d->persist = 1;
|
|
}
|
|
for (int i = 0; i < VMSIG_SLOT_COUNT; i++) d->cand[i].endpoint = -1;
|
|
|
|
if (probe) d->probe = *probe;
|
|
else d->probe = host_probe_proxmox(d->watch_dir,
|
|
pve_conf ? pve_conf : "/etc/pve/qemu-server",
|
|
qmp_dir ? qmp_dir : "/var/run/qemu-server");
|
|
if (sink) d->sink = *sink;
|
|
else { d->sink.attach = default_attach; d->sink.detach = default_detach; d->sink.ud = NULL; }
|
|
|
|
slot_load(&d->slots, d->persist ? d->slots_path : NULL);
|
|
|
|
d->ifd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
|
|
if (d->ifd < 0) { vmsig_discovery_free(d); return NULL; }
|
|
d->wd = inotify_add_watch(d->ifd, d->watch_dir,
|
|
IN_CREATE | IN_MOVED_TO | IN_DELETE | IN_MOVED_FROM | IN_CLOSE_WRITE | IN_ONLYDIR);
|
|
/* a missing watch dir is not fatal: the dir may be created later; bootstrap finds nothing. */
|
|
|
|
d->tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
|
|
if (d->tfd < 0) { vmsig_discovery_free(d); return NULL; }
|
|
|
|
/* The inotify source owns the discovery lifetime (on_free frees ifd+tfd+d); the timer
|
|
* source shares the handle with on_free=NULL. */
|
|
if (core_add_source(core, d->ifd, on_inotify, d, vmsig_discovery_free) != 0) {
|
|
vmsig_discovery_free(d); return NULL;
|
|
}
|
|
if (core_add_source(core, d->tfd, on_timer, d, NULL) != 0) {
|
|
/* ifd already enrolled with the on_free; closing here would double-free at core_free.
|
|
* Leave it to core_free to reap. Return NULL to signal partial failure is not clean. */
|
|
return NULL;
|
|
}
|
|
|
|
bootstrap_scan(d);
|
|
rearm_timer(d);
|
|
return d;
|
|
}
|
|
|
|
/* Look up vmid in the slot table via slot_lookup and return its result; -1 when d is NULL. */
int vmsig_discovery_slot_of_vmid(vmsig_discovery* d, uint32_t vmid) {
    return d ? slot_lookup(&d->slots, vmid) : -1;
}
|
|
|
|
/* ---- TEST-ONLY hooks: drive the state machine deterministically (no inotify/timer) ---- */
|
|
/* Test hook: inject a file-appeared (present != 0) or file-gone (present == 0) event for
 * vmid, bypassing inotify. A NULL handle is ignored, matching the other public entry points
 * in this file. */
void vmsig_discovery_feed(vmsig_discovery* d, uint32_t vmid, int present) {
    if (!d) return;
    if (present) on_file_appear(d, vmid);
    else on_file_gone(d, vmid);
}
|
|
/* Test hook: force a re-probe of every probing candidate, regardless of its retry deadline.
 * A NULL handle is ignored, matching the other public entry points in this file. */
void vmsig_discovery_tick(vmsig_discovery* d) {
    if (!d) return;
    for (int i = 0; i < VMSIG_SLOT_COUNT; i++) {
        if (d->cand[i].state == CAND_PROBING) try_probe(d, &d->cand[i]);
    }
}
|