txg: generalise txg_wait_synced_sig() to txg_wait_synced_flags() (#17284)

txg_wait_synced_sig() is "wait for txg, unless a signal arrives". We
expect that future development will require similar "wait unless X"
behaviour.

This generalises the API as txg_wait_synced_flags(), where the provided
flags describe the events that should cause the call to return.

Instead of a boolean, the return is now an error code, which the caller
can use to know which event caused the call to return.

The existing call to txg_wait_synced_sig() is now
txg_wait_synced_flags(TXG_WAIT_SIGNAL).

Sponsored-by: Klara, Inc.
Sponsored-by: Wasabi Technology, Inc.

Signed-off-by: Rob Norris <robn@despairlabs.com>
Reviewed-by: Allan Jude <allan@klarasystems.com>
Reviewed-by: Alexander Motin <mav@FreeBSD.org>
This commit is contained in:
Rob Norris 2025-05-03 08:29:50 +10:00 committed by GitHub
parent f85c96edf7
commit a7de203c86
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 52 additions and 28 deletions

View File

@ -66,6 +66,20 @@ typedef struct txg_list {
txg_node_t *tl_head[TXG_SIZE]; txg_node_t *tl_head[TXG_SIZE];
} txg_list_t; } txg_list_t;
/*
* Wait flags for txg_wait_synced_flags(). By default (TXG_WAIT_NONE), it will
* wait until the wanted txg is reached, or block forever. Additional flags
* indicate other conditions that the caller is interested in, that will cause
* the wait to break and return an error code describing the condition.
*/
typedef enum {
/* No special flags. Guaranteed to block forever or return 0 */
TXG_WAIT_NONE = 0,
/* If a signal arrives while waiting, abort and return EINTR */
TXG_WAIT_SIGNAL = (1 << 0),
} txg_wait_flag_t;
struct dsl_pool; struct dsl_pool;
extern void txg_init(struct dsl_pool *dp, uint64_t txg); extern void txg_init(struct dsl_pool *dp, uint64_t txg);
@ -86,13 +100,16 @@ extern void txg_kick(struct dsl_pool *dp, uint64_t txg);
* Try to make this happen as soon as possible (eg. kick off any * Try to make this happen as soon as possible (eg. kick off any
* necessary syncs immediately). If txg==0, wait for the currently open * necessary syncs immediately). If txg==0, wait for the currently open
* txg to finish syncing. * txg to finish syncing.
* See txg_wait_flag_t above for a description of how the flags affect the wait.
*/ */
extern void txg_wait_synced(struct dsl_pool *dp, uint64_t txg); extern int txg_wait_synced_flags(struct dsl_pool *dp, uint64_t txg,
txg_wait_flag_t flags);
/* /*
* Wait as above. Returns true if the thread was signaled while waiting. * Traditional form of txg_wait_synced_flags, waits forever.
* Shorthand for VERIFY0(txg_wait_synced_flags(dp, TXG_WAIT_NONE))
*/ */
extern boolean_t txg_wait_synced_sig(struct dsl_pool *dp, uint64_t txg); extern void txg_wait_synced(struct dsl_pool *dp, uint64_t txg);
/* /*
* Wait until the given transaction group, or one after it, is * Wait until the given transaction group, or one after it, is

View File

@ -86,11 +86,18 @@ top:
dmu_tx_commit(tx); dmu_tx_commit(tx);
if (sigfunc != NULL && txg_wait_synced_sig(dp, dst.dst_txg)) { if (sigfunc != NULL) {
err = txg_wait_synced_flags(dp, dst.dst_txg, TXG_WAIT_SIGNAL);
if (err != 0) {
VERIFY3U(err, ==, EINTR);
/* current contract is to call func once */ /* current contract is to call func once */
sigfunc(arg, tx); sigfunc(arg, tx);
sigfunc = NULL; /* in case we're performing an EAGAIN retry */ /* in case we're performing an EAGAIN retry */
sigfunc = NULL;
txg_wait_synced(dp, dst.dst_txg);
} }
} else
txg_wait_synced(dp, dst.dst_txg); txg_wait_synced(dp, dst.dst_txg);
if (dst.dst_error == EAGAIN) { if (dst.dst_error == EAGAIN) {

View File

@ -699,11 +699,13 @@ txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
mutex_exit(&tx->tx_sync_lock); mutex_exit(&tx->tx_sync_lock);
} }
static boolean_t int
txg_wait_synced_impl(dsl_pool_t *dp, uint64_t txg, boolean_t wait_sig) txg_wait_synced_flags(dsl_pool_t *dp, uint64_t txg, txg_wait_flag_t flags)
{ {
int error = 0;
tx_state_t *tx = &dp->dp_tx; tx_state_t *tx = &dp->dp_tx;
ASSERT0(flags & ~TXG_WAIT_SIGNAL);
ASSERT(!dsl_pool_config_held(dp)); ASSERT(!dsl_pool_config_held(dp));
mutex_enter(&tx->tx_sync_lock); mutex_enter(&tx->tx_sync_lock);
@ -715,13 +717,19 @@ txg_wait_synced_impl(dsl_pool_t *dp, uint64_t txg, boolean_t wait_sig)
dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n", dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
(u_longlong_t)txg, (u_longlong_t)tx->tx_quiesce_txg_waiting, (u_longlong_t)txg, (u_longlong_t)tx->tx_quiesce_txg_waiting,
(u_longlong_t)tx->tx_sync_txg_waiting); (u_longlong_t)tx->tx_sync_txg_waiting);
/*
* Keep pushing util the pool gets to the wanted txg. If something
* else interesting happens, we'll set an error and break out.
*/
while (tx->tx_synced_txg < txg) { while (tx->tx_synced_txg < txg) {
dprintf("broadcasting sync more " dprintf("broadcasting sync more "
"tx_synced=%llu waiting=%llu dp=%px\n", "tx_synced=%llu waiting=%llu dp=%px\n",
(u_longlong_t)tx->tx_synced_txg, (u_longlong_t)tx->tx_synced_txg,
(u_longlong_t)tx->tx_sync_txg_waiting, dp); (u_longlong_t)tx->tx_sync_txg_waiting, dp);
cv_broadcast(&tx->tx_sync_more_cv); cv_broadcast(&tx->tx_sync_more_cv);
if (wait_sig) {
if (flags & TXG_WAIT_SIGNAL) {
/* /*
* Condition wait here but stop if the thread receives a * Condition wait here but stop if the thread receives a
* signal. The caller may call txg_wait_synced*() again * signal. The caller may call txg_wait_synced*() again
@ -729,31 +737,23 @@ txg_wait_synced_impl(dsl_pool_t *dp, uint64_t txg, boolean_t wait_sig)
*/ */
if (cv_wait_io_sig(&tx->tx_sync_done_cv, if (cv_wait_io_sig(&tx->tx_sync_done_cv,
&tx->tx_sync_lock) == 0) { &tx->tx_sync_lock) == 0) {
mutex_exit(&tx->tx_sync_lock); error = SET_ERROR(EINTR);
return (B_TRUE); break;
} }
} else { } else {
/* Uninterruptable wait, until the condvar fires */
cv_wait_io(&tx->tx_sync_done_cv, &tx->tx_sync_lock); cv_wait_io(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
} }
} }
mutex_exit(&tx->tx_sync_lock); mutex_exit(&tx->tx_sync_lock);
return (B_FALSE); return (error);
} }
void void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg) txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{ {
VERIFY0(txg_wait_synced_impl(dp, txg, B_FALSE)); VERIFY0(txg_wait_synced_flags(dp, txg, TXG_WAIT_NONE));
}
/*
* Similar to a txg_wait_synced but it can be interrupted from a signal.
* Returns B_TRUE if the thread was signaled while waiting.
*/
boolean_t
txg_wait_synced_sig(dsl_pool_t *dp, uint64_t txg)
{
return (txg_wait_synced_impl(dp, txg, B_TRUE));
} }
/* /*

View File

@ -970,8 +970,8 @@ zcp_pool_error(zcp_run_info_t *ri, const char *poolname, int error)
} }
/* /*
* This callback is called when txg_wait_synced_sig encountered a signal. * This callback is called when txg_wait_synced_flags encountered a signal.
* The txg_wait_synced_sig will continue to wait for the txg to complete * The txg_wait_synced_flags will continue to wait for the txg to complete
* after calling this callback. * after calling this callback.
*/ */
static void static void