ddt: dedup log

Adds a log/journal to dedup. At the end of txg, instead of writing the
entry directly to the ZAP, instead its adding to an in-memory tree and
appended to an on-disk object. The on-disk object is only read at
import, to reload the in-memory tree.

Lookups first go the the log tree before going to the ZAP, so
recently-used entries will remain close by in memory. This vastly
reduces overhead from dedup IO, as it will not have to do so many
read/update/write cycles on ZAP leaf nodes.

A flushing facility is added at end of txg, to push logged entries out
to the ZAP. There's actually two separate "logs" (in-memory tree and
on-disk object), one active (recieving updated entries) and one flushing
(writing out to disk). These are swapped (ie flushing begins) based on
memory used by the in-memory log trees and time since we last flushed
something.

The flushing facility monitors the amount of entries coming in and being
flushed out, and calibrates itself to try to flush enough each txg to
keep up with the ingest rate without competing too much with other IO.
Multiple tuneables are provided to control the flushing facility.

All the histograms and stats are update to accomodate the log as a
separate entry store. zdb gains knowledge of how to count them and dump
them. Documentation included!

Reviewed-by: Alexander Motin <mav@FreeBSD.org>
Reviewed-by: Brian Behlendorf <behlendorf1@llnl.gov>
Co-authored-by: Allan Jude <allan@klarasystems.com>
Signed-off-by: Rob Norris <rob.norris@klarasystems.com>
Sponsored-by: Klara, Inc.
Sponsored-by: iXsystems, Inc.
Closes #15895
This commit is contained in:
Rob Norris
2023-06-22 17:46:22 +10:00
committed by Brian Behlendorf
parent cbb9ef0a4c
commit cd69ba3d49
17 changed files with 1621 additions and 131 deletions
+524 -122
View File
@@ -125,6 +125,28 @@
* without which, no space would be recovered and the DDT would continue to be
* considered "over quota". See zap_shrink_enabled.
*
* ## Dedup log
*
* Historically, all entries modified on a txg were written back to dedup
* storage objects at the end of every txg. This could cause significant
* overheads, as each entry only takes up a tiny portion of a ZAP leaf node,
* and so required reading the whole node, updating the entry, and writing it
* back. On busy pools, this could add serious IO and memory overheads.
*
* To address this, the dedup log was added. If the "fast_dedup" feature is
* enabled, at the end of each txg, modified entries will be copied to an
* in-memory "log" object (ddt_log_t), and appended to an on-disk log. If the
* same block is requested again, the in-memory object will be checked first,
* and if its there, the entry inflated back onto the live tree without going
* to storage. The on-disk log is only read at pool import time, to reload the
* in-memory log.
*
* Each txg, some amount of the in-memory log will be flushed out to a DDT
* storage object (ie ZAP) as normal. OpenZFS will try hard to flush enough to
* keep up with the rate of change on dedup entries, but not so much that it
* would impact overall throughput, and not using too much memory. See the
* zfs_dedup_log_* tuneables in zfs(4) for more details.
*
* ## Repair IO
*
* If a read on a dedup block fails, but there are other copies of the block in
@@ -201,6 +223,26 @@ int zfs_dedup_prefetch = 0;
uint_t dedup_class_wait_txgs = 5;
/*
* Don't do more than this many incremental flush passes per txg.
*/
uint_t zfs_dedup_log_flush_passes_max = 8;
/*
* Minimum time to flush per txg.
*/
uint_t zfs_dedup_log_flush_min_time_ms = 1000;
/*
* Minimum entries to flush per txg.
*/
uint_t zfs_dedup_log_flush_entries_min = 1000;
/*
* Number of txgs to average flow rates across.
*/
uint_t zfs_dedup_log_flush_flow_rate_txgs = 10;
static const ddt_ops_t *const ddt_ops[DDT_TYPES] = {
&ddt_zap_ops,
};
@@ -217,7 +259,7 @@ static const char *const ddt_class_name[DDT_CLASSES] = {
*/
static const uint64_t ddt_version_flags[] = {
[DDT_VERSION_LEGACY] = 0,
[DDT_VERSION_FDT] = DDT_FLAG_FLAT,
[DDT_VERSION_FDT] = DDT_FLAG_FLAT | DDT_FLAG_LOG,
};
/* Dummy version to signal that configure is still necessary */
@@ -405,13 +447,13 @@ ddt_object_prefetch_all(ddt_t *ddt, ddt_type_t type, ddt_class_t class)
static int
ddt_object_update(ddt_t *ddt, ddt_type_t type, ddt_class_t class,
ddt_entry_t *dde, dmu_tx_t *tx)
const ddt_lightweight_entry_t *ddlwe, dmu_tx_t *tx)
{
ASSERT(ddt_object_exists(ddt, type, class));
return (ddt_ops[type]->ddt_op_update(ddt->ddt_os,
ddt->ddt_object[type][class], &dde->dde_key,
dde->dde_phys, DDT_PHYS_SIZE(ddt), tx));
ddt->ddt_object[type][class], &ddlwe->ddlwe_key,
&ddlwe->ddlwe_phys, DDT_PHYS_SIZE(ddt), tx));
}
static int
@@ -701,16 +743,15 @@ ddt_phys_refcnt(const ddt_univ_phys_t *ddp, ddt_phys_variant_t v)
}
uint64_t
ddt_phys_total_refcnt(const ddt_t *ddt, const ddt_entry_t *dde)
ddt_phys_total_refcnt(const ddt_t *ddt, const ddt_univ_phys_t *ddp)
{
uint64_t refcnt = 0;
if (ddt->ddt_flags & DDT_FLAG_FLAT) {
refcnt = dde->dde_phys->ddp_flat.ddp_refcnt;
} else {
for (int p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++)
refcnt += dde->dde_phys->ddp_trad[p].ddp_refcnt;
}
if (ddt->ddt_flags & DDT_FLAG_FLAT)
refcnt = ddp->ddp_flat.ddp_refcnt;
else
for (int v = DDT_PHYS_SINGLE; v <= DDT_PHYS_TRIPLE; v++)
refcnt += ddp->ddp_trad[v].ddp_refcnt;
return (refcnt);
}
@@ -743,11 +784,15 @@ ddt_init(void)
DDT_ENTRY_FLAT_SIZE, 0, NULL, NULL, NULL, NULL, NULL, 0);
ddt_entry_trad_cache = kmem_cache_create("ddt_entry_trad_cache",
DDT_ENTRY_TRAD_SIZE, 0, NULL, NULL, NULL, NULL, NULL, 0);
ddt_log_init();
}
void
ddt_fini(void)
{
ddt_log_fini();
kmem_cache_destroy(ddt_entry_trad_cache);
kmem_cache_destroy(ddt_entry_flat_cache);
kmem_cache_destroy(ddt_cache);
@@ -805,6 +850,13 @@ ddt_remove(ddt_t *ddt, ddt_entry_t *dde)
{
ASSERT(MUTEX_HELD(&ddt->ddt_lock));
/* Entry is still in the log, so charge the entry back to it */
if (dde->dde_flags & DDE_FLAG_LOGGED) {
ddt_lightweight_entry_t ddlwe;
DDT_ENTRY_TO_LIGHTWEIGHT(ddt, dde, &ddlwe);
ddt_histogram_add_entry(ddt, &ddt->ddt_log_histogram, &ddlwe);
}
avl_remove(&ddt->ddt_tree, dde);
ddt_free(ddt, dde);
}
@@ -951,6 +1003,25 @@ ddt_lookup(ddt_t *ddt, const blkptr_t *bp)
avl_insert(&ddt->ddt_tree, dde, where);
/* If its in the log tree, we can "load" it from there */
if (ddt->ddt_flags & DDT_FLAG_LOG) {
ddt_lightweight_entry_t ddlwe;
if (ddt_log_take_key(ddt, ddt->ddt_log_active,
&search, &ddlwe) ||
ddt_log_take_key(ddt, ddt->ddt_log_flushing,
&search, &ddlwe)) {
dde->dde_flags = DDE_FLAG_LOADED | DDE_FLAG_LOGGED;
dde->dde_type = ddlwe.ddlwe_type;
dde->dde_class = ddlwe.ddlwe_class;
memcpy(dde->dde_phys, &ddlwe.ddlwe_phys,
DDT_PHYS_SIZE(ddt));
return (dde);
}
}
/*
* ddt_tree is now stable, so unlock and let everyone else keep moving.
* Anyone landing on this entry will find it without DDE_FLAG_LOADED,
@@ -993,10 +1064,14 @@ ddt_lookup(ddt_t *ddt, const blkptr_t *bp)
dde->dde_flags |= DDE_FLAG_OVERQUOTA;
} else if (error == 0) {
/*
* The histograms only track inactive (stored) blocks.
* The histograms only track inactive (stored or logged) blocks.
* We've just put an entry onto the live list, so we need to
* remove its counts. When its synced back, it'll be re-added
* to the right one.
*
* We only do this when we successfully found it in the store.
* error == ENOENT means this is a new entry, and so its already
* not counted.
*/
ddt_histogram_t *ddh =
&ddt->ddt_histogram[dde->dde_type][dde->dde_class];
@@ -1099,6 +1174,8 @@ ddt_destroy_dir(ddt_t *ddt, dmu_tx_t *tx)
}
}
ddt_log_destroy(ddt, tx);
uint64_t count;
ASSERT0(zap_count(ddt->ddt_os, ddt->ddt_dir_object, &count));
ASSERT0(zap_contains(ddt->ddt_os, ddt->ddt_dir_object,
@@ -1241,23 +1318,26 @@ ddt_table_alloc(spa_t *spa, enum zio_checksum c)
ddt = kmem_cache_alloc(ddt_cache, KM_SLEEP);
memset(ddt, 0, sizeof (ddt_t));
mutex_init(&ddt->ddt_lock, NULL, MUTEX_DEFAULT, NULL);
avl_create(&ddt->ddt_tree, ddt_key_compare,
sizeof (ddt_entry_t), offsetof(ddt_entry_t, dde_node));
avl_create(&ddt->ddt_repair_tree, ddt_key_compare,
sizeof (ddt_entry_t), offsetof(ddt_entry_t, dde_node));
ddt->ddt_checksum = c;
ddt->ddt_spa = spa;
ddt->ddt_os = spa->spa_meta_objset;
ddt->ddt_version = DDT_VERSION_UNCONFIGURED;
ddt_log_alloc(ddt);
return (ddt);
}
static void
ddt_table_free(ddt_t *ddt)
{
ddt_log_free(ddt);
ASSERT0(avl_numnodes(&ddt->ddt_tree));
ASSERT0(avl_numnodes(&ddt->ddt_repair_tree));
avl_destroy(&ddt->ddt_tree);
@@ -1310,6 +1390,10 @@ ddt_load(spa_t *spa)
}
}
error = ddt_log_load(ddt);
if (error != 0 && error != ENOENT)
return (error);
/*
* Seed the cached histograms.
*/
@@ -1483,109 +1567,15 @@ ddt_repair_table(ddt_t *ddt, zio_t *rio)
}
static void
ddt_sync_entry(ddt_t *ddt, ddt_entry_t *dde, dmu_tx_t *tx, uint64_t txg)
ddt_sync_update_stats(ddt_t *ddt, dmu_tx_t *tx)
{
dsl_pool_t *dp = ddt->ddt_spa->spa_dsl_pool;
ddt_key_t *ddk = &dde->dde_key;
ddt_type_t otype = dde->dde_type;
ddt_type_t ntype = DDT_TYPE_DEFAULT;
ddt_class_t oclass = dde->dde_class;
ddt_class_t nclass;
uint64_t total_refcnt = 0;
ASSERT(dde->dde_flags & DDE_FLAG_LOADED);
for (int p = 0; p < DDT_NPHYS(ddt); p++) {
ASSERT(dde->dde_io == NULL ||
dde->dde_io->dde_lead_zio[p] == NULL);
ddt_univ_phys_t *ddp = dde->dde_phys;
ddt_phys_variant_t v = DDT_PHYS_VARIANT(ddt, p);
uint64_t phys_refcnt = ddt_phys_refcnt(ddp, v);
if (ddt_phys_birth(ddp, v) == 0) {
ASSERT0(phys_refcnt);
continue;
}
if (DDT_PHYS_IS_DITTO(ddt, p)) {
/*
* Note, we no longer create DDT-DITTO blocks, but we
* don't want to leak any written by older software.
*/
ddt_phys_free(ddt, ddk, ddp, v, txg);
continue;
}
if (phys_refcnt == 0)
ddt_phys_free(ddt, ddk, ddp, v, txg);
total_refcnt += phys_refcnt;
}
if (total_refcnt > 1)
nclass = DDT_CLASS_DUPLICATE;
else
nclass = DDT_CLASS_UNIQUE;
if (otype != DDT_TYPES &&
(otype != ntype || oclass != nclass || total_refcnt == 0)) {
VERIFY0(ddt_object_remove(ddt, otype, oclass, ddk, tx));
ASSERT3U(
ddt_object_contains(ddt, otype, oclass, ddk), ==, ENOENT);
}
if (total_refcnt != 0) {
dde->dde_type = ntype;
dde->dde_class = nclass;
if (!ddt_object_exists(ddt, ntype, nclass))
ddt_object_create(ddt, ntype, nclass, tx);
VERIFY0(ddt_object_update(ddt, ntype, nclass, dde, tx));
ddt_lightweight_entry_t ddlwe;
DDT_ENTRY_TO_LIGHTWEIGHT(ddt, dde, &ddlwe);
ddt_histogram_t *ddh =
&ddt->ddt_histogram[ntype][nclass];
ddt_histogram_add_entry(ddt, ddh, &ddlwe);
/*
* If the class changes, the order that we scan this bp
* changes. If it decreases, we could miss it, so
* scan it right now. (This covers both class changing
* while we are doing ddt_walk(), and when we are
* traversing.)
*/
if (nclass < oclass) {
dsl_scan_ddt_entry(dp->dp_scan,
ddt->ddt_checksum, ddt, &ddlwe, tx);
}
}
}
static void
ddt_sync_table(ddt_t *ddt, dmu_tx_t *tx, uint64_t txg)
{
spa_t *spa = ddt->ddt_spa;
ddt_entry_t *dde;
void *cookie = NULL;
if (avl_numnodes(&ddt->ddt_tree) == 0)
return;
ASSERT3U(spa->spa_uberblock.ub_version, >=, SPA_VERSION_DEDUP);
if (spa->spa_ddt_stat_object == 0) {
spa->spa_ddt_stat_object = zap_create_link(ddt->ddt_os,
DMU_OT_DDT_STATS, DMU_POOL_DIRECTORY_OBJECT,
DMU_POOL_DDT_STATS, tx);
}
if (ddt->ddt_version == DDT_VERSION_FDT && ddt->ddt_dir_object == 0)
ddt_create_dir(ddt, tx);
while ((dde = avl_destroy_nodes(&ddt->ddt_tree, &cookie)) != NULL) {
ddt_sync_entry(ddt, dde, tx, txg);
ddt_free(ddt, dde);
}
/*
* Count all the entries stored for each type/class, and updates the
* stats within (ddt_object_sync()). If there's no entries for the
* type/class, the whole object is removed. If all objects for the DDT
* are removed, its containing dir is removed, effectively resetting
* the entire DDT to an empty slate.
*/
uint64_t count = 0;
for (ddt_type_t type = 0; type < DDT_TYPES; type++) {
uint64_t add, tcount = 0;
@@ -1604,6 +1594,12 @@ ddt_sync_table(ddt_t *ddt, dmu_tx_t *tx, uint64_t txg)
count += tcount;
}
if (ddt->ddt_flags & DDT_FLAG_LOG) {
/* Include logged entries in the total count */
count += avl_numnodes(&ddt->ddt_log_active->ddl_tree);
count += avl_numnodes(&ddt->ddt_log_flushing->ddl_tree);
}
if (count == 0) {
/*
* No entries left on the DDT, so reset the version for next
@@ -1620,8 +1616,398 @@ ddt_sync_table(ddt_t *ddt, dmu_tx_t *tx, uint64_t txg)
memcpy(&ddt->ddt_histogram_cache, ddt->ddt_histogram,
sizeof (ddt->ddt_histogram));
spa->spa_dedup_dspace = ~0ULL;
spa->spa_dedup_dsize = ~0ULL;
ddt->ddt_spa->spa_dedup_dspace = ~0ULL;
ddt->ddt_spa->spa_dedup_dsize = ~0ULL;
}
static void
ddt_sync_scan_entry(ddt_t *ddt, ddt_lightweight_entry_t *ddlwe, dmu_tx_t *tx)
{
dsl_pool_t *dp = ddt->ddt_spa->spa_dsl_pool;
/*
* Compute the target class, so we can decide whether or not to inform
* the scrub traversal (below). Note that we don't store this in the
* entry, as it might change multiple times before finally being
* committed (if we're logging). Instead, we recompute it in
* ddt_sync_entry().
*/
uint64_t refcnt = ddt_phys_total_refcnt(ddt, &ddlwe->ddlwe_phys);
ddt_class_t nclass =
(refcnt > 1) ? DDT_CLASS_DUPLICATE : DDT_CLASS_UNIQUE;
/*
* If the class changes, the order that we scan this bp changes. If it
* decreases, we could miss it, so scan it right now. (This covers both
* class changing while we are doing ddt_walk(), and when we are
* traversing.)
*
* We also do this when the refcnt goes to zero, because that change is
* only in the log so far; the blocks on disk won't be freed until
* the log is flushed, and the refcnt might increase before that. If it
* does, then we could miss it in the same way.
*/
if (refcnt == 0 || nclass < ddlwe->ddlwe_class)
dsl_scan_ddt_entry(dp->dp_scan, ddt->ddt_checksum, ddt,
ddlwe, tx);
}
static void
ddt_sync_flush_entry(ddt_t *ddt, ddt_lightweight_entry_t *ddlwe,
ddt_type_t otype, ddt_class_t oclass, dmu_tx_t *tx)
{
ddt_key_t *ddk = &ddlwe->ddlwe_key;
ddt_type_t ntype = DDT_TYPE_DEFAULT;
uint64_t refcnt = 0;
/*
* Compute the total refcnt. Along the way, issue frees for any DVAs
* we no longer want.
*/
for (int p = 0; p < DDT_NPHYS(ddt); p++) {
ddt_univ_phys_t *ddp = &ddlwe->ddlwe_phys;
ddt_phys_variant_t v = DDT_PHYS_VARIANT(ddt, p);
uint64_t phys_refcnt = ddt_phys_refcnt(ddp, v);
if (ddt_phys_birth(ddp, v) == 0) {
ASSERT3U(phys_refcnt, ==, 0);
continue;
}
if (DDT_PHYS_IS_DITTO(ddt, p)) {
/*
* We don't want to keep any obsolete slots (eg ditto),
* regardless of their refcount, but we don't want to
* leak them either. So, free them.
*/
ddt_phys_free(ddt, ddk, ddp, v, tx->tx_txg);
continue;
}
if (phys_refcnt == 0)
/* No remaining references, free it! */
ddt_phys_free(ddt, ddk, ddp, v, tx->tx_txg);
refcnt += phys_refcnt;
}
/* Select the best class for the entry. */
ddt_class_t nclass =
(refcnt > 1) ? DDT_CLASS_DUPLICATE : DDT_CLASS_UNIQUE;
/*
* If an existing entry changed type or class, or its refcount reached
* zero, delete it from the DDT object
*/
if (otype != DDT_TYPES &&
(otype != ntype || oclass != nclass || refcnt == 0)) {
VERIFY0(ddt_object_remove(ddt, otype, oclass, ddk, tx));
ASSERT(ddt_object_contains(ddt, otype, oclass, ddk) == ENOENT);
}
/*
* Add or update the entry
*/
if (refcnt != 0) {
ddt_histogram_t *ddh =
&ddt->ddt_histogram[ntype][nclass];
ddt_histogram_add_entry(ddt, ddh, ddlwe);
if (!ddt_object_exists(ddt, ntype, nclass))
ddt_object_create(ddt, ntype, nclass, tx);
VERIFY0(ddt_object_update(ddt, ntype, nclass, ddlwe, tx));
}
}
/* Calculate an exponential weighted moving average, lower limited to zero */
static inline int32_t
_ewma(int32_t val, int32_t prev, uint32_t weight)
{
ASSERT3U(val, >=, 0);
ASSERT3U(prev, >=, 0);
const int32_t new =
MAX(0, prev + (val-prev) / (int32_t)MAX(weight, 1));
ASSERT3U(new, >=, 0);
return (new);
}
/* Returns true if done for this txg */
static boolean_t
ddt_sync_flush_log_incremental(ddt_t *ddt, dmu_tx_t *tx)
{
if (ddt->ddt_flush_pass == 0) {
if (spa_sync_pass(ddt->ddt_spa) == 1) {
/* First run this txg, get set up */
ddt->ddt_flush_start = gethrtime();
ddt->ddt_flush_count = 0;
/*
* How many entries we need to flush. We want to at
* least match the ingest rate.
*/
ddt->ddt_flush_min = MAX(
ddt->ddt_log_ingest_rate,
zfs_dedup_log_flush_entries_min);
} else {
/* We already decided we're done for this txg */
return (B_FALSE);
}
} else if (ddt->ddt_flush_pass == spa_sync_pass(ddt->ddt_spa)) {
/*
* We already did some flushing on this pass, skip it. This
* happens when dsl_process_async_destroys() runs during a scan
* (on pass 1) and does an additional ddt_sync() to update
* freed blocks.
*/
return (B_FALSE);
}
if (spa_sync_pass(ddt->ddt_spa) >
MAX(zfs_dedup_log_flush_passes_max, 1)) {
/* Too many passes this txg, defer until next. */
ddt->ddt_flush_pass = 0;
return (B_TRUE);
}
if (avl_is_empty(&ddt->ddt_log_flushing->ddl_tree)) {
/* Nothing to flush, done for this txg. */
ddt->ddt_flush_pass = 0;
return (B_TRUE);
}
uint64_t target_time = txg_sync_waiting(ddt->ddt_spa->spa_dsl_pool) ?
MIN(MSEC2NSEC(zfs_dedup_log_flush_min_time_ms),
SEC2NSEC(zfs_txg_timeout)) : SEC2NSEC(zfs_txg_timeout);
uint64_t elapsed_time = gethrtime() - ddt->ddt_flush_start;
if (elapsed_time >= target_time) {
/* Too long since we started, done for this txg. */
ddt->ddt_flush_pass = 0;
return (B_TRUE);
}
ddt->ddt_flush_pass++;
ASSERT3U(spa_sync_pass(ddt->ddt_spa), ==, ddt->ddt_flush_pass);
/*
* Estimate how much time we'll need to flush the remaining entries
* based on how long it normally takes.
*/
uint32_t want_time;
if (ddt->ddt_flush_pass == 1) {
/* First pass, use the average time/entries */
if (ddt->ddt_log_flush_rate == 0)
/* Zero rate, just assume the whole time */
want_time = target_time;
else
want_time = ddt->ddt_flush_min *
ddt->ddt_log_flush_time_rate /
ddt->ddt_log_flush_rate;
} else {
/* Later pass, calculate from this txg so far */
want_time = ddt->ddt_flush_min *
elapsed_time / ddt->ddt_flush_count;
}
/* Figure out how much time we have left */
uint32_t remain_time = target_time - elapsed_time;
/* Smear the remaining entries over the remaining passes. */
uint32_t nentries = ddt->ddt_flush_min /
(MAX(1, zfs_dedup_log_flush_passes_max) + 1 - ddt->ddt_flush_pass);
if (want_time > remain_time) {
/*
* We're behind; try to catch up a bit by doubling the amount
* this pass. If we're behind that means we're in a later
* pass and likely have most of the remaining time to
* ourselves. If we're in the last couple of passes, then
* doubling might just take us over the timeout, but probably
* not be much, and it stops us falling behind. If we're
* in the middle passes, there'll be more to do, but it
* might just help us catch up a bit and we'll recalculate on
* the next pass anyway.
*/
nentries = MIN(ddt->ddt_flush_min, nentries*2);
}
ddt_lightweight_entry_t ddlwe;
uint32_t count = 0;
while (ddt_log_take_first(ddt, ddt->ddt_log_flushing, &ddlwe)) {
ddt_sync_flush_entry(ddt, &ddlwe,
ddlwe.ddlwe_type, ddlwe.ddlwe_class, tx);
/* End this pass if we've synced as much as we need to. */
if (++count >= nentries)
break;
}
ddt->ddt_flush_count += count;
ddt->ddt_flush_min -= count;
if (avl_is_empty(&ddt->ddt_log_flushing->ddl_tree)) {
/* We emptied it, so truncate on-disk */
ddt_log_truncate(ddt, tx);
/* No more passes needed this txg */
ddt->ddt_flush_pass = 0;
} else
/* More to do next time, save checkpoint */
ddt_log_checkpoint(ddt, &ddlwe, tx);
ddt_sync_update_stats(ddt, tx);
return (ddt->ddt_flush_pass == 0);
}
static void
ddt_sync_flush_log(ddt_t *ddt, dmu_tx_t *tx)
{
ASSERT(avl_is_empty(&ddt->ddt_tree));
/* Don't do any flushing when the pool is ready to shut down */
if (tx->tx_txg > spa_final_dirty_txg(ddt->ddt_spa))
return;
/* Try to flush some. */
if (!ddt_sync_flush_log_incremental(ddt, tx))
/* More to do next time */
return;
/* No more flushing this txg, so we can do end-of-txg housekeeping */
if (avl_is_empty(&ddt->ddt_log_flushing->ddl_tree) &&
!avl_is_empty(&ddt->ddt_log_active->ddl_tree)) {
/*
* No more to flush, and the active list has stuff, so
* try to swap the logs for next time.
*/
(void) ddt_log_swap(ddt, tx);
}
/*
* Update flush rate. This is an exponential weighted moving average of
* the number of entries flushed over recent txgs.
*/
ddt->ddt_log_flush_rate = _ewma(
ddt->ddt_flush_count, ddt->ddt_log_flush_rate,
zfs_dedup_log_flush_flow_rate_txgs);
/*
* Update flush time rate. This is an exponential weighted moving
* average of the total time taken to flush over recent txgs.
*/
ddt->ddt_log_flush_time_rate = _ewma(
ddt->ddt_log_flush_time_rate,
((int32_t)(NSEC2MSEC(gethrtime() - ddt->ddt_flush_start))),
zfs_dedup_log_flush_flow_rate_txgs);
}
static void
ddt_sync_table_log(ddt_t *ddt, dmu_tx_t *tx)
{
uint64_t count = avl_numnodes(&ddt->ddt_tree);
if (count > 0) {
ddt_log_update_t dlu = {0};
ddt_log_begin(ddt, count, tx, &dlu);
ddt_entry_t *dde;
void *cookie = NULL;
ddt_lightweight_entry_t ddlwe;
while ((dde =
avl_destroy_nodes(&ddt->ddt_tree, &cookie)) != NULL) {
ASSERT(dde->dde_flags & DDE_FLAG_LOADED);
DDT_ENTRY_TO_LIGHTWEIGHT(ddt, dde, &ddlwe);
ddt_log_entry(ddt, &ddlwe, &dlu);
ddt_sync_scan_entry(ddt, &ddlwe, tx);
ddt_free(ddt, dde);
}
ddt_log_commit(ddt, &dlu);
/*
* Sync the stats for the store objects. Even though we haven't
* modified anything on those objects, they're no longer the
* source of truth for entries that are now in the log, and we
* need the on-disk counts to reflect that, otherwise we'll
* miscount later when importing.
*/
for (ddt_type_t type = 0; type < DDT_TYPES; type++) {
for (ddt_class_t class = 0;
class < DDT_CLASSES; class++) {
if (ddt_object_exists(ddt, type, class))
ddt_object_sync(ddt, type, class, tx);
}
}
memcpy(&ddt->ddt_histogram_cache, ddt->ddt_histogram,
sizeof (ddt->ddt_histogram));
ddt->ddt_spa->spa_dedup_dspace = ~0ULL;
ddt->ddt_spa->spa_dedup_dsize = ~0ULL;
}
if (spa_sync_pass(ddt->ddt_spa) == 1)
/*
* Update ingest rate. This is an exponential weighted moving
* average of the number of entries changed over recent txgs.
* The ramp-up cost shouldn't matter too much because the
* flusher will be trying to take at least the minimum anyway.
*/
ddt->ddt_log_ingest_rate = _ewma(
count, ddt->ddt_log_ingest_rate,
zfs_dedup_log_flush_flow_rate_txgs);
}
static void
ddt_sync_table_flush(ddt_t *ddt, dmu_tx_t *tx)
{
if (avl_numnodes(&ddt->ddt_tree) == 0)
return;
ddt_entry_t *dde;
void *cookie = NULL;
while ((dde = avl_destroy_nodes(
&ddt->ddt_tree, &cookie)) != NULL) {
ASSERT(dde->dde_flags & DDE_FLAG_LOADED);
ddt_lightweight_entry_t ddlwe;
DDT_ENTRY_TO_LIGHTWEIGHT(ddt, dde, &ddlwe);
ddt_sync_flush_entry(ddt, &ddlwe,
dde->dde_type, dde->dde_class, tx);
ddt_sync_scan_entry(ddt, &ddlwe, tx);
ddt_free(ddt, dde);
}
memcpy(&ddt->ddt_histogram_cache, ddt->ddt_histogram,
sizeof (ddt->ddt_histogram));
ddt->ddt_spa->spa_dedup_dspace = ~0ULL;
ddt->ddt_spa->spa_dedup_dsize = ~0ULL;
ddt_sync_update_stats(ddt, tx);
}
static void
ddt_sync_table(ddt_t *ddt, dmu_tx_t *tx)
{
spa_t *spa = ddt->ddt_spa;
if (ddt->ddt_version == UINT64_MAX)
return;
if (spa->spa_uberblock.ub_version < SPA_VERSION_DEDUP) {
ASSERT0(avl_numnodes(&ddt->ddt_tree));
return;
}
if (spa->spa_ddt_stat_object == 0) {
spa->spa_ddt_stat_object = zap_create_link(ddt->ddt_os,
DMU_OT_DDT_STATS, DMU_POOL_DIRECTORY_OBJECT,
DMU_POOL_DDT_STATS, tx);
}
if (ddt->ddt_version == DDT_VERSION_FDT && ddt->ddt_dir_object == 0)
ddt_create_dir(ddt, tx);
if (ddt->ddt_flags & DDT_FLAG_LOG)
ddt_sync_table_log(ddt, tx);
else
ddt_sync_table_flush(ddt, tx);
}
void
@@ -1651,7 +2037,9 @@ ddt_sync(spa_t *spa, uint64_t txg)
ddt_t *ddt = spa->spa_ddt[c];
if (ddt == NULL)
continue;
ddt_sync_table(ddt, tx, txg);
ddt_sync_table(ddt, tx);
if (ddt->ddt_flags & DDT_FLAG_LOG)
ddt_sync_flush_log(ddt, tx);
ddt_repair_table(ddt, rio);
}
@@ -1719,9 +2107,12 @@ ddt_addref(spa_t *spa, const blkptr_t *bp)
return (B_FALSE);
}
if (dde->dde_type < DDT_TYPES) {
ASSERT3S(dde->dde_class, <, DDT_CLASSES);
if ((dde->dde_type < DDT_TYPES) || (dde->dde_flags & DDE_FLAG_LOGGED)) {
/*
* This entry was either synced to a store object (dde_type is
* real) or was logged. It must be properly on disk at this
* point, so we can just bump its refcount.
*/
int p = DDT_PHYS_FOR_COPIES(ddt, BP_GET_NDVAS(bp));
ddt_phys_variant_t v = DDT_PHYS_VARIANT(ddt, p);
@@ -1748,7 +2139,6 @@ ddt_addref(spa_t *spa, const blkptr_t *bp)
* we may have a block with the DEDUP set, but which doesn't
* have a corresponding entry in the DDT. Be ready.
*/
ASSERT3S(dde->dde_class, ==, DDT_CLASSES);
ddt_remove(ddt, dde);
result = B_FALSE;
}
@@ -1761,3 +2151,15 @@ ddt_addref(spa_t *spa, const blkptr_t *bp)
ZFS_MODULE_PARAM(zfs_dedup, zfs_dedup_, prefetch, INT, ZMOD_RW,
"Enable prefetching dedup-ed blks");
ZFS_MODULE_PARAM(zfs_dedup, zfs_dedup_, log_flush_passes_max, UINT, ZMOD_RW,
"Max number of incremental dedup log flush passes per transaction");
ZFS_MODULE_PARAM(zfs_dedup, zfs_dedup_, log_flush_min_time_ms, UINT, ZMOD_RW,
"Min time to spend on incremental dedup log flush each transaction");
ZFS_MODULE_PARAM(zfs_dedup, zfs_dedup_, log_flush_entries_min, UINT, ZMOD_RW,
"Min number of log entries to flush each transaction");
ZFS_MODULE_PARAM(zfs_dedup, zfs_dedup_, log_flush_flow_rate_txgs, UINT, ZMOD_RW,
"Number of txgs to average flow rates across");