/* loop.c — non-blocking epoll loop, dispatch, pump up/down, emit hooks,
 * graceful shutdown. No sleep/polling/busy-wait: every wakeup is an fd. */
#include "core_internal.h"

/* NOTE(review): the operands of these five system includes were missing from the source
 * as received; they are restored from the symbols this file uses (errno/EINTR, uint64_t,
 * memset/memcpy, epoll_*, read/write/close). Confirm against the original file. */
#include <errno.h>
#include <stdint.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Drain a counter-style fd: read 8-byte values until a read returns anything other than a
 * full uint64_t (short read, 0 or error).
 * NOTE(review): this terminates promptly only if the fd is non-blocking — confirm where
 * the fd is created. */
static void drain_counter_fd(int fd) {
    uint64_t v;
    while (read(fd, &v, sizeof v) == (ssize_t)sizeof v) {
        /* drain */
    }
}

/* Wake the loop by writing a 1 to the core's wake fd. The write result is deliberately
 * ignored (best-effort nudge). */
void core_wake(vmsig_core* c) {
    uint64_t one = 1;
    ssize_t r = write(c->wake_fd, &one, sizeof one);
    (void)r;
}

/* UP emit hook: `token` is the vmsig_core. Submits `ev` to the context queue in the UP
 * direction, wakes the loop, and returns the submit result unchanged. */
int core_emit_up(void* token, vmsig_event* ev) {
    vmsig_core* c = token;
    int r = vmsig_ctx_submit(c->ctx, VMSIG_DIR_UP, ev);
    core_wake(c); /* nudge in case of emission off the loop thread */
    return r;
}

/* origin = (gen<<16)|(id+1): low 16 bits are the control's id+1, high bits the slot
 * generation. Lets a reply be addressed to the initiator and stale reuse filtered out. */
static uint32_t origin_pack(int id, uint16_t gen) {
    return ((uint32_t)gen << 16) | ((uint32_t)(id + 1) & 0xFFFFu);
}

/* Live control by origin with generation check; NULL if gone/slot reused. */
static core_control_ent* origin_ctl(vmsig_core* c, uint32_t origin) {
    if (!origin)
        return NULL;
    int id = (int)(origin & 0xFFFFu) - 1;
    uint16_t gen = (uint16_t)(origin >> 16);
    if (id < 0 || id >= c->ncontrols)
        return NULL;
    core_control_ent* e = &c->controls[id];
    if (!e->active || e->gen != gen)
        return NULL;
    return e;
}

/* Capability for a DOWN command (unknown => deny). Destructive CMD_LIFECYCLE
 * (powerdown/reset, code in inln[0]) requires CAP_POWER, safe ones CAP_LIFECYCLE. */
static uint32_t cap_for_down(const vmsig_event* ev) {
    switch (ev->kind) {
    case VMSIG_EV_CMD_INPUT:
    case VMSIG_EV_CMD_QUERY_INPUT:
        return VMSIG_CAP_INPUT; /* injection / held-key query */
    case VMSIG_EV_CMD_LIFECYCLE:
        return (ev->inln[0] == VMSIG_LIFE_POWERDOWN || ev->inln[0] == VMSIG_LIFE_RESET)
                   ? VMSIG_CAP_POWER
                   : VMSIG_CAP_LIFECYCLE;
    case VMSIG_EV_CMD_VM: /* op in inln[0] (vmsig_vm_cmd, op<256) */
        return (ev->inln[0] == VMSIG_VMOP_RESET || ev->inln[0] == VMSIG_VMOP_POWERDOWN ||
                ev->inln[0] == VMSIG_VMOP_QUIT)
                   ? VMSIG_CAP_POWER
                   : VMSIG_CAP_VM;
    case VMSIG_EV_CMD_MEMWRITE:
        return VMSIG_CAP_MEMWRITE; /* atomic guest-memory write */
    default:
        return 0;
    }
}

/* ===== Lease layer: classification and helpers ===== */

/* Lease class for a DOWN command. MIRRORS cap_for_down by destructiveness:
 *  - CMD_INPUT -> INPUT;
 *  - CMD_LIFECYCLE powerdown/reset -> POWER;
 *  - CMD_VM reset/powerdown/quit -> POWER;
 *  - CMD_MEMWRITE -> MEMWRITE;
 *  - everything else (safe/read-only/stream/query) -> -1 (not lease-gated).
 * CMD_LIFECYCLE and CMD_VM route to DIFFERENT adapters (INPUT/VMHOST) but share ONE
 * POWER class per endpoint: a single owner of VM destruction (intentional). */
static int lease_class_for_down(const vmsig_event* ev) {
    switch (ev->kind) {
    case VMSIG_EV_CMD_INPUT:
        return VMSIG_LEASE_INPUT;
    case VMSIG_EV_CMD_LIFECYCLE:
        return (ev->inln[0] == VMSIG_LIFE_POWERDOWN || ev->inln[0] == VMSIG_LIFE_RESET)
                   ? VMSIG_LEASE_POWER
                   : -1;
    case VMSIG_EV_CMD_VM:
        return (ev->inln[0] == VMSIG_VMOP_RESET || ev->inln[0] == VMSIG_VMOP_POWERDOWN ||
                ev->inln[0] == VMSIG_VMOP_QUIT)
                   ? VMSIG_LEASE_POWER
                   : -1;
    case VMSIG_EV_CMD_MEMWRITE:
        return VMSIG_LEASE_MEMWRITE; /* always destructive (write to shared guest memory) */
    default:
        return -1;
    }
}

/* Cap required to lease a class (probing/holding a class without the cap is forbidden).
 * Unknown class => 0, which no cap_mask intersects. */
