Files
vatrog-vm-signaling/src/core/core.c
T
lirent 9bde398b6c vmsig: management daemon, runtime endpoint lifecycle, roster, discovery, in-tree drivers, packaging
- core: runtime attach/detach of a per-endpoint adapter trio (runtime-safe add_adapter + vmsig_core_detach_endpoint, deferred reap)
- roster: VMSIG_EV_ROSTER + CAP_ROSTER, retained per-endpoint and replayed to late subscribers
- discovery: inotify trigger dir, vmid/endpoint slot allocator, host probe; vmsigd daemon with config + per-uid admission
- input driver and vgpu perception built in-tree; vgpu perception as a separate library
- memctx: own the supplied ro_fd (closed at detach)
- deb packaging: install rules, systemd unit, tmpfiles, default config
2026-06-22 17:25:06 +03:00

264 lines
9.5 KiB
C

/* core.c — core lifecycle and registration of adapters/controls.
* The loop and pumps live in loop.c. */
#include "core_internal.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
/* Enroll `fd` in the core's epoll set and return the slot that tracks it.
 *
 * A slot previously marked SLOT_DEAD is recycled when one exists, so c->slots[]
 * does not grow on every connection; otherwise a new slot is appended, doubling
 * the pointer table (starting at 16 entries) when it is full.
 *
 * Returns the slot, or NULL when an allocation or EPOLL_CTL_ADD fails. After an
 * epoll failure the slot stays in the table marked SLOT_DEAD, i.e. reusable. */
core_slot* core_register_fd(vmsig_core* c, int fd, uint32_t epoll_events, slot_role role) {
    core_slot* slot = NULL;

    /* first choice: the lowest-indexed recycled slot */
    for (int idx = 0; idx < c->nslots && slot == NULL; idx++) {
        if (c->slots[idx]->role == SLOT_DEAD)
            slot = c->slots[idx];
    }

    if (slot == NULL) {
        if (c->nslots == c->cap_slots) {
            int grown = (c->cap_slots != 0) ? c->cap_slots * 2 : 16;
            core_slot** table = realloc(c->slots, (size_t)grown * sizeof *table);
            if (table == NULL)
                return NULL;
            c->cap_slots = grown;
            c->slots = table;
        }
        slot = calloc(1, sizeof *slot);
        if (slot == NULL)
            return NULL;
        c->slots[c->nslots] = slot;
        c->nslots += 1;
    }

    /* wipe whatever a previous user left behind before filling in the new one */
    memset(slot, 0, sizeof *slot);
    slot->fd = fd;
    slot->role = role;

    struct epoll_event ev;
    memset(&ev, 0, sizeof ev);
    ev.data.ptr = slot; /* epoll hands this pointer back with every event on fd */
    ev.events = epoll_events;

    if (epoll_ctl(c->epfd, EPOLL_CTL_ADD, fd, &ev) >= 0)
        return slot;

    slot->role = SLOT_DEAD;
    return NULL;
}
/* Create a core bound to `ctx`.
 *
 * Sets up the epoll instance and the wake-up eventfd, registers the eventfd as a
 * SLOT_WAKEUP source, and registers the context's pacing timerfds (one per
 * direction, when the context exposes one) as SLOT_CTX_TIMING sources.
 *
 * Returns the new core, or NULL when `ctx` is NULL or any mandatory step fails.
 * On failure everything acquired so far is released, including the slot table
 * that core_register_fd may already have allocated. `ctx` is only referenced,
 * never owned. */
vmsig_core* vmsig_core_new(vmsig_ctx* ctx) {
    if (!ctx) return NULL;
    vmsig_core* c = calloc(1, sizeof *c);
    if (!c) return NULL;
    c->ctx = ctx;
    c->epfd = -1;
    c->wake_fd = -1;

    c->epfd = epoll_create1(EPOLL_CLOEXEC);
    if (c->epfd < 0) goto fail;
    c->wake_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (c->wake_fd < 0) goto fail;
    if (!core_register_fd(c, c->wake_fd, EPOLLIN, SLOT_WAKEUP)) goto fail;

    /* context pacing timerfds (created in ctx_new) as loop sources */
    for (int d = VMSIG_DIR_UP; d <= VMSIG_DIR_DOWN; d++) {
        int tfd = vmsig_ctx_timing_fd(ctx, (vmsig_dir)d);
        /* NOTE(review): a failed registration is ignored here (best effort), so
         * that direction would run without its timer source — confirm intended. */
        if (tfd >= 0) core_register_fd(c, tfd, EPOLLIN, SLOT_CTX_TIMING);
    }
    return c;

fail:
    /* core_register_fd can fail after it has grown c->slots and/or parked a
     * SLOT_DEAD slot in it; free those too, not just the core itself. */
    for (int i = 0; i < c->nslots; i++) free(c->slots[i]);
    free(c->slots);
    if (c->wake_fd >= 0) close(c->wake_fd);
    if (c->epfd >= 0) close(c->epfd);
    free(c);
    return NULL;
}
/* Open an adapter through `ops`, attach it to the core and enroll its fds.
 *
 * ops       adapter vtable; `open` and `attach` are required, `close` is optional.
 * cfg       opaque configuration handed to ops->open.
 * endpoint  endpoint number handed to ops->open and recorded in the entry.
 *
 * An inactive table entry is reused when one exists, so runtime detach/re-attach
 * churn does not exhaust the fixed table; otherwise the table grows up to
 * VMSIG_MAX_ADAPTERS. The entry's generation counter is carried across reuse and
 * incremented once attach has succeeded.
 *
 * Returns the adapter id (table index) or -1 on failure. On every failure after
 * ops->open succeeded the adapter is closed again and the entry is left inactive
 * (reusable). */
