/* socket.c — out-of-process control over a unix socket. * * The listener registers in the core as a SLOT_SOURCE (listen-fd). On accept the * peer is authenticated via SO_PEERCRED, the policy issues a neutral grant; an empty * grant => the connection is closed (not a valid poller). Otherwise a per-conn * control is created: its fd is driven by the epoll core, DOWN frames are parsed and * dispatched through emit_down (enforced by the grant), UP events are serialized into * a frame. On EOF — deferred reap. * * DOWN framing: every kind is a single fixed vmsig_wire frame, EXCEPT a CMD_MEMWRITE * carrying VMSIG_MW_SRC_PAYLOAD — then mw.len SRC bytes follow the frame (length-prefixed * by the contract's mw.len, no separate wire prefix). The per-conn receiver is a 2-phase * state machine (FRAME -> TAIL): it accumulates the frame, and for a PAYLOAD MEMWRITE it * accumulates the SRC tail into a fixed conn-owned blob, then emits a BORROWED-payload * event. The blob lives in the conn so it outlives the DOWN queue until pump_down copies it. * * DoS protection: per-uid limit of concurrent connections (against eviction of * legitimate ones); a janitor timerfd detaches "stuck" partial frames / SRC tails * (slowloris). The global ceiling and slot reuse live in the core. 
*/ #define _GNU_SOURCE #include "vmsig_socket.h" #include "core_internal.h" /* core_add_source, core_request_drop, add_control */ #include "memctx.h" /* VMSIG_MEMWRITE_MAX: SRC-tail bound (one source of truth) */ #include #include #include #include #include /* umask */ #include #include #include #include #include #include #include #define VMSIG_SOCK_PER_UID_MAX 8 /* concurrent connections per uid */ #define VMSIG_SOCK_IDLE_NS (10ull * 1000000000ull) /* timeout for a stuck partial frame */ #define VMSIG_SOCK_JANITOR_S 5 /* sweep period */ typedef struct sock_listener sock_listener; static uint64_t now_ns(void) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec; } /* ===== wire codec (public — also for external clients) ===== */ void vmsig_wire_encode(vmsig_wire* w, const vmsig_event* ev) { memset(w, 0, sizeof *w); w->magic = VMSIG_WIRE_MAGIC; w->version = VMSIG_WIRE_VERSION; w->kind = ev->kind; w->source = ev->source; w->dir = ev->dir; w->prio = ev->prio; w->endpoint = ev->endpoint; w->corr = ev->corr; memcpy(w->inln, ev->inln, sizeof w->inln); } int vmsig_wire_decode(const vmsig_wire* w, vmsig_event* ev) { if (w->magic != VMSIG_WIRE_MAGIC || w->version != VMSIG_WIRE_VERSION) return -1; memset(ev, 0, sizeof *ev); ev->kind = w->kind; ev->source = w->source; ev->dir = w->dir; ev->prio = w->prio; ev->endpoint = w->endpoint; ev->corr = w->corr; ev->payload.flags = VMSIG_PL_INLINE; memcpy(ev->inln, w->inln, sizeof ev->inln); return 0; } /* ===== per-conn control ===== */ /* DOWN receive phases: read the fixed frame, then (only for a PAYLOAD MEMWRITE) the * length-prefixed SRC tail. State persists in the conn across EPOLLIN (partial recv). 
*/ typedef enum { CONN_RX_FRAME = 0, CONN_RX_TAIL = 1 } conn_rx_phase; typedef struct sock_conn { int fd; vmsig_core* core; int id; uint32_t uid; uint64_t last_ns; /* activity for the janitor */ sock_listener* L; struct sock_conn* lnext; /* listener's connection list */ int (*emit_down)(void* token, vmsig_event*); void* token; uint8_t buf[sizeof(vmsig_wire)]; size_t buflen; conn_rx_phase phase; /* FRAME: read vmsig_wire; TAIL: read SRC tail */ vmsig_event pend; /* decoded frame awaiting its SRC tail */ uint32_t need; /* expected tail length (= mw.len) */ uint32_t got; /* tail bytes already accumulated */ uint8_t blob[VMSIG_MEMWRITE_MAX]; /* SRC tail (BORROWED payload; lives in conn) */ } sock_conn; static int conn_fd(void* ctl) { return ((sock_conn*)ctl)->fd; } static int conn_subscribe(void* ctl, vmsig_sub* out) { (void)ctl; memset(out, 0, sizeof *out); return 0; /* everything; the grant gates it */ } static int conn_deliver(void* ctl, const vmsig_event* ev) { sock_conn* c = ctl; vmsig_wire w; vmsig_wire_encode(&w, ev); ssize_t r = write(c->fd, &w, sizeof w); /* best-effort; EAGAIN => frame dropped */ (void)r; return 0; } static void conn_set_emit_down(void* ctl, int (*emit)(void* token, vmsig_event*), void* token) { sock_conn* c = ctl; c->emit_down = emit; c->token = token; } /* Does this decoded frame pull a length-prefixed SRC tail? Only a CMD_MEMWRITE that * advertises VMSIG_MW_SRC_PAYLOAD. The tail length is mw.len from the contract (already on * the wire in inln) — no separate wire prefix. INLINE / other kinds carry no tail. 
*/ static int frame_pulls_tail(const vmsig_event* ev, uint32_t* need) { if (ev->kind != VMSIG_EV_CMD_MEMWRITE) return 0; const vmsig_memwrite* mw = (const vmsig_memwrite*)ev->inln; if (!(mw->flags & VMSIG_MW_SRC_PAYLOAD)) return 0; *need = mw->len; return 1; } static int conn_on_readable(void* ctl) { sock_conn* c = ctl; for (;;) { if (c->phase == CONN_RX_FRAME) { ssize_t n = read(c->fd, c->buf + c->buflen, sizeof c->buf - c->buflen); if (n == 0) { core_request_drop(c->core, c->id); return 0; } /* EOF */ if (n < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; core_request_drop(c->core, c->id); return 0; } c->last_ns = now_ns(); c->buflen += (size_t)n; if (c->buflen != sizeof c->buf) continue; /* frame not whole yet */ c->buflen = 0; vmsig_event ev; if (vmsig_wire_decode((const vmsig_wire*)c->buf, &ev) != 0) continue; /* bad magic/ver — drop frame */ ev.dir = VMSIG_DIR_DOWN; /* from a poller — DOWN only */ uint32_t need = 0; if (!frame_pulls_tail(&ev, &need)) { /* variant A / other kinds */ if (c->emit_down) c->emit_down(c->token, &ev); /* enforced by the grant */ continue; } /* Cap BEFORE reading the tail. A PAYLOAD frame promises EXACTLY mw.len tail bytes * with 1 <= mw.len <= MAX. A zero or over-cap length is a framing-contract * violation: the promised tail cannot be safely consumed (draining an * attacker-chosen length is a DoS) and leaving it unread would desync the stream * (the SRC bytes would be misread as the next frame). Close the connection — a * conformant poller never requests a tail outside [1, MAX]. */ if (need == 0 || need > VMSIG_MEMWRITE_MAX) { core_request_drop(c->core, c->id); return 0; } c->pend = ev; c->need = need; c->got = 0; c->phase = CONN_RX_TAIL; /* fall through to read the tail */ continue; } /* CONN_RX_TAIL: accumulate exactly c->need SRC bytes into the conn-owned blob. 
*/ ssize_t n = read(c->fd, c->blob + c->got, c->need - c->got); if (n == 0) { core_request_drop(c->core, c->id); return 0; } /* EOF */ if (n < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; core_request_drop(c->core, c->id); return 0; } c->last_ns = now_ns(); c->got += (uint32_t)n; if (c->got != c->need) continue; /* tail not whole yet */ /* Tail complete: borrow it through the payload. The blob lives in the conn and thus * outlives the DOWN queue until pump_down copies it (mc_submit). release=NULL — the * body belongs to the conn; the adapter copies synchronously on the loop thread. */ c->pend.payload.data = c->blob; c->pend.payload.len = c->need; c->pend.payload.codec = VMSIG_CODEC_MEMCTX; c->pend.payload.flags = VMSIG_PL_BORROWED; c->pend.payload.release= NULL; c->pend.payload.owner = NULL; if (c->emit_down) c->emit_down(c->token, &c->pend); c->phase = CONN_RX_FRAME; c->got = 0; c->need = 0; /* Do NOT reuse c->blob until pump_down has copied it. conn-fd is LEVEL-triggered * (EPOLLIN without EPOLLET), so any remaining bytes re-fire EPOLLIN on the next * pass — break out and let pump_down run first. */ break; } return 0; } /* ===== listener ===== */ struct sock_listener { int listen_fd; int janitor_fd; vmsig_core* core; vmsig_socket_policy policy; void* ud; sock_conn* conns; /* singly-linked list of active connections */ }; static void listener_unlink(sock_listener* L, sock_conn* c) { sock_conn** pp = &L->conns; while (*pp) { if (*pp == c) { *pp = c->lnext; return; } pp = &(*pp)->lnext; } } static int listener_uid_count(sock_listener* L, uint32_t uid) { int n = 0; for (sock_conn* c = L->conns; c; c = c->lnext) if (c->uid == uid) n++; return n; } static void conn_close(void* ctl) { sock_conn* c = ctl; if (c->L) listener_unlink(c->L, c); if (c->fd >= 0) close(c->fd); free(c); } /* Send a SINGLE 80-byte vmsig_wire frame + ONE RO-fd in a cmsg (SCM_RIGHTS). 
This keeps
 * the control-socket stream fixed-framed at sizeof(vmsig_wire): the client reads one
 * frame via recvmsg and extracts the fd only on an fd-carrying frame. Partial cmsg
 * transfer is not allowed (the fd is all-or-nothing): a short sendmsg -> -1. Shared
 * primitive for the memctx handoff (one SCM_RIGHTS mechanism). */