static uint32_t cap_for_lease_class(int cls) {
    return cls == VMSIG_LEASE_INPUT      ? VMSIG_CAP_INPUT
           : cls == VMSIG_LEASE_POWER    ? VMSIG_CAP_POWER
           : cls == VMSIG_LEASE_MEMWRITE ? VMSIG_CAP_MEMWRITE
                                         : 0u;
}

/* Source bitmask permitted to hold a lease class: mirrors the grant's source ceiling
 * (which grant_allows_down enforces on the command itself). Leasing is intercepted
 * BEFORE grant_allows_down, so source is checked HERE — otherwise a principal without
 * the required seam could hold someone else's cell (DoS), bypassing source_mask.
 * INPUT -> SRC_INPUT; POWER -> SRC_INPUT (lifecycle) OR SRC_VMHOST (vm) — one
 * destructive path suffices; MEMWRITE -> SRC_MEMCTX (lives on the MEMCTX seam). */
static uint32_t source_mask_for_lease_class(int cls) {
    return cls == VMSIG_LEASE_INPUT ? (1u << VMSIG_SRC_INPUT)
           : cls == VMSIG_LEASE_POWER
               ? ((1u << VMSIG_SRC_INPUT) | (1u << VMSIG_SRC_VMHOST))
           : cls == VMSIG_LEASE_MEMWRITE ? (1u << VMSIG_SRC_MEMCTX)
                                         : 0u;
}

/* Capability to receive an UP event: ROSTER -> CAP_ROSTER; events whose source is the
 * MEMCTX seam (MEMCTX/MEMCTX_INVALIDATED) -> CAP_MEMCTX; cursor is screen data, available
 * to a GUI observer (OBSERVE) OR an input actor (INPUT); otherwise CAP_OBSERVE
 * (frames/SEAM/generic). The grant_allows_up gate checks intersection, so OBSERVE|INPUT
 * means "either of the two". */
static uint32_t cap_for_up(const vmsig_event* ev) {
    if (ev->kind == VMSIG_EV_ROSTER)
        return VMSIG_CAP_ROSTER; /* host-wide inventory */
    if (ev->kind == VMSIG_EV_CURSOR_STATE)
        return VMSIG_CAP_OBSERVE | VMSIG_CAP_INPUT;
    return (ev->source == VMSIG_SRC_MEMCTX) ? VMSIG_CAP_MEMCTX : VMSIG_CAP_OBSERVE;
}

/* Grant check for a DOWN command: endpoint bit, source bit and the command's required
 * capability must all be present in the grant. Returns non-zero when allowed; a command
 * with no known capability (cap_for_down == 0) is always denied. */
static int grant_allows_down(const vmsig_grant* g, const vmsig_event* ev) {
    if (ev->endpoint >= 64)
        return 0; /* 64-bit mask: <=64 VMs/cores */
    if (!(g->endpoint_mask & (1ull << ev->endpoint)))
        return 0;
    if (!(g->source_mask & (1u << ev->source)))
        return 0; /* source ceiling on DOWN too */
    uint32_t need = cap_for_down(ev);
    return need && (g->cap_mask & need);
}

/* Grant check for an UP event: capability (any bit of cap_for_up), endpoint bit and source
 * bit must all be present in the grant. Returns 1 when allowed, 0 otherwise. */
static int grant_allows_up(const vmsig_grant* g, const vmsig_event* ev) {
    if (ev->endpoint >= 64)
        return 0;
    if (!(g->cap_mask & cap_for_up(ev)))
        return 0;
    if (!(g->endpoint_mask & (1ull << ev->endpoint)))
        return 0;
    if (!(g->source_mask & (1u << ev->source)))
        return 0;
    return 1;
}

/* Find an adapter by (endpoint, source). NULL if none. Used by pump_down to route a
 * DOWN command to its adapter. */
static core_adapter_ent* core_find_adapter(vmsig_core* c, uint32_t endpoint,
                                           vmsig_source source) {
    for (int i = 0; i < c->nadapters; i++) {
        core_adapter_ent* e = &c->adapters[i];
        if (e->active && e->ops->source == source && e->endpoint == endpoint)
            return e;
    }
    return NULL;
}

/* ===== Lease layer: grant/release/status/finalization/reclaim =====
 * Intercepted in core_emit_down BEFORE grant_allows_down (synchronous, not in ctx, does
 * not touch pending). Addressed UP replies to the initiator via core_emit_up
 * (origin+generation). */

/* Read the requested lease class out of a lease command's inline bytes. inln is a byte
 * buffer (it is written with memcpy everywhere in this file), so it is copied out rather
 * than read through a casted struct pointer: no alignment or aliasing assumptions. */
static uint32_t lease_req_cls(const vmsig_event* ev) {
    vmsig_lease_req lr;
    memcpy(&lr, ev->inln, sizeof lr);
    return lr.cls;
}

/* Addressed UP reply to the initiator of a lease request. */
static void lease_reply(vmsig_core* c, const vmsig_event* req, vmsig_kind kind,
                        uint32_t cls, uint32_t reason) {
    vmsig_event up;
    memset(&up, 0, sizeof up);
    up.kind = kind;
    up.source = VMSIG_SRC_CORE;
    up.dir = VMSIG_DIR_UP;
    up.prio = VMSIG_PRIO_URGENT;
    up.endpoint = req->endpoint;
    up.origin = req->origin;
    vmsig_lease_req lr = { cls, reason };
    memcpy(up.inln, &lr, sizeof lr);
    core_emit_up(c, &up);
}

/* Lease denial: audit (visibility of authorization/contention denials — capability/
 * endpoint enumeration via ACQUIRE is observable) + addressed LEASE_DENIED to initiator. */