int vmsig_core_add_adapter(vmsig_core* c, const vmsig_adapter_ops* ops,
                           const void* cfg, uint32_t endpoint) {
    if (!c || !ops || !ops->open || !ops->attach) return -1;

    int id = -1;
    for (int i = 0; i < c->nadapters; i++)
        if (!c->adapters[i].active) { id = i; break; }
    if (id < 0) {
        if (c->nadapters >= VMSIG_MAX_ADAPTERS) return -1;
        id = c->nadapters++;
    }
    core_adapter_ent* e = &c->adapters[id];
    uint16_t gen = e->gen; /* generation survives the memset below */

    vmsig_adapter* a = ops->open(cfg, endpoint);
    if (!a) return -1; /* entry stays inactive (reusable) */

    vmsig_emit emit = { core_emit_up, core_register_memctx, core_unregister_memctx, c };
    vmsig_fd_reg reg[VMSIG_ADAPTER_FDS];
    memset(reg, 0, sizeof reg);
    int n = ops->attach(a, &emit, reg, VMSIG_ADAPTER_FDS);
    /* Reject a count larger than the array we handed out: trusting it would
     * make the loop below read past the end of reg[]. */
    if (n < 0 || n > VMSIG_ADAPTER_FDS) {
        if (ops->close) ops->close(a);
        return -1;
    }

    memset(e, 0, sizeof *e);
    e->ops = ops;
    e->a = a;
    e->endpoint = endpoint;
    e->active = 1;
    e->gen = (uint16_t)(gen + 1);
    e->nslot = 0;

    for (int i = 0; i < n; i++) {
        /* an adapter that leaves epoll_events at 0 gets plain EPOLLIN */
        uint32_t events = reg[i].epoll_events ? reg[i].epoll_events : (uint32_t)EPOLLIN;
        core_slot* s = core_register_fd(c, reg[i].fd, events, SLOT_ADAPTER);
        if (!s) {
            /* roll back: deregister the fds enrolled so far, then close + free the entry. */
            for (int k = 0; k < e->nslot; k++) {
                epoll_ctl(c->epfd, EPOLL_CTL_DEL, e->slots[k]->fd, NULL);
                e->slots[k]->role = SLOT_DEAD;
            }
            if (ops->close) ops->close(a);
            e->active = 0; e->a = NULL; e->nslot = 0;
            return -1;
        }
        s->ops = ops;
        s->adapter = a;
        s->cookie = reg[i].cookie;
        e->slots[e->nslot++] = s; /* in bounds: nslot < n <= VMSIG_ADAPTER_FDS */
    }
    return id;
}
/* Ask for every active adapter on `endpoint` to be detached at runtime.
 *
 * Nothing is torn down here: matching entries only get their `reap` flag set and
 * the loop is woken (same deferred scheme as core_request_drop). The teardown
 * itself (epoch settle, SEAM_DOWN, lease release, epoll DEL, ops->close) runs in
 * core_reap_adapters on the loop thread.
 *
 * A NULL core or an endpoint of 64 or more is ignored. */