static int conn_send_fd_frame(sock_conn* c, const vmsig_wire* w, int fd) {
    /* Data part: exactly one frame. */
    struct iovec frame;
    frame.iov_base = (void*)w;
    frame.iov_len  = sizeof *w;

    /* Ancillary part: room for a single int, aligned like a cmsghdr. */
    union {
        char           buf[CMSG_SPACE(sizeof(int))];
        struct cmsghdr align;
    } ctrl;
    memset(&ctrl, 0, sizeof ctrl);

    struct msghdr msg;
    memset(&msg, 0, sizeof msg);
    msg.msg_iov        = &frame;
    msg.msg_iovlen     = 1;
    msg.msg_control    = ctrl.buf;
    msg.msg_controllen = sizeof ctrl.buf;

    struct cmsghdr* hdr = CMSG_FIRSTHDR(&msg);
    hdr->cmsg_level = SOL_SOCKET;
    hdr->cmsg_type  = SCM_RIGHTS;
    hdr->cmsg_len   = CMSG_LEN(sizeof(int));
    memcpy(CMSG_DATA(hdr), &fd, sizeof(int));

    /* Retry only when interrupted; any other error is reported to the caller. */
    ssize_t sent;
    do {
        sent = sendmsg(c->fd, &msg, MSG_NOSIGNAL);
    } while (sent < 0 && errno == EINTR);

    if (sent < 0) return -1;
    if ((size_t)sent != sizeof *w) return -1; /* partial frame -> failure */
    return 0;
}