static void lease_deny(vmsig_core* c, const vmsig_event* req, uint32_t principal,
                       uint32_t cls, uint32_t reason) {
    vmsig_audit a = { VMSIG_AUDIT_LEASE_DENIED, principal, req->endpoint, cls, reason };
    core_audit(c, &a);
    lease_reply(c, req, VMSIG_EV_LEASE_DENIED, cls, reason);
}

/* Principal of the cell owner (for STATUS); 0 if owner is dead/absent. */
static uint32_t lease_owner_principal(vmsig_core* c, uint32_t owner) {
    core_control_ent* e = origin_ctl(c, owner);
    return e ? e->grant.principal : 0u;
}

/* Acquire the lease cell (ev->endpoint, requested class) for control `ctl_id`.
 * ev->origin must already carry the caller's packed origin (core_emit_down sets it).
 * Every outcome is answered with an addressed LEASE_GRANTED / LEASE_DENIED reply; a
 * preempted live owner additionally receives LEASE_REVOKED.
 *
 * IMPORTANT (layer isolation): signaling does NOT release held keys on lease loss and
 * does NOT track held state at all. held is the ACTUATOR's record (vmctl); release is the
 * control's decision. On owner change/reset the cell is simply freed; stuck keys remain
 * the control's concern (it can issue CMD_QUERY_INPUT and release its own while owner). */
void core_lease_acquire(vmsig_core* c, int ctl_id, const vmsig_event* ev) {
    core_control_ent* e = &c->controls[ctl_id];
    uint32_t cls = lease_req_cls(ev);
    uint32_t ep = ev->endpoint;

    /* 1. validate class/endpoint/grant (default-deny; every denial is audited). */
    if (cls >= VMSIG_LEASE_CLASS_MAX) {
        lease_deny(c, ev, e->grant.principal, cls, VMSIG_LEASE_DENY_BADCLASS);
        return;
    }
    if (ep >= 64 || !(e->grant.endpoint_mask & (1ull << ep))) {
        lease_deny(c, ev, e->grant.principal, cls, VMSIG_LEASE_DENY_NOGRANT);
        return;
    }
    if (!(e->grant.cap_mask & cap_for_lease_class((int)cls))) {
        lease_deny(c, ev, e->grant.principal, cls, VMSIG_LEASE_DENY_NOCAP);
        return;
    }
    /* source ceiling: holding a class without rights to its seam is forbidden (else a
     * DoS hold of someone else's cell bypassing source_mask, since interception is
     * BEFORE grant_allows_down). */
    if (!(e->grant.source_mask & source_mask_for_lease_class((int)cls))) {
        lease_deny(c, ev, e->grant.principal, cls, VMSIG_LEASE_DENY_NOGRANT);
        return;
    }

    core_lease_cell* cell = &c->lease[ep][cls];
    uint32_t me = ev->origin;

    /* 2a. free OR dead owner (origin_ctl==NULL) => take as if free. */
    core_control_ent* owner_e = cell->owner ? origin_ctl(c, cell->owner) : NULL;
    if (cell->owner == 0 || !owner_e) {
        cell->owner = me;
        cell->owner_prio = e->grant.arb_prio;
        vmsig_audit a = { VMSIG_AUDIT_LEASE_GRANTED, e->grant.principal, ep, cls, 0 };
        core_audit(c, &a);
        lease_reply(c, ev, VMSIG_EV_LEASE_GRANTED, cls, 0);
        return;
    }

    /* 2b. owner is the caller itself => idempotent GRANTED. */
    if (cell->owner == me) {
        lease_reply(c, ev, VMSIG_EV_LEASE_GRANTED, cls, 0);
        return;
    }

    /* 2c. held by a LIVE other owner => policy. incumbent is the live grant. Without an
     * arbitration callback, only a strictly higher arb_prio preempts. */
    vmsig_arb_decision dec;
    if (c->arb_cb) {
        dec = c->arb_cb(c->arb_ud, ep, cls, &owner_e->grant, &e->grant);
    } else {
        dec = (e->grant.arb_prio > cell->owner_prio) ? VMSIG_ARB_PREEMPT : VMSIG_ARB_DENY;
    }
    if (dec != VMSIG_ARB_PREEMPT) {
        /* equal priority => owner keeps it (HELD); strictly lower => LOWER_PRIO. */
        uint32_t reason = (e->grant.arb_prio < cell->owner_prio)
                              ? VMSIG_LEASE_DENY_LOWER_PRIO
                              : VMSIG_LEASE_DENY_HELD;
        lease_deny(c, ev, e->grant.principal, cls, reason);
        return;
    }

    /* PREEMPT: notify the old owner (REVOKED), switch owner, grant to the new one.
     * signaling does NOT release held keys (that is the control's decision): the
     * ex-owner is responsible for its stuck keys; the new owner can query held
     * (CMD_QUERY_INPUT) and release them. */
    uint32_t old_owner = cell->owner;
    {
        /* synthetic request: only endpoint+origin are read by lease_reply */
        vmsig_event rv;
        memset(&rv, 0, sizeof rv);
        rv.endpoint = ep;
        rv.origin = old_owner;
        lease_reply(c, &rv, VMSIG_EV_LEASE_REVOKED, cls, 0);
    }
    {
        vmsig_audit a = { VMSIG_AUDIT_LEASE_REVOKED, owner_e->grant.principal, ep, cls, 0 };
        core_audit(c, &a);
    }
    cell->owner = me;
    cell->owner_prio = e->grant.arb_prio;
    {
        vmsig_audit a = { VMSIG_AUDIT_LEASE_GRANTED, e->grant.principal, ep, cls, 0 };
        core_audit(c, &a);
    }
    lease_reply(c, ev, VMSIG_EV_LEASE_GRANTED, cls, 0);
}

