/* ctx.c — transfer context: priority, ordering, protocol timing. * This is the SISC-critical seam. No behavioral timing here: commands arrive * already decided by control; the context only orders and paces them. */ #include "ctx_internal.h" #include #include #include #include #include /* Default depth ceiling for a single band (per source,dir) when no policy is set. */ #define VMSIG_CTX_DEFAULT_INFLIGHT 4096 static uint64_t now_ns(void) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec; } /* ---- node recycling (free-list under the shared mutex) ------------------- */ static ev_node* node_get(vmsig_ctx* c) { ev_node* n = c->freelist; if (n) { c->freelist = n->next; return n; } return malloc(sizeof *n); } static void node_put(vmsig_ctx* c, ev_node* n) { n->next = c->freelist; c->freelist = n; } vmsig_ctx* vmsig_ctx_new(void) { vmsig_ctx* c = calloc(1, sizeof *c); if (!c) return NULL; if (pthread_mutex_init(&c->lock, NULL) != 0) { free(c); return NULL; } for (int d = 0; d < 2; d++) { c->dir[d].timing_fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); if (c->dir[d].timing_fd < 0) { for (int k = 0; k < d; k++) close(c->dir[k].timing_fd); pthread_mutex_destroy(&c->lock); free(c); return NULL; } } return c; } void vmsig_ctx_free(vmsig_ctx* c) { if (!c) return; for (int d = 0; d < 2; d++) { for (int p = 0; p < VMSIG_PRIO_MAX; p++) { ev_node* n = c->dir[d].band[p].head; while (n) { ev_node* nx = n->next; vmsig_payload_release(&n->ev); free(n); n = nx; } } if (c->dir[d].timing_fd >= 0) close(c->dir[d].timing_fd); } /* actually free the recycled nodes (no payload attached) */ ev_node* f = c->freelist; while (f) { ev_node* nx = f->next; free(f); f = nx; } pthread_mutex_destroy(&c->lock); free(c); } int vmsig_ctx_set_policy(vmsig_ctx* c, vmsig_source src, vmsig_dir dir, vmsig_prio default_prio, const vmsig_timing* t) { if (!c || src >= VMSIG_SRC_MAX || dir > VMSIG_DIR_DOWN) return -1; pthread_mutex_lock(&c->lock); ctx_policy* pol = &c->policy[src][dir]; pol->default_prio = default_prio; if (t) pol->timing = *t; else memset(&pol->timing, 0, sizeof pol->timing); pol->policy_set = 1; pthread_mutex_unlock(&c->lock); return 0; } static void band_push_tail(ev_band* b, ev_node* n) { n->next = NULL; if (b->tail) b->tail->next = n; else b->head = n; b->tail = n; b->count++; } int vmsig_ctx_submit(vmsig_ctx* c, vmsig_dir dir, vmsig_event* ev) { if (!c || !ev || dir > VMSIG_DIR_DOWN) return -1; vmsig_source src = ev->source < VMSIG_SRC_MAX ? ev->source : VMSIG_SRC_NONE; pthread_mutex_lock(&c->lock); ctx_policy* pol = &c->policy[src][dir]; /* effective priority = max(policy default, emitter request) */ vmsig_prio eff = ev->prio > pol->default_prio ? ev->prio : pol->default_prio; if (eff >= VMSIG_PRIO_MAX) eff = VMSIG_PRIO_MAX - 1; ev->seq = ++c->seq; if (ev->ts_ns == 0) ev->ts_ns = now_ns(); ev->prio = eff; ev_band* band = &c->dir[dir].band[eff]; /* coalescing: a burst of the same kind+endpoint is collapsed (newest wins) */ if (pol->timing.coalesce_ns) { for (ev_node* n = band->head; n; n = n->next) { if (n->ev.kind == ev->kind && n->ev.endpoint == ev->endpoint) { vmsig_payload_release(&n->ev); uint32_t keep_seq = n->ev.seq; /* keep position in the order */ n->ev = *ev; n->ev.seq = keep_seq; pthread_mutex_unlock(&c->lock); return 1; } } } /* backpressure: channel depth is bounded. When no policy is set * (max_inflight==0), a BUILT-IN default ceiling applies (drop newest), * so the queue does not grow without bound under a command flood. */ uint32_t cap = pol->timing.max_inflight ? pol->timing.max_inflight : VMSIG_CTX_DEFAULT_INFLIGHT; uint8_t dp = pol->timing.max_inflight ? pol->timing.drop_policy : VMSIG_DROP_NEWEST; if (band->count >= (int)cap) { if (dp == VMSIG_DROP_OLDEST) { ev_node* old = band->head; /* drop the oldest */ if (old) { band->head = old->next; if (!band->head) band->tail = NULL; band->count--; vmsig_payload_release(&old->ev); node_put(c, old); } } else { /* NEWEST / BLOCK (the loop must not block) — drop the incoming event */ vmsig_payload_release(ev); pthread_mutex_unlock(&c->lock); return 1; } } ev_node* node = node_get(c); if (!node) { pthread_mutex_unlock(&c->lock); return -1; } node->ev = *ev; /* take ownership of the payload */ band_push_tail(band, node); pthread_mutex_unlock(&c->lock); return 0; } int vmsig_ctx_next(vmsig_ctx* c, vmsig_dir dir, vmsig_event* out) { if (!c || !out || dir > VMSIG_DIR_DOWN) return -1; pthread_mutex_lock(&c->lock); ctx_dir* d = &c->dir[dir]; uint64_t now = now_ns(); uint64_t min_rem = 0; int have_rem = 0; /* Walk bands from highest priority to lowest, and within a band from head * to tail, returning the FIRST event "matured" against its protocol min_gap. * A paced source thus waits without blocking ready events of other sources. * Within one source the order is preserved (its earlier events come first). */ for (int p = VMSIG_PRIO_MAX - 1; p >= 0; p--) { ev_band* b = &d->band[p]; ev_node* prev = NULL; ev_node* n = b->head; while (n) { vmsig_source src = n->ev.source < VMSIG_SRC_MAX ? n->ev.source : VMSIG_SRC_NONE; ctx_policy* pol = &c->policy[src][dir]; int due = 1; uint64_t rem = 0; if (pol->timing.min_gap_ns) { uint64_t due_at = pol->last_emit_ns + pol->timing.min_gap_ns; if (now < due_at) { due = 0; rem = due_at - now; } } if (due) { if (prev) prev->next = n->next; else b->head = n->next; if (b->tail == n) b->tail = prev; b->count--; pol->last_emit_ns = now; *out = n->ev; /* payload ownership -> caller */ node_put(c, n); pthread_mutex_unlock(&c->lock); return 1; } if (!have_rem || rem < min_rem) { min_rem = rem; have_rem = 1; } prev = n; n = n->next; } } /* nothing matured: arm the timing-fd for the nearest due time (if any waiting) */ if (have_rem) { struct itimerspec its; memset(&its, 0, sizeof its); its.it_value.tv_sec = (time_t)(min_rem / 1000000000ull); its.it_value.tv_nsec = (long)(min_rem % 1000000000ull); if (its.it_value.tv_sec == 0 && its.it_value.tv_nsec == 0) its.it_value.tv_nsec = 1; timerfd_settime(d->timing_fd, 0, &its, NULL); } pthread_mutex_unlock(&c->lock); return 0; } int vmsig_ctx_timing_fd(vmsig_ctx* c, vmsig_dir dir) { if (!c || dir > VMSIG_DIR_DOWN) return -1; return c->dir[dir].timing_fd; }