void vmsig_core_detach_endpoint(vmsig_core* c, uint32_t endpoint) {
    if (c == NULL || endpoint >= 64)
        return;

    int marked = 0;
    core_adapter_ent* ent = c->adapters;
    for (int left = c->nadapters; left > 0; left--, ent++) {
        if (!ent->active || ent->endpoint != endpoint)
            continue;
        ent->reap = 1;
        marked = 1;
    }

    /* only disturb the loop when there is actually something to reap */
    if (marked)
        core_wake(c);
}
/* Register a control (`ops` + its instance `ctl`) with the core.
 *
 * ops    control vtable; subscribe, set_emit_down and fd are each optional.
 * ctl    control instance passed back to every ops callback.
 * grant  copied into the entry when non-NULL; when NULL the grant stays zeroed,
 *        which means default-deny.
 *
 * An inactive table entry is reused when one exists; otherwise the table grows up
 * to VMSIG_MAX_CONTROLS. The entry's generation counter is carried across reuse
 * and incremented for every (re)use.
 *
 * Returns the control id (table index) or -1 on failure. When the control's fd
 * cannot be registered, the entry is released again (marked inactive) so it is
 * neither leaked nor closed later by vmsig_core_free on the caller's behalf. */
int vmsig_core_add_control(vmsig_core* c, const vmsig_control_ops* ops, void* ctl,
                           const vmsig_grant* grant) {
    if (!c || !ops) return -1;

    /* reuse a freed (reaped) slot; otherwise grow up to the ceiling */
    int id = -1;
    for (int i = 0; i < c->ncontrols; i++)
        if (!c->controls[i].active) { id = i; break; }
    if (id < 0) {
        if (c->ncontrols >= VMSIG_MAX_CONTROLS) return -1;
        id = c->ncontrols++;
    }

    core_control_ent* e = &c->controls[id];
    uint16_t gen = e->gen;        /* generation survives the slot memset */
    memset(e, 0, sizeof *e);
    e->gen = (uint16_t)(gen + 1); /* new generation for this (re)use */
    e->ops = ops;
    e->ctl = ctl;
    e->active = 1;
    if (grant) e->grant = *grant; /* otherwise stays zero => default-deny */
    e->dctx.core = c;
    e->dctx.ctl_id = id;

    if (ops->subscribe) ops->subscribe(ctl, &e->sub);
    /* emit_down token is our down_ctx, so emit_down can find this control's grant */
    if (ops->set_emit_down) ops->set_emit_down(ctl, core_emit_down, &e->dctx);

    int fd = ops->fd ? ops->fd(ctl) : -1;
    if (fd >= 0) {
        core_slot* s = core_register_fd(c, fd, EPOLLIN, SLOT_CONTROL);
        if (!s) {
            /* Give the entry back: leaving it active would pin the table slot
             * forever and make vmsig_core_free call ops->close on a control the
             * caller was told had not been added.
             * NOTE(review): `ctl` still holds the emit_down hook installed
             * above; the caller is presumably expected to discard `ctl` after a
             * failed add — confirm. */
            e->active = 0;
            return -1;
        }
        s->cops = ops;
        s->ctl = ctl;
        e->slot = s;
    }

    /* Late subscriber: replay retained MEMCTX (if a context is already published and
     * this control is qualified). For a control added BEFORE the first publication,
     * the cell is not yet valid — it receives MEMCTX via the normal multicast in pump_up. */
    core_memctx_replay(c, id);
    core_roster_replay(c, id); /* late subscriber: retained VM roster (CAP_ROSTER) */
    return id; /* ncontrols already bumped when picking id (on growth); reuse does not grow it */
}
/* ===== MEMCTX registration: per-endpoint retain cell (called by the adapter on the loop thread) =====
 * Stores the registration hooks of an address-space context adapter in the cell
 * of its endpoint. The core keeps only these hooks and does NOT store a copy of
 * the locator: on delivery/replay it calls reg.describe/share_fd.
 * valid/epoch are maintained in route/epoch_bump (not here): registering only
 * records that "the adapter is connected".
 *
 * `token` is the core pointer. Returns 0 on success, -1 when the token or `reg`
 * is NULL or reg->endpoint is 64 or more. */
int core_register_memctx(void* token, const vmsig_memctx_reg* reg) {
    vmsig_core* core = token;

    if (core == NULL)
        return -1;
    if (reg == NULL || reg->endpoint >= 64)
        return -1;

    core_memctx_cell* target = &core->memctx[reg->endpoint];
    target->reg = *reg;
    target->registered = 1;
    return 0;
}
/* Drop the MEMCTX registration held for `endpoint`: the cell is marked neither
 * registered nor valid and its stored hooks are wiped.
 * `token` is the core pointer; a NULL token or an endpoint of 64 or more is
 * ignored. */
void core_unregister_memctx(void* token, uint32_t endpoint) {
    vmsig_core* core = token;

    if (core != NULL && endpoint < 64) {
        core_memctx_cell* target = &core->memctx[endpoint];
        target->registered = 0;
        target->valid = 0;
        memset(&target->reg, 0, sizeof target->reg);
    }
}
/* Install (or, with cb == NULL, remove) the audit callback together with the
 * user pointer that is handed back to it. Does nothing for a NULL core. */