/* Release the lease cell (ev->endpoint, requested class) if the caller owns it. Any failed
 * check (bad class/endpoint, missing grant bits, not the owner) is a silent no-op; on
 * success the cell is freed and an addressed LEASE_RELEASED reply is emitted. */
void core_lease_release(vmsig_core* c, int ctl_id, const vmsig_event* ev) {
    core_control_ent* e = &c->controls[ctl_id];
    uint32_t cls = lease_req_cls(ev);
    uint32_t ep = ev->endpoint;

    /* cross-endpoint isolation + cap/source gate BEFORE any action (like acquire). */
    if (cls >= VMSIG_LEASE_CLASS_MAX || ep >= 64)
        return;
    if (!(e->grant.endpoint_mask & (1ull << ep)))
        return;
    if (!(e->grant.cap_mask & cap_for_lease_class((int)cls)))
        return;
    if (!(e->grant.source_mask & source_mask_for_lease_class((int)cls)))
        return;

    core_lease_cell* cell = &c->lease[ep][cls];
    if (cell->owner != ev->origin)
        return; /* not owner => no-op */

    /* signaling does NOT release held keys — that is the control's decision (it releases
     * its own keys before release if needed). Here we only free the cell. */
    cell->owner = 0;
    cell->owner_prio = 0;
    lease_reply(c, ev, VMSIG_EV_LEASE_RELEASED, cls, 0);
}

/* Report the busy state of the lease cell (ev->endpoint, requested class) to the caller
 * as an addressed LEASE_STATUS event. A cell counts as busy only if its owner is still a
 * live control; the owner's principal is included only when busy. Failed checks are a
 * silent no-op. */
void core_lease_status(vmsig_core* c, int ctl_id, const vmsig_event* ev) {
    core_control_ent* e = &c->controls[ctl_id];
    uint32_t cls = lease_req_cls(ev);
    uint32_t ep = ev->endpoint;

    /* busy-state can be probed only within one's own endpoint and with the class cap
     * (else a principal without CAP_INPUT/CAP_POWER would leak busy-state/other principal). */
    if (cls >= VMSIG_LEASE_CLASS_MAX || ep >= 64)
        return;
    if (!(e->grant.endpoint_mask & (1ull << ep)))
        return;
    if (!(e->grant.cap_mask & cap_for_lease_class((int)cls)))
        return;
    if (!(e->grant.source_mask & source_mask_for_lease_class((int)cls)))
        return;

    core_lease_cell* cell = &c->lease[ep][cls];
    uint32_t busy = (cell->owner && origin_ctl(c, cell->owner)) ? 1u : 0u;

    vmsig_event up;
    memset(&up, 0, sizeof up);
    up.kind = VMSIG_EV_LEASE_STATUS;
    up.source = VMSIG_SRC_CORE;
    up.dir = VMSIG_DIR_UP;
    up.prio = VMSIG_PRIO_URGENT;
    up.endpoint = ep;
    up.origin = ev->origin;
    vmsig_lease_status st = { cls, busy, busy ? lease_owner_principal(c, cell->owner) : 0u };
    memcpy(up.inln, &st, sizeof st);
    core_emit_up(c, &up);
}

/* Clear all cells owned by this (still live) slot, BEFORE active=0. The owner key is
 * built from the slot's current gen, so it must be called while that gen is still the one
 * the cells were acquired under. Each freed cell is audited as LEASE_RECLAIMED. */
void core_lease_reap_control(vmsig_core* c, int ctl_id) {
    uint32_t owner = origin_pack(ctl_id, c->controls[ctl_id].gen);
    for (uint32_t ep = 0; ep < 64; ep++) {
        for (int cls = 0; cls < VMSIG_LEASE_CLASSES; cls++) {
            core_lease_cell* cell = &c->lease[ep][cls];
            if (cell->owner != owner)
                continue;
            /* only free the cell; the dead owner's held keys are NOT our concern (vmctl's
             * record; the next owner sees them via CMD_QUERY_INPUT and decides itself). */
            cell->owner = 0;
            cell->owner_prio = 0;
            vmsig_audit a = { VMSIG_AUDIT_LEASE_RECLAIMED, c->controls[ctl_id].grant.principal,
                              ep, (uint32_t)cls, 0 };
            core_audit(c, &a);
        }
    }
}

/* Release ALL lease classes held on `endpoint` (from endpoint detach, BEFORE the adapters
 * close). Symmetric to core_lease_reap_control but keyed by endpoint, not owner: when a VM
 * disappears its leases must not survive to auto-transfer onto whatever VM later reuses the
 * same endpoint bit. The owner principal is recorded for the audit (0 if the owner is no
 * longer a live control). */
static void core_lease_reap_endpoint(vmsig_core* c, uint32_t endpoint) {
    if (endpoint >= 64)
        return;
    for (int cls = 0; cls < VMSIG_LEASE_CLASSES; cls++) {
        core_lease_cell* cell = &c->lease[endpoint][cls];
        if (!cell->owner)
            continue;
        uint32_t principal = lease_owner_principal(c, cell->owner);
        cell->owner = 0;
        cell->owner_prio = 0;
        vmsig_audit a = { VMSIG_AUDIT_LEASE_RECLAIMED, principal, endpoint, (uint32_t)cls, 0 };
        core_audit(c, &a);
    }
}

/* DOWN emit from a control: enforcement against THIS control's grant.
 * `token` is a core_down_ctx (core + control id). Returns 0 when a lease command was
 * handled synchronously, -1 when the event was rejected (payload released here), otherwise
 * the result of vmsig_ctx_submit. */
