/* discovery.c — runtime VM discovery state machine (see discovery.h). * * Single-threaded on the loop thread (inotify + timer sources via core_add_source). On a * "vm--ram" file appearing it corroborates the candidate (host-probe seam), assigns a * stable endpoint slot, hot-plugs the trio (sink), and publishes the roster; on the file * disappearing it tears the endpoint down and publishes a roster DETACH. QMP-not-up-yet is a * transient retry driven by a timerfd (no busy-wait); config errors / stale files drop. */ #define _GNU_SOURCE #include "discovery.h" #include "slot.h" #include "core_internal.h" /* core_roster_publish */ #include "memctx.h" /* vmsig_memctx_cfg */ #include "vmhost.h" /* vmsig_vmhost_cfg */ #include "input.h" /* vmsig_input_cfg */ #include #include #include #include #include #include #include #include #include #include #include #define DISC_PATH_MAX 256 #define DISC_RETRY_MAX 40 /* give up after ~tens of seconds of QMP-not-up */ #define DISC_BACKOFF_BASE 50000000ull /* 50 ms */ #define DISC_BACKOFF_CAP 2000000000ull /* 2 s */ typedef enum { CAND_FREE = 0, CAND_PROBING, CAND_ATTACHED } cand_state; typedef struct { cand_state state; uint32_t vmid; int endpoint; /* -1 until attached */ int attempts; uint64_t next_probe_ns; /* monotonic deadline for the next retry */ vmsig_host_facts facts; /* probe working copy */ } cand_ent; struct vmsig_discovery { vmsig_core* core; char watch_dir[DISC_PATH_MAX]; char slots_path[DISC_PATH_MAX]; int persist; vmsig_host_probe probe; vmsig_discovery_sink sink; int ifd; /* inotify */ int wd; int tfd; /* retry timerfd */ slot_table slots; cand_ent cand[VMSIG_SLOT_COUNT]; /* Stable per-endpoint home for the adapter cfg strings (ram_path/qmp_path): the adapters * keep pointers, and detach is deferred, so this must outlive the candidate. Overwritten * only on the NEXT attach to the endpoint, which never races a still-open prior adapter. */ vmsig_host_facts ep_facts[VMSIG_SLOT_COUNT]; }; static uint64_t now_ns(void) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec; } static uint64_t backoff_ns(int attempts) { uint64_t b = DISC_BACKOFF_BASE << (attempts < 6 ? attempts : 6); return b > DISC_BACKOFF_CAP ? DISC_BACKOFF_CAP : b; } /* Parse exactly "vm--ram" -> vmid; 0 if it does not match. */ static uint32_t parse_vmid(const char* name) { if (strncmp(name, "vm-", 3) != 0) return 0; const char* p = name + 3; if (*p < '0' || *p > '9') return 0; uint64_t v = 0; while (*p >= '0' && *p <= '9') { v = v * 10 + (uint64_t)(*p - '0'); p++; if (v > 0xFFFFFFFFull) return 0; } if (strcmp(p, "-ram") != 0) return 0; return (uint32_t)v; } static cand_ent* cand_find(vmsig_discovery* d, uint32_t vmid) { for (int i = 0; i < VMSIG_SLOT_COUNT; i++) if (d->cand[i].state != CAND_FREE && d->cand[i].vmid == vmid) return &d->cand[i]; return NULL; } static cand_ent* cand_alloc(vmsig_discovery* d, uint32_t vmid) { cand_ent* e = cand_find(d, vmid); if (e) return e; for (int i = 0; i < VMSIG_SLOT_COUNT; i++) if (d->cand[i].state == CAND_FREE) { memset(&d->cand[i], 0, sizeof d->cand[i]); d->cand[i].vmid = vmid; d->cand[i].endpoint = -1; return &d->cand[i]; } return NULL; /* 64-candidate ceiling */ } /* Arm the retry timer to the soonest pending probe, or disarm if none pending. */ static void rearm_timer(vmsig_discovery* d) { uint64_t soonest = 0; int any = 0; for (int i = 0; i < VMSIG_SLOT_COUNT; i++) if (d->cand[i].state == CAND_PROBING && d->cand[i].next_probe_ns) { if (!any || d->cand[i].next_probe_ns < soonest) soonest = d->cand[i].next_probe_ns; any = 1; } struct itimerspec its; memset(&its, 0, sizeof its); if (any) { uint64_t now = now_ns(); uint64_t dt = soonest > now ? soonest - now : 1000000ull; /* >=1ms */ its.it_value.tv_sec = (time_t)(dt / 1000000000ull); its.it_value.tv_nsec = (long)(dt % 1000000000ull); } timerfd_settime(d->tfd, 0, &its, NULL); /* it_value 0 => disarm */ } static void publish_roster(vmsig_discovery* d, uint32_t ep, uint32_t vmid, uint32_t state, uint32_t action, const char* name) { vmsig_roster r; memset(&r, 0, sizeof r); r.vmid = vmid; r.state = state; r.action = action; if (name) { size_t n = strlen(name); if (n >= VMSIG_ROSTER_NAME_MAX) { n = VMSIG_ROSTER_NAME_MAX - 1; r.flags |= VMSIG_ROSTER_NAME_TRUNC; } memcpy(r.name, name, n); } core_roster_publish(d->core, ep, &r); } static void cand_drop(cand_ent* c) { c->state = CAND_FREE; c->vmid = 0; c->endpoint = -1; c->attempts = 0; c->next_probe_ns = 0; } static void do_attach(vmsig_discovery* d, cand_ent* c) { int ep = slot_alloc(&d->slots, c->vmid); if (ep < 0) { fprintf(stderr, "vmsig discovery: no free endpoint for vmid %u (64-VM ceiling)\n", c->vmid); cand_drop(c); return; } d->ep_facts[ep] = c->facts; /* stable home for cfg strings the adapters keep */ if (d->sink.attach(d->sink.ud, d->core, c->vmid, (uint32_t)ep, &d->ep_facts[ep]) != 0) { slot_free(&d->slots, c->vmid); fprintf(stderr, "vmsig discovery: attach failed for vmid %u\n", c->vmid); cand_drop(c); return; } c->state = CAND_ATTACHED; c->endpoint = ep; publish_roster(d, (uint32_t)ep, c->vmid, (uint32_t)c->facts.vm_state, VMSIG_ROSTER_ATTACH, c->facts.name); if (d->persist) slot_save(&d->slots, d->slots_path); } static void do_detach(vmsig_discovery* d, cand_ent* c) { int ep = c->endpoint; if (ep >= 0) { publish_roster(d, (uint32_t)ep, c->vmid, VMSIG_VM_SHUTDOWN, VMSIG_ROSTER_DETACH, c->facts.name); d->sink.detach(d->sink.ud, d->core, c->vmid, (uint32_t)ep); /* deferred teardown */ slot_free(&d->slots, c->vmid); /* bit vacated (ordered) */ if (d->persist) slot_save(&d->slots, d->slots_path); /* ep_facts[ep] is intentionally NOT cleared: the deferred adapter reap still reads the * cfg strings; it is overwritten on the next attach to this endpoint. */ } cand_drop(c); } static void try_probe(vmsig_discovery* d, cand_ent* c) { d->probe.config(&d->probe, c->vmid, &c->facts); if (!c->facts.ok) { cand_drop(c); return; } /* not ours / no share=on */ d->probe.live(&d->probe, &c->facts); if (c->facts.retry) { if (++c->attempts > DISC_RETRY_MAX) { fprintf(stderr, "vmsig discovery: vmid %u QMP never came up, giving up\n", c->vmid); cand_drop(c); return; } c->next_probe_ns = now_ns() + backoff_ns(c->attempts); rearm_timer(d); return; } if (!c->facts.ok) { cand_drop(c); return; } /* stale: file present, VM dead/unparsable */ do_attach(d, c); } static void on_file_appear(vmsig_discovery* d, uint32_t vmid) { cand_ent* c = cand_alloc(d, vmid); if (!c) { fprintf(stderr, "vmsig discovery: candidate table full, vmid %u ignored\n", vmid); return; } if (c->state == CAND_ATTACHED) return; /* already live (duplicate event) */ if (c->state == CAND_FREE) { c->state = CAND_PROBING; c->attempts = 0; } c->next_probe_ns = 0; try_probe(d, c); } static void on_file_gone(vmsig_discovery* d, uint32_t vmid) { cand_ent* c = cand_find(d, vmid); if (!c) return; if (c->state == CAND_ATTACHED) do_detach(d, c); else cand_drop(c); /* was still probing */ } /* ---- loop sources ------------------------------------------------------------ */ static void on_inotify(void* user, uint32_t events) { (void)events; vmsig_discovery* d = user; char buf[4096] __attribute__((aligned(__alignof__(struct inotify_event)))); for (;;) { ssize_t n = read(d->ifd, buf, sizeof buf); if (n <= 0) { if (n < 0 && errno == EINTR) continue; break; } for (char* p = buf; p < buf + n; ) { struct inotify_event* ev = (struct inotify_event*)p; if (ev->len) { uint32_t vmid = parse_vmid(ev->name); if (vmid) { if (ev->mask & (IN_CREATE | IN_MOVED_TO | IN_CLOSE_WRITE)) on_file_appear(d, vmid); else if (ev->mask & (IN_DELETE | IN_MOVED_FROM)) on_file_gone(d, vmid); } } p += sizeof(struct inotify_event) + ev->len; } } } static void on_timer(void* user, uint32_t events) { (void)events; vmsig_discovery* d = user; uint64_t v; while (read(d->tfd, &v, sizeof v) == (ssize_t)sizeof v) { /* drain */ } uint64_t now = now_ns(); for (int i = 0; i < VMSIG_SLOT_COUNT; i++) { cand_ent* c = &d->cand[i]; if (c->state == CAND_PROBING && c->next_probe_ns && c->next_probe_ns <= now) try_probe(d, c); } rearm_timer(d); } static void bootstrap_scan(vmsig_discovery* d) { DIR* dir = opendir(d->watch_dir); if (!dir) return; struct dirent* de; while ((de = readdir(dir)) != NULL) { uint32_t vmid = parse_vmid(de->d_name); if (vmid) on_file_appear(d, vmid); } closedir(dir); /* GC persisted-but-not-live slots: a vmid bound in .slots with no live file (it died while * the daemon was down) keeps its bit pinned; free it so the ceiling is not leaked. */ for (int e = 0; e < VMSIG_SLOT_COUNT; e++) { uint32_t vmid = d->slots.ent[e].vmid; if (!vmid) continue; cand_ent* c = cand_find(d, vmid); if (!c || c->state != CAND_ATTACHED) slot_free(&d->slots, vmid); } if (d->persist) slot_save(&d->slots, d->slots_path); } /* ---- default sink: wire the core adapter trio ------------------------------- */ static int default_attach(void* ud, vmsig_core* core, uint32_t vmid, uint32_t endpoint, const vmsig_host_facts* f) { (void)ud; (void)vmid; vmsig_memctx_cfg mc; memset(&mc, 0, sizeof mc); mc.stub = 0; mc.ram_path = f->ram_path; mc.low = f->low; mc.ro_fd = -1; vmsig_vmhost_cfg vh; memset(&vh, 0, sizeof vh); vh.stub = 0; vh.qmp_path = f->qmp_path; vmsig_input_cfg in; memset(&in, 0, sizeof in); in.stub = 0; in.qmp_path = NULL; /* input is uinput; power/lifecycle via the vmhost seam */ if (vmsig_core_add_adapter(core, vmsig_memctx_ops(), &mc, endpoint) < 0) goto fail; if (vmsig_core_add_adapter(core, vmsig_vmhost_ops(), &vh, endpoint) < 0) goto fail; if (vmsig_core_add_adapter(core, vmsig_input_ops(), &in, endpoint) < 0) goto fail; return 0; fail: vmsig_core_detach_endpoint(core, endpoint); /* roll back any partial trio (deferred) */ return -1; } static void default_detach(void* ud, vmsig_core* core, uint32_t vmid, uint32_t endpoint) { (void)ud; (void)vmid; vmsig_core_detach_endpoint(core, endpoint); } /* ---- lifecycle --------------------------------------------------------------- */ void vmsig_discovery_free(void* user) { vmsig_discovery* d = user; if (!d) return; if (d->ifd >= 0) close(d->ifd); if (d->tfd >= 0) close(d->tfd); free(d); } vmsig_discovery* vmsig_discovery_new(vmsig_core* core, const char* watch_dir, const char* pve_conf, const char* qmp_dir, const char* slots_path, const vmsig_host_probe* probe, const vmsig_discovery_sink* sink) { if (!core || !watch_dir) return NULL; vmsig_discovery* d = calloc(1, sizeof *d); if (!d) return NULL; d->core = core; d->ifd = d->tfd = d->wd = -1; snprintf(d->watch_dir, sizeof d->watch_dir, "%s", watch_dir); if (slots_path && *slots_path) { snprintf(d->slots_path, sizeof d->slots_path, "%s", slots_path); d->persist = 1; } for (int i = 0; i < VMSIG_SLOT_COUNT; i++) d->cand[i].endpoint = -1; if (probe) d->probe = *probe; else d->probe = host_probe_proxmox(d->watch_dir, pve_conf ? pve_conf : "/etc/pve/qemu-server", qmp_dir ? qmp_dir : "/var/run/qemu-server"); if (sink) d->sink = *sink; else { d->sink.attach = default_attach; d->sink.detach = default_detach; d->sink.ud = NULL; } slot_load(&d->slots, d->persist ? d->slots_path : NULL); d->ifd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC); if (d->ifd < 0) { vmsig_discovery_free(d); return NULL; } d->wd = inotify_add_watch(d->ifd, d->watch_dir, IN_CREATE | IN_MOVED_TO | IN_DELETE | IN_MOVED_FROM | IN_CLOSE_WRITE | IN_ONLYDIR); /* a missing watch dir is not fatal: the dir may be created later; bootstrap finds nothing. */ d->tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); if (d->tfd < 0) { vmsig_discovery_free(d); return NULL; } /* The inotify source owns the discovery lifetime (on_free frees ifd+tfd+d); the timer * source shares the handle with on_free=NULL. */ if (core_add_source(core, d->ifd, on_inotify, d, vmsig_discovery_free) != 0) { vmsig_discovery_free(d); return NULL; } if (core_add_source(core, d->tfd, on_timer, d, NULL) != 0) { /* ifd already enrolled with the on_free; closing here would double-free at core_free. * Leave it to core_free to reap. Return NULL to signal partial failure is not clean. */ return NULL; } bootstrap_scan(d); rearm_timer(d); return d; } int vmsig_discovery_slot_of_vmid(vmsig_discovery* d, uint32_t vmid) { if (!d) return -1; return slot_lookup(&d->slots, vmid); } /* ---- TEST-ONLY hooks: drive the state machine deterministically (no inotify/timer) ---- */ void vmsig_discovery_feed(vmsig_discovery* d, uint32_t vmid, int present) { if (present) on_file_appear(d, vmid); else on_file_gone(d, vmid); } void vmsig_discovery_tick(vmsig_discovery* d) { /* force a re-probe of every probing candidate */ for (int i = 0; i < VMSIG_SLOT_COUNT; i++) if (d->cand[i].state == CAND_PROBING) try_probe(d, &d->cand[i]); }