/* Core -> socket-control: handoff of an address-space context (kind=MEMCTX, inln=vmsig_memctx
 * POD) + RO-fd of the RAM region in a cmsg. The segs payload does NOT go on the wire (the
 * fixed-framed vmsig_wire carries only inln); the holder opens it at `low`.
*/ static int conn_attach_memctx(void* ctl, const vmsig_event* ev, int fd) { sock_conn* c = ctl; if (fd < 0 || !ev) return -1; vmsig_wire w; vmsig_wire_encode(&w, ev); /* kind=MEMCTX, inln=vmsig_memctx; payload is not serialized */ return conn_send_fd_frame(c, &w, fd); } static const vmsig_control_ops CONN_OPS = { .name = "socket", .fd = conn_fd, .subscribe = conn_subscribe, .deliver = conn_deliver, .on_readable = conn_on_readable, .set_emit_down = conn_set_emit_down, .close = conn_close, .attach_memctx = conn_attach_memctx }; static void on_accept(void* user, uint32_t events) { (void)events; sock_listener* L = user; for (;;) { int fd = accept4(L->listen_fd, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC); if (fd < 0) break; /* EAGAIN / other — done */ uint32_t uid = (uint32_t)-1, pid = 0; struct ucred uc; socklen_t ul = sizeof uc; if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &ul) == 0) { uid = (uint32_t)uc.uid; pid = (uint32_t)uc.pid; } vmsig_grant g; if (L->policy) g = L->policy(uid, pid, L->ud); else memset(&g, 0, sizeof g); if (g.cap_mask == 0 || g.endpoint_mask == 0) { /* not a valid poller */ vmsig_audit a = { VMSIG_AUDIT_REJECT, uid, 0, 0, pid }; core_audit(L->core, &a); close(fd); continue; } if (listener_uid_count(L, uid) >= VMSIG_SOCK_PER_UID_MAX) { /* anti-eviction */ vmsig_audit a = { VMSIG_AUDIT_REJECT, uid, 0, 0, pid }; core_audit(L->core, &a); close(fd); continue; } sock_conn* conn = calloc(1, sizeof *conn); if (!conn) { close(fd); continue; } conn->fd = fd; conn->core = L->core; conn->id = -1; conn->uid = uid; conn->last_ns = now_ns(); conn->L = L; conn->lnext = L->conns; L->conns = conn; int id = vmsig_core_add_control(L->core, &CONN_OPS, conn, &g); if (id < 0) { /* no slot — reject */ vmsig_audit a = { VMSIG_AUDIT_REJECT, uid, 0, 0, pid }; core_audit(L->core, &a); listener_unlink(L, conn); close(fd); free(conn); continue; } conn->id = id; vmsig_audit a = { VMSIG_AUDIT_ADMIT, g.principal, 0, 0, pid }; core_audit(L->core, &a); } } /* janitor: detach 
connections with a stuck partial frame OR a stuck partial SRC tail * (slowloris). The blob is a conn field, so teardown (free(c) in conn_close) needs no * extra cleanup; a partial tail never emitted an event, so no dangling payload either. */ static void on_janitor(void* user, uint32_t events) { (void)events; sock_listener* L = user; uint64_t v; while (read(L->janitor_fd, &v, sizeof v) == (ssize_t)sizeof v) { /* drain */ } uint64_t now = now_ns(); for (sock_conn* c = L->conns; c; c = c->lnext) { int stuck_frame = (c->buflen > 0); /* partial vmsig_wire */ int stuck_tail = (c->phase == CONN_RX_TAIL && c->got < c->need);/* partial SRC tail */ if ((stuck_frame || stuck_tail) && now - c->last_ns > VMSIG_SOCK_IDLE_NS) core_request_drop(c->core, c->id); } } /* listener cleanup on core_free (owner = the core, via on_free of the first source) */ static void listener_free(void* user) { sock_listener* L = user; if (L->janitor_fd >= 0) close(L->janitor_fd); if (L->listen_fd >= 0) close(L->listen_fd); free(L); } int vmsig_socket_attach(vmsig_core* core, const char* path, vmsig_socket_policy policy, void* ud) { if (!core || !path || !*path) return -1; int fd = socket(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); if (fd < 0) return -1; struct sockaddr_un addr; memset(&addr, 0, sizeof addr); addr.sun_family = AF_UNIX; socklen_t alen; size_t n = strlen(path); if (path[0] == '@') { /* abstract namespace */ if (n > sizeof addr.sun_path) { close(fd); return -1; } addr.sun_path[0] = 0; memcpy(addr.sun_path + 1, path + 1, n - 1); alen = (socklen_t)(offsetof(struct sockaddr_un, sun_path) + n); } else { /* filesystem path */ if (n >= sizeof addr.sun_path) { close(fd); return -1; } unlink(path); memcpy(addr.sun_path, path, n); alen = (socklen_t)sizeof addr; } /* Create the filesystem socket with restrictive perms (0600): the path must not be * the only gate — connect requires write, so we open it to the owner only. 
* (An abstract socket has no FS perms; its access is bounded by the net namespace.) */ mode_t old_um = 0; int restrict_perm = (path[0] != '@'); if (restrict_perm) old_um = umask(0177); int br = bind(fd, (struct sockaddr*)&addr, alen); if (restrict_perm) umask(old_um); if (br < 0) { close(fd); return -1; } if (listen(fd, 64) < 0) { close(fd); return -1; } int jfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); if (jfd < 0) { close(fd); return -1; } struct itimerspec its; memset(&its, 0, sizeof its); its.it_interval.tv_sec = VMSIG_SOCK_JANITOR_S; its.it_value = its.it_interval; if (timerfd_settime(jfd, 0, &its, NULL) < 0) { close(jfd); close(fd); return -1; } sock_listener* L = calloc(1, sizeof *L); if (!L) { close(jfd); close(fd); return -1; } L->listen_fd = fd; L->janitor_fd = jfd; L->core = core; L->policy = policy; L->ud = ud; /* the listen source owns the listener (on_free=listener_free closes both fds + free) */ if (core_add_source(core, fd, on_accept, L, listener_free) < 0) { close(jfd); close(fd); free(L); return -1; } /* janitor without on_free (L already belongs to the core); on error core_free releases it */ if (core_add_source(core, jfd, on_janitor, L, NULL) < 0) return -1; return 0; }