int core_emit_down(void* token, vmsig_event* ev) {
    core_down_ctx* d = token;
    vmsig_core* c = d->core;
    core_control_ent* e = &c->controls[d->ctl_id];
    if (!e->active) {
        vmsig_payload_release(ev);
        return -1;
    }

    /* Lease arbitration is intercepted HERE (synchronous, not in ctx, does not touch
     * pending). origin is needed for the addressed reply and as the owner key. */
    if (ev->kind == VMSIG_EV_CMD_ACQUIRE || ev->kind == VMSIG_EV_CMD_RELEASE ||
        ev->kind == VMSIG_EV_CMD_LEASE_STATUS) {
        ev->origin = origin_pack(d->ctl_id, e->gen);
        if (ev->kind == VMSIG_EV_CMD_ACQUIRE)
            core_lease_acquire(c, d->ctl_id, ev);
        else if (ev->kind == VMSIG_EV_CMD_RELEASE)
            core_lease_release(c, d->ctl_id, ev);
        else
            core_lease_status(c, d->ctl_id, ev);
        vmsig_payload_release(ev);
        return 0;
    }

    if (!grant_allows_down(&e->grant, ev)) {
        vmsig_audit a = { VMSIG_AUDIT_DOWN_DENIED, e->grant.principal, ev->endpoint,
                          (uint32_t)ev->kind, 0 };
        core_audit(c, &a); /* rejected by policy (endpoint/source/class) */
        vmsig_payload_release(ev);
        return -1;
    }

    /* Lease GATE: destruction is passed ONLY by the class's current owner.
     * A non-owner (or an owner whose slot is dead) => drop + audit LEASE_DENIED
     * (distinguishable from grant-deny). A free cell => also drop: destruction cannot be
     * used without an explicit lease. Safe/read-only commands (cls<0) are not gated. */
    {
        int cls = lease_class_for_down(ev);
        if (cls >= 0 && ev->endpoint < 64) {
            uint32_t me = origin_pack(d->ctl_id, e->gen);
            uint32_t owner = c->lease[ev->endpoint][cls].owner;
            if (owner != me || !origin_ctl(c, owner)) {
                vmsig_audit a = { VMSIG_AUDIT_LEASE_DENIED, e->grant.principal, ev->endpoint,
                                  (uint32_t)ev->kind, 0 };
                core_audit(c, &a);
                vmsig_payload_release(ev);
                return -1;
            }
        }
    }

    if (e->pending >= VMSIG_DOWN_PENDING_MAX) { /* fairness/DoS: DOWN cap per poller */
        vmsig_audit a = { VMSIG_AUDIT_DOWN_DENIED, e->grant.principal, ev->endpoint,
                          (uint32_t)ev->kind, 0 };
        core_audit(c, &a);
        vmsig_payload_release(ev);
        return -1;
    }

    ev->origin = origin_pack(d->ctl_id, e->gen); /* addressed reply + pending accounting */
    e->pending++;
    int r = vmsig_ctx_submit(c->ctx, VMSIG_DIR_DOWN, ev);
    if (r != 0)
        e->pending--; /* not enqueued (drop/err) */
    core_wake(c);
    return r;
}

/* Subscription filter for an UP event. A zero source_mask or endpoint_mask means "no
 * filter on that dimension"; events below prio_min are rejected. Returns 1 on match. */
static int sub_match(const vmsig_sub* sub, const vmsig_event* ev) {
    if (sub->source_mask && !(sub->source_mask & (1u << ev->source)))
        return 0;
    if (ev->prio < sub->prio_min)
        return 0;
    if (sub->endpoint_mask) {
        if (ev->endpoint >= 64 || !(sub->endpoint_mask & (1ull << ev->endpoint)))
            return 0;
    }
    return 1;
}

/* ===== Address-space context (MEMCTX seam): multicast / retain-replay / epoch =====
 * The core vends ONE coherent datum per-endpoint: kcr3+locator paired with an RO-fd. A
 * MEMCTX trigger from the adapter => the core builds the AUTHORITATIVE locator from the
 * adapter snapshot (reg.describe) + stamps the epoch (single source of truth) and
 * distributes to qualified subscribers with re-sharing of the RO-fd. The same path serves
 * replay to a late subscriber. */

/* Build a MEMCTX delivery event for endpoint ep. segs are borrowed from the adapter's
 * buffer (delivery is synchronous on the loop thread; ownership is not transferred).
 * Returns 1 when built, 0 when the endpoint is out of range or has no registered
 * describe callback. */
static int core_memctx_build(vmsig_core* c, uint32_t ep, vmsig_event* ev) {
    if (ep >= 64)
        return 0;
    core_memctx_cell* cell = &c->memctx[ep];
    if (!cell->registered || !cell->reg.describe)
        return 0;

    vmsig_memctx pod;
    memset(&pod, 0, sizeof pod);
    const vmsig_memseg* segs = NULL;
    uint32_t nseg = 0;
    cell->reg.describe(cell->reg.ctx, &pod, &segs, &nseg);
    pod.epoch = c->epoch[ep]; /* core stamps the epoch */
    pod.nseg = nseg;
    pod.flags |= VMSIG_MEMCTX_RDONLY; /* outward — always read-only */

    memset(ev, 0, sizeof *ev);
    ev->kind = VMSIG_EV_MEMCTX;
    ev->source = VMSIG_SRC_MEMCTX;
    ev->dir = VMSIG_DIR_UP;
    ev->prio = VMSIG_PRIO_NORMAL;
    ev->endpoint = ep;
    memcpy(ev->inln, &pod, sizeof pod);
    ev->payload.data = (void*)segs; /* borrowed: owner is the adapter */
    ev->payload.len = (size_t)nseg * sizeof(vmsig_memseg);
    ev->payload.codec = VMSIG_CODEC_MEMCTX;
    ev->payload.flags = VMSIG_PL_BORROWED;
    ev->payload.release = NULL;
    return 1;
}

