Use SA_HDL_PRIVATE for SA xattrs

A private SA handle must be used to ensure we can drop the dbuf
hold on the spill block prior to calling dmu_tx_commit().  If we
call dmu_tx_commit() before sa_handle_destroy(), then our hold
will trigger a copy of the dbuf to be made.  This is done to
prevent data from leaking in to the syncing txg.  As a result
the original dirty spill block will remain cached.

Additionally, relying on the shared zp->z_sa_hdl is unsafe in
the xattr context because the znode may be asynchronously dropped
from the cache.  It's far safer and simpler just to use a private
handle for xattrs.  Plus any additional overhead is offset by
the avoidance of the previously mentioned memory copy.

These forever dirty buffers can be noticed in the arcstats under
the anon_size.  On a quiescent system the value should be zero.
Without this fix and a SA xattr write workload you will see
anon_size increase.  Eventually, if enough dirty data builds up
your system it will appear to hang.  This occurs because the dmu
won't allow new txs to be assigned until that dirty data is
flushed, and it won't be because it's not part of an assigned tx.

As an aside, I typically see anon_size lurk around 16k so I think
there is another place in the code which needs a similar fix.
However, this value doesn't grow over time so it isn't critical.

Signed-off-by: Brian Behlendorf <behlendorf1@llnl.gov>
Issue #503
Issue #513
This commit is contained in:
Brian Behlendorf 2012-03-02 10:35:50 -08:00
parent 4b787d75c8
commit ec2626ad3f

View File

@ -188,6 +188,7 @@ int
zfs_sa_get_xattr(znode_t *zp) zfs_sa_get_xattr(znode_t *zp)
{ {
zfs_sb_t *zsb = ZTOZSB(zp); zfs_sb_t *zsb = ZTOZSB(zp);
sa_handle_t *sa;
char *obj; char *obj;
int size; int size;
int error; int error;
@ -196,8 +197,14 @@ zfs_sa_get_xattr(znode_t *zp)
ASSERT(!zp->z_xattr_cached); ASSERT(!zp->z_xattr_cached);
ASSERT(zp->z_is_sa); ASSERT(zp->z_is_sa);
error = sa_size(zp->z_sa_hdl, SA_ZPL_DXATTR(zsb), &size); error = sa_handle_get(zsb->z_os, zp->z_id, NULL, SA_HDL_PRIVATE, &sa);
if (error)
return (error);
error = sa_size(sa, SA_ZPL_DXATTR(zsb), &size);
if (error) { if (error) {
sa_handle_destroy(sa);
if (error == ENOENT) if (error == ENOENT)
return nvlist_alloc(&zp->z_xattr_cached, return nvlist_alloc(&zp->z_xattr_cached,
NV_UNIQUE_NAME, KM_SLEEP); NV_UNIQUE_NAME, KM_SLEEP);
@ -207,11 +214,12 @@ zfs_sa_get_xattr(znode_t *zp)
obj = sa_spill_alloc(KM_SLEEP); obj = sa_spill_alloc(KM_SLEEP);
error = sa_lookup(zp->z_sa_hdl, SA_ZPL_DXATTR(zsb), obj, size); error = sa_lookup(sa, SA_ZPL_DXATTR(zsb), obj, size);
if (error == 0) if (error == 0)
error = nvlist_unpack(obj, size, &zp->z_xattr_cached, KM_SLEEP); error = nvlist_unpack(obj, size, &zp->z_xattr_cached, KM_SLEEP);
sa_spill_free(obj); sa_spill_free(obj);
sa_handle_destroy(sa);
return (error); return (error);
} }
@ -220,6 +228,7 @@ int
zfs_sa_set_xattr(znode_t *zp) zfs_sa_set_xattr(znode_t *zp)
{ {
zfs_sb_t *zsb = ZTOZSB(zp); zfs_sb_t *zsb = ZTOZSB(zp);
sa_handle_t *sa;
dmu_tx_t *tx; dmu_tx_t *tx;
char *obj; char *obj;
size_t size; size_t size;
@ -240,16 +249,30 @@ zfs_sa_set_xattr(znode_t *zp)
if (error) if (error)
goto out_free; goto out_free;
/*
* A private SA handle must be used to ensure we can drop the hold
* on the spill block prior to calling dmu_tx_commit(). If we call
* dmu_tx_commit() before sa_handle_destroy(), then our hold will
* trigger a copy of the buffer at txg sync time. This is done to
* prevent data from leaking in to the syncing txg. As a result
* the original dirty spill block will be remain dirty in the arc
* while the copy is written and laundered.
*/
error = sa_handle_get(zsb->z_os, zp->z_id, NULL, SA_HDL_PRIVATE, &sa);
if (error)
goto out_free;
tx = dmu_tx_create(zsb->z_os); tx = dmu_tx_create(zsb->z_os);
dmu_tx_hold_sa_create(tx, size); dmu_tx_hold_sa_create(tx, size);
dmu_tx_hold_sa(tx, zp->z_sa_hdl, B_TRUE); dmu_tx_hold_sa(tx, sa, B_TRUE);
error = dmu_tx_assign(tx, TXG_WAIT); error = dmu_tx_assign(tx, TXG_WAIT);
if (error) { if (error) {
dmu_tx_abort(tx); dmu_tx_abort(tx);
sa_handle_destroy(sa);
} else { } else {
error = sa_update(zp->z_sa_hdl, SA_ZPL_DXATTR(zsb), error = sa_update(sa, SA_ZPL_DXATTR(zsb), obj, size, tx);
obj, size, tx); sa_handle_destroy(sa);
if (error) if (error)
dmu_tx_abort(tx); dmu_tx_abort(tx);
else else