vmsig: a neutral signaling layer between sensors/input and controls

An epoll-driven, neutral transfer-event bus that connects sensors and input
actuators to one or more controls, bidirectionally. It owns the transfer context
and events — delivery order, priority, protocol-level timing, and an
interrupt-driven event model over fd sources (eventfd/timerfd/sockets) — and
stays agnostic to both the sensor/input drivers and the control.

What lives here:
- memctx: a coherent address-space context per endpoint — the guest address-space
  root paired with a pre-opened read-only RAM-region fd, with per-endpoint epoch
  invalidation and retained replay to late subscribers. Perception lives in
  out-of-tree sensor libraries that consume this datum read-only.
- exclusive-ownership leases for destructive resource classes (input, power,
  memory-write).
- write-signaled memory writes (MEMWRITE): an atomic write to guest memory routed
  through the seam under an exclusive lease, never a writable mapping.
- a host-management seam for VM lifecycle/status and a neutral input-injection
  command path.
- multi-VM endpoints; capability-gated, audited control authorization over an
  in-process or unix-socket transport.

Builds against headers only by default (a stub mode that exercises the seam
without a VM); armed builds link the real sensor/input libraries behind flags.
This commit is contained in:
2026-06-20 21:21:20 +03:00
commit e9aee057c7
36 changed files with 5820 additions and 0 deletions
+318
View File
@@ -0,0 +1,318 @@
/* socket.c — out-of-process control over a unix socket.
*
* The listener registers in the core as a SLOT_SOURCE (listen-fd). On accept the
* peer is authenticated via SO_PEERCRED, the policy issues a neutral grant; an empty
* grant => the connection is closed (not a valid poller). Otherwise a per-conn
* control is created: its fd is driven by the epoll core, DOWN frames are parsed and
* dispatched through emit_down (enforced by the grant), UP events are serialized into
* a frame. On EOF — deferred reap.
*
* DoS protection: per-uid limit of concurrent connections (against eviction of
* legitimate ones); a janitor timerfd detaches "stuck" partial frames (slowloris).
* The global ceiling and slot reuse live in the core. */
#define _GNU_SOURCE
#include "vmsig_socket.h"
#include "core_internal.h" /* core_add_source, core_request_drop, add_control */
#include <sys/socket.h>
#include <sys/uio.h>
#include <sys/un.h>
#include <sys/timerfd.h>
#include <sys/stat.h> /* umask */
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stddef.h>
#include <errno.h>
#include <stdint.h>
#include <time.h>
#define VMSIG_SOCK_PER_UID_MAX 8 /* concurrent connections per uid */
#define VMSIG_SOCK_IDLE_NS (10ull * 1000000000ull) /* timeout for a stuck partial frame */
#define VMSIG_SOCK_JANITOR_S 5 /* sweep period */
typedef struct sock_listener sock_listener;
static uint64_t now_ns(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec;
}
/* ===== wire codec (public — also for external clients) ===== */
void vmsig_wire_encode(vmsig_wire* w, const vmsig_event* ev) {
memset(w, 0, sizeof *w);
w->magic = VMSIG_WIRE_MAGIC; w->version = VMSIG_WIRE_VERSION;
w->kind = ev->kind; w->source = ev->source; w->dir = ev->dir; w->prio = ev->prio;
w->endpoint = ev->endpoint; w->corr = ev->corr;
memcpy(w->inln, ev->inln, sizeof w->inln);
}
int vmsig_wire_decode(const vmsig_wire* w, vmsig_event* ev) {
if (w->magic != VMSIG_WIRE_MAGIC || w->version != VMSIG_WIRE_VERSION) return -1;
memset(ev, 0, sizeof *ev);
ev->kind = w->kind; ev->source = w->source; ev->dir = w->dir; ev->prio = w->prio;
ev->endpoint = w->endpoint; ev->corr = w->corr;
ev->payload.flags = VMSIG_PL_INLINE;
memcpy(ev->inln, w->inln, sizeof ev->inln);
return 0;
}
/* ===== per-conn control ===== */
typedef struct sock_conn {
int fd;
vmsig_core* core;
int id;
uint32_t uid;
uint64_t last_ns; /* activity for the janitor */
sock_listener* L;
struct sock_conn* lnext; /* listener's connection list */
int (*emit_down)(void* token, vmsig_event*);
void* token;
uint8_t buf[sizeof(vmsig_wire)];
size_t buflen;
} sock_conn;
static int conn_fd(void* ctl) { return ((sock_conn*)ctl)->fd; }
static int conn_subscribe(void* ctl, vmsig_sub* out) {
(void)ctl; memset(out, 0, sizeof *out); return 0; /* everything; the grant gates it */
}
static int conn_deliver(void* ctl, const vmsig_event* ev) {
sock_conn* c = ctl;
vmsig_wire w;
vmsig_wire_encode(&w, ev);
ssize_t r = write(c->fd, &w, sizeof w); /* best-effort; EAGAIN => frame dropped */
(void)r;
return 0;
}
static void conn_set_emit_down(void* ctl, int (*emit)(void* token, vmsig_event*), void* token) {
sock_conn* c = ctl; c->emit_down = emit; c->token = token;
}
static int conn_on_readable(void* ctl) {
sock_conn* c = ctl;
for (;;) {
ssize_t n = read(c->fd, c->buf + c->buflen, sizeof c->buf - c->buflen);
if (n == 0) { core_request_drop(c->core, c->id); return 0; } /* EOF */
if (n < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) break;
core_request_drop(c->core, c->id);
return 0;
}
c->last_ns = now_ns();
c->buflen += (size_t)n;
if (c->buflen == sizeof c->buf) {
vmsig_event ev;
if (vmsig_wire_decode((const vmsig_wire*)c->buf, &ev) == 0) {
ev.dir = VMSIG_DIR_DOWN; /* from a poller — DOWN only */
if (c->emit_down) c->emit_down(c->token, &ev); /* enforced by the grant */
}
c->buflen = 0;
}
}
return 0;
}
/* ===== listener ===== */
struct sock_listener {
int listen_fd;
int janitor_fd;
vmsig_core* core;
vmsig_socket_policy policy;
void* ud;
sock_conn* conns; /* singly-linked list of active connections */
};
static void listener_unlink(sock_listener* L, sock_conn* c) {
sock_conn** pp = &L->conns;
while (*pp) { if (*pp == c) { *pp = c->lnext; return; } pp = &(*pp)->lnext; }
}
static int listener_uid_count(sock_listener* L, uint32_t uid) {
int n = 0;
for (sock_conn* c = L->conns; c; c = c->lnext) if (c->uid == uid) n++;
return n;
}
static void conn_close(void* ctl) {
sock_conn* c = ctl;
if (c->L) listener_unlink(c->L, c);
if (c->fd >= 0) close(c->fd);
free(c);
}
/* Send a SINGLE 80-byte vmsig_wire frame + ONE RO-fd in a cmsg (SCM_RIGHTS). This keeps
* the control-socket stream fixed-framed at sizeof(vmsig_wire): the client reads one
* frame via recvmsg and extracts the fd only on an fd-carrying frame. Partial cmsg
* transfer is not allowed (the fd is all-or-nothing): a short sendmsg -> -1. Shared
* primitive for the memctx handoff (one SCM_RIGHTS mechanism). */
static int conn_send_fd_frame(sock_conn* c, const vmsig_wire* w, int fd) {
struct iovec iov;
iov.iov_base = (void*)w;
iov.iov_len = sizeof *w;
union {
char buf[CMSG_SPACE(sizeof(int))];
struct cmsghdr align;
} cm;
memset(&cm, 0, sizeof cm);
struct msghdr mh;
memset(&mh, 0, sizeof mh);
mh.msg_iov = &iov;
mh.msg_iovlen = 1;
mh.msg_control = cm.buf;
mh.msg_controllen = sizeof cm.buf;
struct cmsghdr* cmsg = CMSG_FIRSTHDR(&mh);
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));
for (;;) {
ssize_t n = sendmsg(c->fd, &mh, MSG_NOSIGNAL);
if (n < 0) {
if (errno == EINTR) continue;
return -1;
}
return ((size_t)n == sizeof *w) ? 0 : -1; /* partial frame -> failure */
}
}
/* Core -> socket-control: handoff of an address-space context (kind=MEMCTX, inln=vmsig_memctx
* POD) + RO-fd of the RAM region in a cmsg. The segs payload does NOT go on the wire (the
* fixed-framed vmsig_wire carries only inln); the holder opens it at `low`. */
static int conn_attach_memctx(void* ctl, const vmsig_event* ev, int fd) {
sock_conn* c = ctl;
if (fd < 0 || !ev) return -1;
vmsig_wire w;
vmsig_wire_encode(&w, ev); /* kind=MEMCTX, inln=vmsig_memctx; payload is not serialized */
return conn_send_fd_frame(c, &w, fd);
}
static const vmsig_control_ops CONN_OPS = {
.name = "socket",
.fd = conn_fd, .subscribe = conn_subscribe, .deliver = conn_deliver,
.on_readable = conn_on_readable, .set_emit_down = conn_set_emit_down, .close = conn_close,
.attach_memctx = conn_attach_memctx
};
static void on_accept(void* user, uint32_t events) {
(void)events;
sock_listener* L = user;
for (;;) {
int fd = accept4(L->listen_fd, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC);
if (fd < 0) break; /* EAGAIN / other — done */
uint32_t uid = (uint32_t)-1, pid = 0;
struct ucred uc; socklen_t ul = sizeof uc;
if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &ul) == 0) {
uid = (uint32_t)uc.uid; pid = (uint32_t)uc.pid;
}
vmsig_grant g;
if (L->policy) g = L->policy(uid, pid, L->ud);
else memset(&g, 0, sizeof g);
if (g.cap_mask == 0 || g.endpoint_mask == 0) { /* not a valid poller */
vmsig_audit a = { VMSIG_AUDIT_REJECT, uid, 0, 0, pid };
core_audit(L->core, &a);
close(fd);
continue;
}
if (listener_uid_count(L, uid) >= VMSIG_SOCK_PER_UID_MAX) { /* anti-eviction */
vmsig_audit a = { VMSIG_AUDIT_REJECT, uid, 0, 0, pid };
core_audit(L->core, &a);
close(fd);
continue;
}
sock_conn* conn = calloc(1, sizeof *conn);
if (!conn) { close(fd); continue; }
conn->fd = fd; conn->core = L->core; conn->id = -1;
conn->uid = uid; conn->last_ns = now_ns(); conn->L = L;
conn->lnext = L->conns; L->conns = conn;
int id = vmsig_core_add_control(L->core, &CONN_OPS, conn, &g);
if (id < 0) { /* no slot — reject */
vmsig_audit a = { VMSIG_AUDIT_REJECT, uid, 0, 0, pid };
core_audit(L->core, &a);
listener_unlink(L, conn); close(fd); free(conn); continue;
}
conn->id = id;
vmsig_audit a = { VMSIG_AUDIT_ADMIT, g.principal, 0, 0, pid };
core_audit(L->core, &a);
}
}
/* janitor: detach connections with a stuck partial frame (slowloris) */
static void on_janitor(void* user, uint32_t events) {
(void)events;
sock_listener* L = user;
uint64_t v;
while (read(L->janitor_fd, &v, sizeof v) == (ssize_t)sizeof v) { /* drain */ }
uint64_t now = now_ns();
for (sock_conn* c = L->conns; c; c = c->lnext)
if (c->buflen > 0 && now - c->last_ns > VMSIG_SOCK_IDLE_NS)
core_request_drop(c->core, c->id);
}
/* listener cleanup on core_free (owner = the core, via on_free of the first source) */
static void listener_free(void* user) {
sock_listener* L = user;
if (L->janitor_fd >= 0) close(L->janitor_fd);
if (L->listen_fd >= 0) close(L->listen_fd);
free(L);
}
int vmsig_socket_attach(vmsig_core* core, const char* path,
vmsig_socket_policy policy, void* ud) {
if (!core || !path || !*path) return -1;
int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
if (fd < 0) return -1;
struct sockaddr_un addr;
memset(&addr, 0, sizeof addr);
addr.sun_family = AF_UNIX;
socklen_t alen;
size_t n = strlen(path);
if (path[0] == '@') { /* abstract namespace */
if (n > sizeof addr.sun_path) { close(fd); return -1; }
addr.sun_path[0] = 0;
memcpy(addr.sun_path + 1, path + 1, n - 1);
alen = (socklen_t)(offsetof(struct sockaddr_un, sun_path) + n);
} else { /* filesystem path */
if (n >= sizeof addr.sun_path) { close(fd); return -1; }
unlink(path);
memcpy(addr.sun_path, path, n);
alen = (socklen_t)sizeof addr;
}
/* Create the filesystem socket with restrictive perms (0600): the path must not be
* the only gate — connect requires write, so we open it to the owner only.
* (An abstract socket has no FS perms; its access is bounded by the net namespace.) */
mode_t old_um = 0;
int restrict_perm = (path[0] != '@');
if (restrict_perm) old_um = umask(0177);
int br = bind(fd, (struct sockaddr*)&addr, alen);
if (restrict_perm) umask(old_um);
if (br < 0) { close(fd); return -1; }
if (listen(fd, 64) < 0) { close(fd); return -1; }
int jfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if (jfd < 0) { close(fd); return -1; }
struct itimerspec its;
memset(&its, 0, sizeof its);
its.it_interval.tv_sec = VMSIG_SOCK_JANITOR_S;
its.it_value = its.it_interval;
if (timerfd_settime(jfd, 0, &its, NULL) < 0) { close(jfd); close(fd); return -1; }
sock_listener* L = calloc(1, sizeof *L);
if (!L) { close(jfd); close(fd); return -1; }
L->listen_fd = fd; L->janitor_fd = jfd; L->core = core; L->policy = policy; L->ud = ud;
/* the listen source owns the listener (on_free=listener_free closes both fds + free) */
if (core_add_source(core, fd, on_accept, L, listener_free) < 0) {
close(jfd); close(fd); free(L); return -1;
}
/* janitor without on_free (L already belongs to the core); on error core_free releases it */
if (core_add_source(core, jfd, on_janitor, L, NULL) < 0) return -1;
return 0;
}