/* Deliver MEMCTX to one qualified control: fresh RO-fd from reg.share_fd
 * (socket -> cmsg, in-proc -> direct int), attach_memctx, close fd (the core does not own
 * the fd). On success — audit MEMCTX_GRANTED. */
static void core_memctx_deliver_one(vmsig_core* c, core_memctx_cell* cell,
                                    core_control_ent* e, const vmsig_event* ev) {
    if (!e->ops->attach_memctx)
        return; /* control does not accept MEMCTX */
    int fd = cell->reg.share_fd ? cell->reg.share_fd(cell->reg.ctx) : -1;
    int r = e->ops->attach_memctx(e->ctl, ev, fd);
    if (fd >= 0)
        close(fd); /* the core does not own the ro-fd */
    if (r == 0) {
        vmsig_audit a = { VMSIG_AUDIT_MEMCTX_GRANTED, e->grant.principal, ev->endpoint, 0, 0 };
        core_audit(c, &a);
    }
}

/* Handle a MEMCTX trigger: build the authoritative event for the trigger's endpoint, mark
 * the cell's context valid for the current epoch, and deliver to every active control that
 * passes both the grant and its subscription filter. */
void core_memctx_route(vmsig_core* c, const vmsig_event* trigger) {
    uint32_t ep = trigger->endpoint;
    if (ep >= 64)
        return;
    core_memctx_cell* cell = &c->memctx[ep];
    if (!cell->registered)
        return;

    vmsig_event ev;
    if (!core_memctx_build(c, ep, &ev))
        return;
    cell->valid = 1; /* epoch context published */
    cell->epoch = c->epoch[ep];

    for (int i = 0; i < c->ncontrols; i++) {
        core_control_ent* e = &c->controls[i];
        if (!e->active)
            continue;
        if (grant_allows_up(&e->grant, &ev) && sub_match(&e->sub, &ev))
            core_memctx_deliver_one(c, cell, e, &ev);
    }
}

/* Replay to one (late) control: for every endpoint with a registered and currently valid
 * context, rebuild the MEMCTX event and deliver it if the control's grant and
 * subscription allow. */
void core_memctx_replay(vmsig_core* c, int ctl_id) {
    if (ctl_id < 0 || ctl_id >= c->ncontrols)
        return;
    core_control_ent* e = &c->controls[ctl_id];
    if (!e->active)
        return;

    for (uint32_t ep = 0; ep < 64; ep++) {
        core_memctx_cell* cell = &c->memctx[ep];
        if (!cell->registered || !cell->valid)
            continue;
        vmsig_event ev;
        if (!core_memctx_build(c, ep, &ev))
            continue;
        if (grant_allows_up(&e->grant, &ev) && sub_match(&e->sub, &ev))
            core_memctx_deliver_one(c, cell, e, &ev);
    }
}

/* ===== VM roster (inventory coherence): retain + broadcast + replay-to-late =====
 *
 * Mirrors the MEMCTX retain cell, but the datum is a pure inline POD (no fd, no borrowed
 * buffer): delivery is the ordinary broadcast (ops->deliver), with NO interception in
 * pump_up. Publish is SYNCHRONOUS (like core_memctx_route) so a control gets the datum
 * exactly once: current subscribers via this broadcast, a late one via core_roster_replay. */

/* Fill `ev` with an URGENT, inline ROSTER event from the core for endpoint `ep`. */
static void core_roster_build(uint32_t ep, const vmsig_roster* r, vmsig_event* ev) {
    memset(ev, 0, sizeof *ev);
    ev->kind = VMSIG_EV_ROSTER;
    ev->source = VMSIG_SRC_CORE;
    ev->dir = VMSIG_DIR_UP;
    ev->prio = VMSIG_PRIO_URGENT;
    ev->endpoint = ep;
    ev->payload.flags = VMSIG_PL_INLINE;
    memcpy(ev->inln, r, sizeof *r);
}

/* Retain `entry` for `endpoint` and broadcast it synchronously to every active control
 * with a deliver op whose grant and subscription allow it. NULL core/entry or an
 * out-of-range endpoint is a no-op. */
void core_roster_publish(vmsig_core* c, uint32_t endpoint, const vmsig_roster* entry) {
    if (!c || endpoint >= 64 || !entry)
        return;
    core_roster_cell* cell = &c->roster[endpoint];
    cell->entry = *entry;
    /* DETACH clears the retained datum (a vacated slot is not replayed to a late subscriber),
     * but the DETACH event is still broadcast to current subscribers so they drop the VM. */
    cell->valid = (entry->action != VMSIG_ROSTER_DETACH);

    vmsig_event ev;
    core_roster_build(endpoint, entry, &ev);
    for (int i = 0; i < c->ncontrols; i++) {
        core_control_ent* e = &c->controls[i];
        if (!e->active || !e->ops->deliver)
            continue;
        if (grant_allows_up(&e->grant, &ev) && sub_match(&e->sub, &ev))
            e->ops->deliver(e->ctl, &ev);
    }
}

/* Replay every retained (valid) roster entry to one control, subject to its grant and
 * subscription. No-op for a NULL core, an out-of-range id, an inactive control or a
 * control without a deliver op. */