void vmsig_core_set_audit(vmsig_core* c, void (*cb)(void* ud, const vmsig_audit* a), void* ud) {
    if (c != NULL) {
        c->audit_cb = cb;
        c->audit_ud = ud;
    }
}
/* Forward audit record `a` to the installed audit callback, passing the stored
 * user pointer first. A NULL core or a missing callback makes this a no-op. */
void core_audit(vmsig_core* c, const vmsig_audit* a) {
    if (c == NULL)
        return;
    if (c->audit_cb == NULL)
        return;
    c->audit_cb(c->audit_ud, a);
}
/* Install the arbitration policy callback and its user pointer. Does nothing for
 * a NULL core. The lease table is not touched here: lease[][] is zeroed in
 * vmsig_core_new (calloc), so all cells start out free. */
void vmsig_core_set_arb_policy(vmsig_core* c, vmsig_arb_policy cb, void* ud) {
    if (c != NULL) {
        c->arb_cb = cb;
        c->arb_ud = ud;
    }
}
/* Add a generic fd source to the loop, registered for EPOLLIN as SLOT_SOURCE.
 *
 * cb       stored as the slot's event handler (required).
 * user     opaque pointer stored for `cb` and `on_free`.
 * on_free  optional; vmsig_core_free calls it with `user` before the slots are
 *          released.
 *
 * Returns 0 on success, -1 on a NULL core, negative fd, NULL `cb`, or when the fd
 * cannot be registered. `on_free` is not invoked on failure. */
int core_add_source(vmsig_core* c, int fd, void (*cb)(void* user, uint32_t events),
                    void* user, void (*on_free)(void* user)) {
    if (c == NULL || cb == NULL || fd < 0)
        return -1;

    core_slot* slot = core_register_fd(c, fd, EPOLLIN, SLOT_SOURCE);
    if (slot == NULL)
        return -1;

    slot->source_user = user;
    slot->on_source = cb;
    slot->on_free = on_free;
    return 0;
}
/* Request a deferred drop of control `ctl_id`: the entry is flagged for reaping
 * and the loop is woken for a reap pass (without stopping it).
 *
 * A NULL core or an out-of-range id is ignored, and so is an id whose entry is
 * not active — there is nothing to drop there, and vmsig_core_detach_endpoint
 * applies the same rule by flagging only active entries. */
void core_request_drop(vmsig_core* c, int ctl_id) {
    if (!c || ctl_id < 0 || ctl_id >= c->ncontrols) return;
    if (!c->controls[ctl_id].active) return; /* already reaped or never added */
    c->controls[ctl_id].reap = 1;
    core_wake(c); /* wake the loop for a reap pass (without stop) */
}
/* Tear the core down and release everything it owns. NULL is accepted.
 *
 * Order matters:
 *   1. active adapters are closed FIRST — their close stops off-loop workers and
 *      unregisters their seams (e.g. memctx) BEFORE destruction;
 *   2. active controls are closed;
 *   3. fd sources get their on_free hook (e.g. unix listener: close
 *      listen/janitor fd + free);
 *   4. all slots and the slot table are freed;
 *   5. the wake eventfd and the epoll fd are closed.
 * The context is not owned by the core and is left for its owner to free. */
void vmsig_core_free(vmsig_core* c) {
    int idx;

    if (c == NULL)
        return;

    for (idx = 0; idx < c->nadapters; idx++) {
        if (!c->adapters[idx].active)
            continue;
        if (!c->adapters[idx].ops->close)
            continue;
        c->adapters[idx].ops->close(c->adapters[idx].a);
    }

    for (idx = 0; idx < c->ncontrols; idx++) {
        if (!c->controls[idx].active)
            continue;
        if (!c->controls[idx].ops->close)
            continue;
        c->controls[idx].ops->close(c->controls[idx].ctl);
    }

    /* run every source's cleanup hook before any slot memory goes away */
    for (idx = 0; idx < c->nslots; idx++) {
        if (c->slots[idx]->role != SLOT_SOURCE)
            continue;
        if (c->slots[idx]->on_free)
            c->slots[idx]->on_free(c->slots[idx]->source_user);
    }

    for (idx = 0; idx < c->nslots; idx++)
        free(c->slots[idx]);
    free(c->slots);

    if (c->wake_fd >= 0)
        close(c->wake_fd);
    if (c->epfd >= 0)
        close(c->epfd);

    /* ctx is not ours: its owner frees it */
    free(c);
}