void core_roster_replay(vmsig_core* c, int ctl_id) {
    if (!c || ctl_id < 0 || ctl_id >= c->ncontrols)
        return;
    core_control_ent* e = &c->controls[ctl_id];
    if (!e->active || !e->ops->deliver)
        return;

    for (uint32_t ep = 0; ep < 64; ep++) {
        core_roster_cell* cell = &c->roster[ep];
        if (!cell->valid)
            continue;
        vmsig_event ev;
        core_roster_build(ep, &cell->entry, &ev);
        if (grant_allows_up(&e->grant, &ev) && sub_match(&e->sub, &ev))
            e->ops->deliver(e->ctl, &ev);
    }
}

/* Bump the endpoint epoch and broadcast MEMCTX_INVALIDATED to holders. When `rebootstrap`
 * is set, ask the adapter to re-bootstrap (it re-emits MEMCTX{epoch+1} when ready) — the
 * normal destructive-lifecycle path. On endpoint TEARDOWN (detach) `rebootstrap` is 0: the
 * adapter is about to be closed, so kicking a re-bootstrap on a worker we are joining would
 * be wasted; holders still settle via the INVALIDATED broadcast + the bumped epoch. */
static void core_epoch_invalidate_emit(vmsig_core* c, uint32_t endpoint, int rebootstrap) {
    if (endpoint >= 64)
        return;
    c->epoch[endpoint]++;
    core_memctx_cell* cell = &c->memctx[endpoint];
    cell->valid = 0; /* prior-epoch context is not replayed */

    vmsig_event up;
    memset(&up, 0, sizeof up);
    up.kind = VMSIG_EV_MEMCTX_INVALIDATED;
    up.source = VMSIG_SRC_MEMCTX;
    up.dir = VMSIG_DIR_UP;
    up.prio = VMSIG_PRIO_URGENT;
    up.endpoint = endpoint;
    vmsig_memctx_inv inv = { endpoint, c->epoch[endpoint] };
    memcpy(up.inln, &inv, sizeof inv);
    core_emit_up(c, &up); /* broadcast to holders (CAP_MEMCTX gate) */

    if (rebootstrap && cell->registered && cell->reg.invalidate)
        cell->reg.invalidate(cell->reg.ctx, c->epoch[endpoint]);
}

/* Public epoch bump: invalidate the endpoint's context and request a re-bootstrap. */
void core_epoch_bump(vmsig_core* c, uint32_t endpoint) {
    core_epoch_invalidate_emit(c, endpoint, 1); /* destructive lifecycle: re-bootstrap */
}

/* UP: drain the context queue and dispatch to subscribed controls */
static void pump_up(vmsig_core* c) {
    vmsig_event ev;
    while (vmsig_ctx_next(c->ctx, VMSIG_DIR_UP, &ev) == 1) {
        if (ev.kind == VMSIG_EV_MEMCTX) {
            /* Context trigger: the core builds the authoritative locator (adapter snapshot
             * + epoch stamp) and distributes to qualified holders with re-sharing of the
             * RO-fd. The trigger itself is NOT delivered as an ordinary event. */
            core_memctx_route(c, &ev);
            vmsig_payload_release(&ev); /* inline trigger (release=NULL) — harmless */
            continue;
        }
        if (ev.kind == VMSIG_EV_VM_LIFECYCLE && ev.origin == 0) {
            /* Epoch-transition observation: a destructive async transition (VMHOST
             * broadcast) invalidates the address-space context. NOT continue — VM_LIFECYCLE
             * still goes to subscribers below via the normal broadcast.
             * The state is copied out of the inline bytes (no casted-pointer read). */
            vmsig_vm_state vs;
            memcpy(&vs, ev.inln, sizeof vs);
            if (vs.state == VMSIG_VM_RESET || vs.state == VMSIG_VM_POWERDOWN ||
                vs.state == VMSIG_VM_SHUTDOWN)
                core_epoch_bump(c, ev.endpoint);
        }
        if (ev.origin) {
            /* addressed reply ONLY to the initiator (origin+generation). The command was
             * already authorized by the grant => we deliver the reply without re-check; if
             * the initiator is gone/slot reused — we drop (private data, not broadcast). */
            core_control_ent* e = origin_ctl(c, ev.origin);
            if (e && e->ops->deliver)
                e->ops->deliver(e->ctl, &ev);
        } else {
            /* unaddressed event — broadcast; effective = grant ∩ sub */
            for (int i = 0; i < c->ncontrols; i++) {
                core_control_ent* e = &c->controls[i];
                if (!e->active)
                    continue;
                if (grant_allows_up(&e->grant, &ev) && sub_match(&e->sub, &ev) &&
                    e->ops->deliver)
                    e->ops->deliver(e->ctl, &ev);
            }
        }
        vmsig_payload_release(&ev);
    }
}

/* DOWN: drain the queue and route the command to the adapter (source+endpoint) */
static void pump_down(vmsig_core* c) {
    vmsig_event ev;
    while (vmsig_ctx_next(c->ctx, VMSIG_DIR_DOWN, &ev) == 1) {
        core_control_ent* oe = origin_ctl(c, ev.origin); /* command has left ctx */
        if (oe && oe->pending)
            oe->pending--; /* THE ONLY decrement */

        /* In-flight fencing: destruction whose origin is NO LONGER the class owner (lease
         * lost between the emit_down gate and dequeue) is dropped BEFORE actuation. The
         * cell itself is not modified here — this path only audits and drops.
         * pending is NOT touched here (already decremented above). */
        int cls = lease_class_for_down(&ev);
        if (cls >= 0 && ev.endpoint < 64 && c->lease[ev.endpoint][cls].owner != ev.origin) {
            /* dropping destruction that lost the lease is observable (origin owner's principal). */
            vmsig_audit a = { VMSIG_AUDIT_LEASE_DENIED, lease_owner_principal(c, ev.origin),
                              ev.endpoint, (uint32_t)ev.kind, (uint32_t)cls };
            core_audit(c, &a);
            vmsig_payload_release(&ev);
            continue;
        }

        /* no matching adapter (or no submit op) => the command is dropped silently */
        core_adapter_ent* e = core_find_adapter(c, ev.endpoint, ev.source);
        if (e && e->ops->submit)
            e->ops->submit(e->a, &ev);
        vmsig_payload_release(&ev);
    }
}

/* Deferred reap of detached controls: after the batch (safe — not inside their own
 * on_readable). epoll DEL + mark slot dead + return leases + ops->close. */
static void core_reap(vmsig_core* c) {
    for (int i = 0; i < c->ncontrols; i++) {
        core_control_ent* e = &c->controls[i];
        if (!e->reap || !e->active)
            continue;
        if (e->slot) {
            epoll_ctl(c->epfd, EPOLL_CTL_DEL, e->slot->fd, NULL);
            e->slot->role = SLOT_DEAD;
        }
        core_lease_reap_control(c, i); /* return leases BEFORE active=0 */
        if (e->ops->close)
            e->ops->close(e->ctl);
        e->active = 0;
        e->reap = 0;
    }
}

/* Deferred reap of runtime-detached adapters (after the batch). Two passes:
 *  1) per-endpoint coherence settle ONCE: release leases + bump epoch / broadcast
 *     MEMCTX_INVALIDATED (no re-bootstrap — we are tearing down). Done while the memctx
 *     cell is still registered.
 *  2) per-adapter teardown: SEAM_DOWN (close is silent on administrative detach), epoll
 *     DEL + mark slots dead (so the loop never dispatches a half-closed adapter), then
 *     ops->close (joins the worker, closes the SI handle AFTER the join).
 * Deferred (reap flag set elsewhere) so no live slot is flipped to DEAD inside the batch. */
static void core_reap_adapters(vmsig_core* c) {
    uint64_t settled = 0; /* endpoints already coherence-settled this pass */
    for (int i = 0; i < c->nadapters; i++) {
        core_adapter_ent* e = &c->adapters[i];
        if (!e->reap || !e->active)
            continue;
        uint32_t ep = e->endpoint;
        if (ep < 64 && !(settled & (1ull << ep))) {
            settled |= (1ull << ep);
            core_lease_reap_endpoint(c, ep);
            core_epoch_invalidate_emit(c, ep, 0); /* settle holders; no re-bootstrap */
        }
    }

    for (int i = 0; i < c->nadapters; i++) {
        core_adapter_ent* e = &c->adapters[i];
        if (!e->reap || !e->active)
            continue;

        vmsig_event sd;
        memset(&sd, 0, sizeof sd);
        sd.kind = VMSIG_EV_SEAM_DOWN;
        sd.source = e->ops->source;
        sd.dir = VMSIG_DIR_UP;
        sd.prio = VMSIG_PRIO_URGENT;
        sd.endpoint = e->endpoint;
        core_emit_up(c, &sd);

        for (int k = 0; k < e->nslot; k++) {
            if (!e->slots[k])
                continue;
            epoll_ctl(c->epfd, EPOLL_CTL_DEL, e->slots[k]->fd, NULL);
            e->slots[k]->role = SLOT_DEAD;
        }
        if (e->ops->close)
            e->ops->close(e->a);
        e->a = NULL;
        e->nslot = 0;
        e->active = 0;
        e->reap = 0;
    }
}

/* Run the event loop until the stopping flag is observed.
 * Each iteration blocks in epoll_wait (no timeout), dispatches every ready slot by its
 * role, then drains UP, drains DOWN, and performs the deferred control/adapter reaps.
 * Returns 0 on a requested stop, -1 if `c` is NULL or epoll_wait fails with anything other
 * than EINTR. */
int vmsig_core_run(vmsig_core* c) {
    if (!c)
        return -1;
    struct epoll_event evs[VMSIG_MAX_EVENTS];

    while (!__atomic_load_n(&c->stopping, __ATOMIC_ACQUIRE)) {
        int n = epoll_wait(c->epfd, evs, VMSIG_MAX_EVENTS, -1);
        if (n < 0) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        for (int i = 0; i < n; i++) {
            core_slot* s = (core_slot*)evs[i].data.ptr;
            switch (s->role) {
            case SLOT_WAKEUP:
                drain_counter_fd(s->fd); /* stopping is checked in while */
                break;
            case SLOT_ADAPTER:
                if (s->ops->on_readiness)
                    s->ops->on_readiness(s->adapter, s->cookie, evs[i].events);
                break;
            case SLOT_CTX_TIMING:
                drain_counter_fd(s->fd);
                break;
            case SLOT_CONTROL:
                if (s->cops->on_readable)
                    s->cops->on_readable(s->ctl);
                break;
            case SLOT_SOURCE:
                if (s->on_source)
                    s->on_source(s->source_user, evs[i].events);
                break;
            case SLOT_DEAD:
                break; /* detached — ignore */
            }
        }
        pump_up(c);
        pump_down(c);
        core_reap(c);
        core_reap_adapters(c);
    }
    return 0;
}

/* Request loop shutdown: set the stopping flag (release store, paired with the acquire
 * load in vmsig_core_run) and wake the loop so it re-checks the flag. NULL is a no-op. */
void vmsig_core_stop(vmsig_core* c) {
    if (!c)
        return;
    __atomic_store_n(&c->stopping, 1, __ATOMIC_RELEASE); /* cross-thread stop signal */
    core_wake(c);
}