]> granicus.if.org Git - zfs/commitdiff
OpenZFS 7090 - zfs should throttle allocations
authorDon Brady <don.brady@intel.com>
Fri, 14 Oct 2016 00:59:18 +0000 (18:59 -0600)
committerBrian Behlendorf <behlendorf1@llnl.gov>
Fri, 14 Oct 2016 00:59:18 +0000 (17:59 -0700)
OpenZFS 7090 - zfs should throttle allocations

Authored by: George Wilson <george.wilson@delphix.com>
Reviewed by: Alex Reece <alex@delphix.com>
Reviewed by: Christopher Siden <christopher.siden@delphix.com>
Reviewed by: Dan Kimmel <dan.kimmel@delphix.com>
Reviewed by: Matthew Ahrens <mahrens@delphix.com>
Reviewed by: Paul Dagnelie <paul.dagnelie@delphix.com>
Reviewed by: Prakash Surya <prakash.surya@delphix.com>
Reviewed by: Sebastien Roy <sebastien.roy@delphix.com>
Approved by: Matthew Ahrens <mahrens@delphix.com>
Ported-by: Don Brady <don.brady@intel.com>
Reviewed-by: Brian Behlendorf <behlendorf1@llnl.gov>
When write I/Os are issued, they are issued in block order but the ZIO
pipeline will drive them asynchronously through the allocation stage
which can result in blocks being allocated out-of-order. It would be
nice to preserve as much of the logical order as possible.

In addition, the allocations are equally scattered across all top-level
VDEVs but not all top-level VDEVs are created equally. The pipeline
should be able to detect devices that are more capable of handling
allocations and should allocate more blocks to those devices. This
allows for dynamic allocation distribution when devices are imbalanced
as fuller devices will tend to be slower than empty devices.

The change includes a new pool-wide allocation queue which would
throttle and order allocations in the ZIO pipeline. The queue would be
ordered by issued time and offset and would provide an initial amount of
allocation of work to each top-level vdev. The allocation logic utilizes
a reservation system to reserve allocations that will be performed by
the allocator. Once an allocation is successfully completed it's
scheduled on a given top-level vdev. Each top-level vdev maintains a
maximum number of allocations that it can handle (mg_alloc_queue_depth).
The pool-wide reserved allocations (top-levels * mg_alloc_queue_depth)
are distributed across the top-level vdevs metaslab groups and round
robin across all eligible metaslab groups to distribute the work. As
top-levels complete their work, they receive additional work from the
pool-wide allocation queue until the allocation queue is emptied.

OpenZFS-issue: https://www.illumos.org/issues/7090
OpenZFS-commit: https://github.com/openzfs/openzfs/commit/4756c3d7
Closes #5258

Porting Notes:
- Maintained minimal stack in zio_done
- Preserve linux-specific io sizes in zio_write_compress
- Added module params and documentation
- Updated to use optimize AVL cmp macros

18 files changed:
include/sys/fs/zfs.h
include/sys/metaslab.h
include/sys/metaslab_impl.h
include/sys/refcount.h
include/sys/spa_impl.h
include/sys/vdev_impl.h
include/sys/zio.h
include/sys/zio_impl.h
man/man5/zfs-module-parameters.5
module/zfs/metaslab.c
module/zfs/refcount.c
module/zfs/spa.c
module/zfs/spa_misc.c
module/zfs/vdev.c
module/zfs/vdev_cache.c
module/zfs/vdev_mirror.c
module/zfs/vdev_queue.c
module/zfs/zio.c

index 5c93f53dec36a309a0fe98b789d58380dce4da7a..c51d190c7f3d00c27bd488c2ed84c547a204cac7 100644 (file)
@@ -1038,7 +1038,8 @@ typedef enum {
        SPA_LOAD_IMPORT,        /* import in progress   */
        SPA_LOAD_TRYIMPORT,     /* tryimport in progress */
        SPA_LOAD_RECOVER,       /* recovery requested   */
-       SPA_LOAD_ERROR          /* load failed          */
+       SPA_LOAD_ERROR,         /* load failed          */
+       SPA_LOAD_CREATE         /* creation in progress */
 } spa_load_state_t;
 
 /*
index 5f831a1f5604d9602c78fb625a369b8b27d83b35..408f6d333fb452eff69c92d9d8bce6e30474f94f 100644 (file)
@@ -20,7 +20,7 @@
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2011, 2014 by Delphix. All rights reserved.
+ * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
  */
 
 #ifndef _SYS_METASLAB_H
@@ -55,15 +55,16 @@ void metaslab_sync_done(metaslab_t *, uint64_t);
 void metaslab_sync_reassess(metaslab_group_t *);
 uint64_t metaslab_block_maxsize(metaslab_t *);
 
-#define        METASLAB_HINTBP_FAVOR   0x0
-#define        METASLAB_HINTBP_AVOID   0x1
-#define        METASLAB_GANG_HEADER    0x2
-#define        METASLAB_GANG_CHILD     0x4
-#define        METASLAB_GANG_AVOID     0x8
-#define        METASLAB_FASTWRITE      0x10
+#define        METASLAB_HINTBP_FAVOR           0x0
+#define        METASLAB_HINTBP_AVOID           0x1
+#define        METASLAB_GANG_HEADER            0x2
+#define        METASLAB_GANG_CHILD             0x4
+#define        METASLAB_ASYNC_ALLOC            0x8
+#define        METASLAB_DONT_THROTTLE          0x10
+#define        METASLAB_FASTWRITE              0x20
 
 int metaslab_alloc(spa_t *, metaslab_class_t *, uint64_t,
-    blkptr_t *, int, uint64_t, blkptr_t *, int);
+    blkptr_t *, int, uint64_t, blkptr_t *, int, zio_t *);
 void metaslab_free(spa_t *, const blkptr_t *, uint64_t, boolean_t);
 int metaslab_claim(spa_t *, const blkptr_t *, uint64_t);
 void metaslab_check_free(spa_t *, const blkptr_t *);
@@ -76,6 +77,9 @@ int metaslab_class_validate(metaslab_class_t *);
 void metaslab_class_histogram_verify(metaslab_class_t *);
 uint64_t metaslab_class_fragmentation(metaslab_class_t *);
 uint64_t metaslab_class_expandable_space(metaslab_class_t *);
+boolean_t metaslab_class_throttle_reserve(metaslab_class_t *, int,
+    zio_t *, int);
+void metaslab_class_throttle_unreserve(metaslab_class_t *, int, zio_t *);
 
 void metaslab_class_space_update(metaslab_class_t *, int64_t, int64_t,
     int64_t, int64_t);
@@ -88,10 +92,13 @@ metaslab_group_t *metaslab_group_create(metaslab_class_t *, vdev_t *);
 void metaslab_group_destroy(metaslab_group_t *);
 void metaslab_group_activate(metaslab_group_t *);
 void metaslab_group_passivate(metaslab_group_t *);
+boolean_t metaslab_group_initialized(metaslab_group_t *);
 uint64_t metaslab_group_get_space(metaslab_group_t *);
 void metaslab_group_histogram_verify(metaslab_group_t *);
 uint64_t metaslab_group_fragmentation(metaslab_group_t *);
 void metaslab_group_histogram_remove(metaslab_group_t *, metaslab_t *);
+void metaslab_group_alloc_decrement(spa_t *, uint64_t, void *, int);
+void metaslab_group_alloc_verify(spa_t *, const blkptr_t *, void *);
 
 #ifdef __cplusplus
 }
index 27a53b515fbc48ab5b200e88259df91cb6effe19..1c8993aca55ab0e9ceec1e3f20ede2c679efa8e4 100644 (file)
@@ -24,7 +24,7 @@
  */
 
 /*
- * Copyright (c) 2011, 2014 by Delphix. All rights reserved.
+ * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
  */
 
 #ifndef _SYS_METASLAB_IMPL_H
@@ -59,11 +59,42 @@ extern "C" {
  * to use a block allocator that best suits that class.
  */
 struct metaslab_class {
+       kmutex_t                mc_lock;
        spa_t                   *mc_spa;
        metaslab_group_t        *mc_rotor;
        metaslab_ops_t          *mc_ops;
        uint64_t                mc_aliquot;
+
+       /*
+        * Track the number of metaslab groups that have been initialized
+        * and can accept allocations. An initialized metaslab group is
+        * one has been completely added to the config (i.e. we have
+        * updated the MOS config and the space has been added to the pool).
+        */
+       uint64_t                mc_groups;
+
+       /*
+        * Toggle to enable/disable the allocation throttle.
+        */
+       boolean_t               mc_alloc_throttle_enabled;
+
+       /*
+        * The allocation throttle works on a reservation system. Whenever
+        * an asynchronous zio wants to perform an allocation it must
+        * first reserve the number of blocks that it wants to allocate.
+        * If there aren't sufficient slots available for the pending zio
+        * then that I/O is throttled until more slots free up. The current
+        * number of reserved allocations is maintained by the mc_alloc_slots
+        * refcount. The mc_alloc_max_slots value determines the maximum
+        * number of allocations that the system allows. Gang blocks are
+        * allowed to reserve slots even if we've reached the maximum
+        * number of allocations allowed.
+        */
+       uint64_t                mc_alloc_max_slots;
+       refcount_t              mc_alloc_slots;
+
        uint64_t                mc_alloc_groups; /* # of allocatable groups */
+
        uint64_t                mc_alloc;       /* total allocated space */
        uint64_t                mc_deferred;    /* total deferred frees */
        uint64_t                mc_space;       /* total space (alloc + free) */
@@ -85,6 +116,15 @@ struct metaslab_group {
        avl_tree_t              mg_metaslab_tree;
        uint64_t                mg_aliquot;
        boolean_t               mg_allocatable;         /* can we allocate? */
+
+       /*
+        * A metaslab group is considered to be initialized only after
+        * we have updated the MOS config and added the space to the pool.
+        * We only allow allocation attempts to a metaslab group if it
+        * has been initialized.
+        */
+       boolean_t               mg_initialized;
+
        uint64_t                mg_free_capacity;       /* percentage free */
        int64_t                 mg_bias;
        int64_t                 mg_activation_count;
@@ -93,6 +133,27 @@ struct metaslab_group {
        taskq_t                 *mg_taskq;
        metaslab_group_t        *mg_prev;
        metaslab_group_t        *mg_next;
+
+       /*
+        * Each metaslab group can handle mg_max_alloc_queue_depth allocations
+        * which are tracked by mg_alloc_queue_depth. It's possible for a
+        * metaslab group to handle more allocations than its max. This
+        * can occur when gang blocks are required or when other groups
+        * are unable to handle their share of allocations.
+        */
+       uint64_t                mg_max_alloc_queue_depth;
+       refcount_t              mg_alloc_queue_depth;
+
+       /*
+        * A metalab group that can no longer allocate the minimum block
+        * size will set mg_no_free_space. Once a metaslab group is out
+        * of space then its share of work must be distributed to other
+        * groups.
+        */
+       boolean_t               mg_no_free_space;
+
+       uint64_t                mg_allocations;
+       uint64_t                mg_failed_allocations;
        uint64_t                mg_fragmentation;
        uint64_t                mg_histogram[RANGE_TREE_HISTOGRAM_SIZE];
 };
index 580976c912bf8ae6c988852d2b0f31c70b7bc6cb..3f50cddb6f5114af8d0342adcb499c1ab4a0a1d0 100644 (file)
@@ -20,6 +20,7 @@
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2012, 2015 by Delphix. All rights reserved.
  */
 
 #ifndef        _SYS_REFCOUNT_H
@@ -61,6 +62,7 @@ typedef struct refcount {
 
 void refcount_create(refcount_t *rc);
 void refcount_create_untracked(refcount_t *rc);
+void refcount_create_tracked(refcount_t *rc);
 void refcount_destroy(refcount_t *rc);
 void refcount_destroy_many(refcount_t *rc, uint64_t number);
 int refcount_is_zero(refcount_t *rc);
@@ -71,6 +73,8 @@ int64_t refcount_add_many(refcount_t *rc, uint64_t number, void *holder_tag);
 int64_t refcount_remove_many(refcount_t *rc, uint64_t number, void *holder_tag);
 void refcount_transfer(refcount_t *dst, refcount_t *src);
 void refcount_transfer_ownership(refcount_t *, void *, void *);
+boolean_t refcount_held(refcount_t *, void *);
+boolean_t refcount_not_held(refcount_t *, void *);
 
 void refcount_init(void);
 void refcount_fini(void);
@@ -83,6 +87,7 @@ typedef struct refcount {
 
 #define        refcount_create(rc) ((rc)->rc_count = 0)
 #define        refcount_create_untracked(rc) ((rc)->rc_count = 0)
+#define        refcount_create_tracked(rc) ((rc)->rc_count = 0)
 #define        refcount_destroy(rc) ((rc)->rc_count = 0)
 #define        refcount_destroy_many(rc, number) ((rc)->rc_count = 0)
 #define        refcount_is_zero(rc) ((rc)->rc_count == 0)
@@ -99,6 +104,8 @@ typedef struct refcount {
        atomic_add_64(&(dst)->rc_count, __tmp); \
 }
 #define        refcount_transfer_ownership(rc, current_holder, new_holder)     (void)0
+#define        refcount_held(rc, holder)               ((rc)->rc_count > 0)
+#define        refcount_not_held(rc, holder)           (B_TRUE)
 
 #define        refcount_init()
 #define        refcount_fini()
index cb1d16ad55475117b1dbe7345471907880186742..88bde98dc5577b91f567d6e825752e9701cb7d93 100644 (file)
@@ -165,6 +165,8 @@ struct spa {
        uint64_t        spa_last_synced_guid;   /* last synced guid */
        list_t          spa_config_dirty_list;  /* vdevs with dirty config */
        list_t          spa_state_dirty_list;   /* vdevs with dirty state */
+       kmutex_t        spa_alloc_lock;
+       avl_tree_t      spa_alloc_tree;
        spa_aux_vdev_t  spa_spares;             /* hot spares */
        spa_aux_vdev_t  spa_l2cache;            /* L2ARC cache devices */
        nvlist_t        *spa_label_features;    /* Features for reading MOS */
index 0d09c81c7f83030451b09a8b498d00c7a19b640c..47e70090a568fa703d212e1817e95cb582ec1474 100644 (file)
@@ -53,6 +53,9 @@ typedef struct vdev_queue vdev_queue_t;
 typedef struct vdev_cache vdev_cache_t;
 typedef struct vdev_cache_entry vdev_cache_entry_t;
 
+extern int zfs_vdev_queue_depth_pct;
+extern uint32_t zfs_vdev_async_write_max_active;
+
 /*
  * Virtual device operations
  */
@@ -177,9 +180,20 @@ struct vdev {
        uint64_t        vdev_deflate_ratio; /* deflation ratio (x512)   */
        uint64_t        vdev_islog;     /* is an intent log device      */
        uint64_t        vdev_removing;  /* device is being removed?     */
-       boolean_t       vdev_ishole;    /* is a hole in the namespace   */
+       boolean_t       vdev_ishole;    /* is a hole in the namespace   */
+       kmutex_t        vdev_queue_lock; /* protects vdev_queue_depth   */
        uint64_t        vdev_top_zap;
 
+       /*
+        * The queue depth parameters determine how many async writes are
+        * still pending (i.e. allocated by net yet issued to disk) per
+        * top-level (vdev_async_write_queue_depth) and the maximum allowed
+        * (vdev_max_async_write_queue_depth). These values only apply to
+        * top-level vdevs.
+        */
+       uint64_t        vdev_async_write_queue_depth;
+       uint64_t        vdev_max_async_write_queue_depth;
+
        /*
         * Leaf vdev state.
         */
index 22001559cb5be39616542d2a6adb19453ade6a84..864e8b2bec8d4f8b60bae86411b1262636781807 100644 (file)
@@ -157,6 +157,7 @@ enum zio_flag {
        ZIO_FLAG_DONT_CACHE     = 1 << 11,
        ZIO_FLAG_NODATA         = 1 << 12,
        ZIO_FLAG_INDUCE_DAMAGE  = 1 << 13,
+       ZIO_FLAG_IO_ALLOCATING  = 1 << 14,
 
 #define        ZIO_FLAG_DDT_INHERIT    (ZIO_FLAG_IO_RETRY - 1)
 #define        ZIO_FLAG_GANG_INHERIT   (ZIO_FLAG_IO_RETRY - 1)
@@ -164,28 +165,28 @@ enum zio_flag {
        /*
         * Flags inherited by vdev children.
         */
-       ZIO_FLAG_IO_RETRY       = 1 << 14,      /* must be first for INHERIT */
-       ZIO_FLAG_PROBE          = 1 << 15,
-       ZIO_FLAG_TRYHARD        = 1 << 16,
-       ZIO_FLAG_OPTIONAL       = 1 << 17,
+       ZIO_FLAG_IO_RETRY       = 1 << 15,      /* must be first for INHERIT */
+       ZIO_FLAG_PROBE          = 1 << 16,
+       ZIO_FLAG_TRYHARD        = 1 << 17,
+       ZIO_FLAG_OPTIONAL       = 1 << 18,
 
 #define        ZIO_FLAG_VDEV_INHERIT   (ZIO_FLAG_DONT_QUEUE - 1)
 
        /*
         * Flags not inherited by any children.
         */
-       ZIO_FLAG_DONT_QUEUE     = 1 << 18,      /* must be first for INHERIT */
-       ZIO_FLAG_DONT_PROPAGATE = 1 << 19,
-       ZIO_FLAG_IO_BYPASS      = 1 << 20,
-       ZIO_FLAG_IO_REWRITE     = 1 << 21,
-       ZIO_FLAG_RAW            = 1 << 22,
-       ZIO_FLAG_GANG_CHILD     = 1 << 23,
-       ZIO_FLAG_DDT_CHILD      = 1 << 24,
-       ZIO_FLAG_GODFATHER      = 1 << 25,
-       ZIO_FLAG_NOPWRITE       = 1 << 26,
-       ZIO_FLAG_REEXECUTED     = 1 << 27,
-       ZIO_FLAG_DELEGATED      = 1 << 28,
-       ZIO_FLAG_FASTWRITE      = 1 << 29,
+       ZIO_FLAG_DONT_QUEUE     = 1 << 19,      /* must be first for INHERIT */
+       ZIO_FLAG_DONT_PROPAGATE = 1 << 20,
+       ZIO_FLAG_IO_BYPASS      = 1 << 21,
+       ZIO_FLAG_IO_REWRITE     = 1 << 22,
+       ZIO_FLAG_RAW            = 1 << 23,
+       ZIO_FLAG_GANG_CHILD     = 1 << 24,
+       ZIO_FLAG_DDT_CHILD      = 1 << 25,
+       ZIO_FLAG_GODFATHER      = 1 << 26,
+       ZIO_FLAG_NOPWRITE       = 1 << 27,
+       ZIO_FLAG_REEXECUTED     = 1 << 28,
+       ZIO_FLAG_DELEGATED      = 1 << 29,
+       ZIO_FLAG_FASTWRITE      = 1 << 30
 };
 
 #define        ZIO_FLAG_MUSTSUCCEED            0
@@ -225,6 +226,7 @@ enum zio_wait_type {
 
 typedef void zio_done_func_t(zio_t *zio);
 
+extern int zio_dva_throttle_enabled;
 extern const char *zio_type_name[ZIO_TYPES];
 
 /*
@@ -379,7 +381,6 @@ struct zio {
        blkptr_t        io_bp_copy;
        list_t          io_parent_list;
        list_t          io_child_list;
-       zio_link_t      *io_walk_link;
        zio_t           *io_logical;
        zio_transform_t *io_transform_stack;
 
@@ -407,12 +408,14 @@ struct zio {
 
        uint64_t        io_offset;
        hrtime_t        io_timestamp;   /* submitted at */
+       hrtime_t        io_queued_timestamp;
        hrtime_t        io_target_timestamp;
        hrtime_t        io_delta;       /* vdev queue service delta */
        hrtime_t        io_delay;       /* Device access time (disk or */
                                        /* file). */
        avl_node_t      io_queue_node;
        avl_node_t      io_offset_node;
+       avl_node_t      io_alloc_node;
 
        /* Internal pipeline state */
        enum zio_flag   io_flags;
@@ -421,6 +424,7 @@ struct zio {
        enum zio_flag   io_orig_flags;
        enum zio_stage  io_orig_stage;
        enum zio_stage  io_orig_pipeline;
+       enum zio_stage  io_pipeline_trace;
        int             io_error;
        int             io_child_error[ZIO_CHILD_TYPES];
        uint64_t        io_children[ZIO_CHILD_TYPES][ZIO_WAIT_TYPES];
@@ -443,6 +447,8 @@ struct zio {
        taskq_ent_t     io_tqent;
 };
 
+extern int zio_timestamp_compare(const void *, const void *);
+
 extern zio_t *zio_null(zio_t *pio, spa_t *spa, vdev_t *vd,
     zio_done_func_t *done, void *private, enum zio_flag flags);
 
@@ -502,8 +508,8 @@ extern void zio_interrupt(zio_t *zio);
 extern void zio_delay_init(zio_t *zio);
 extern void zio_delay_interrupt(zio_t *zio);
 
-extern zio_t *zio_walk_parents(zio_t *cio);
-extern zio_t *zio_walk_children(zio_t *pio);
+extern zio_t *zio_walk_parents(zio_t *cio, zio_link_t **);
+extern zio_t *zio_walk_children(zio_t *pio, zio_link_t **);
 extern zio_t *zio_unique_parent(zio_t *cio);
 extern void zio_add_child(zio_t *pio, zio_t *cio);
 
index 08f820103e823681031100c8b2f65f8661e8293e..a36749a308d675fb77fae43923304a954d97c086 100644 (file)
@@ -24,7 +24,7 @@
  */
 
 /*
- * Copyright (c) 2013 by Delphix. All rights reserved.
+ * Copyright (c) 2012, 2015 by Delphix. All rights reserved.
  */
 
 #ifndef _ZIO_IMPL_H
@@ -108,35 +108,37 @@ enum zio_stage {
        ZIO_STAGE_OPEN                  = 1 << 0,       /* RWFCI */
 
        ZIO_STAGE_READ_BP_INIT          = 1 << 1,       /* R---- */
-       ZIO_STAGE_FREE_BP_INIT          = 1 << 2,       /* --F-- */
-       ZIO_STAGE_ISSUE_ASYNC           = 1 << 3,       /* RWF-- */
-       ZIO_STAGE_WRITE_BP_INIT         = 1 << 4,       /* -W--- */
+       ZIO_STAGE_WRITE_BP_INIT         = 1 << 2,       /* -W--- */
+       ZIO_STAGE_FREE_BP_INIT          = 1 << 3,       /* --F-- */
+       ZIO_STAGE_ISSUE_ASYNC           = 1 << 4,       /* RWF-- */
+       ZIO_STAGE_WRITE_COMPRESS        = 1 << 5,       /* -W--- */
 
-       ZIO_STAGE_CHECKSUM_GENERATE     = 1 << 5,       /* -W--- */
+       ZIO_STAGE_CHECKSUM_GENERATE     = 1 << 6,       /* -W--- */
 
-       ZIO_STAGE_NOP_WRITE             = 1 << 6,       /* -W--- */
+       ZIO_STAGE_NOP_WRITE             = 1 << 7,       /* -W--- */
 
-       ZIO_STAGE_DDT_READ_START        = 1 << 7,       /* R---- */
-       ZIO_STAGE_DDT_READ_DONE         = 1 << 8,       /* R---- */
-       ZIO_STAGE_DDT_WRITE             = 1 << 9,       /* -W--- */
-       ZIO_STAGE_DDT_FREE              = 1 << 10,      /* --F-- */
+       ZIO_STAGE_DDT_READ_START        = 1 << 8,       /* R---- */
+       ZIO_STAGE_DDT_READ_DONE         = 1 << 9,       /* R---- */
+       ZIO_STAGE_DDT_WRITE             = 1 << 10,      /* -W--- */
+       ZIO_STAGE_DDT_FREE              = 1 << 11,      /* --F-- */
 
-       ZIO_STAGE_GANG_ASSEMBLE         = 1 << 11,      /* RWFC- */
-       ZIO_STAGE_GANG_ISSUE            = 1 << 12,      /* RWFC- */
+       ZIO_STAGE_GANG_ASSEMBLE         = 1 << 12,      /* RWFC- */
+       ZIO_STAGE_GANG_ISSUE            = 1 << 13,      /* RWFC- */
 
-       ZIO_STAGE_DVA_ALLOCATE          = 1 << 13,      /* -W--- */
-       ZIO_STAGE_DVA_FREE              = 1 << 14,      /* --F-- */
-       ZIO_STAGE_DVA_CLAIM             = 1 << 15,      /* ---C- */
+       ZIO_STAGE_DVA_THROTTLE          = 1 << 14,      /* -W--- */
+       ZIO_STAGE_DVA_ALLOCATE          = 1 << 15,      /* -W--- */
+       ZIO_STAGE_DVA_FREE              = 1 << 16,      /* --F-- */
+       ZIO_STAGE_DVA_CLAIM             = 1 << 17,      /* ---C- */
 
-       ZIO_STAGE_READY                 = 1 << 16,      /* RWFCI */
+       ZIO_STAGE_READY                 = 1 << 18,      /* RWFCI */
 
-       ZIO_STAGE_VDEV_IO_START         = 1 << 17,      /* RW--I */
-       ZIO_STAGE_VDEV_IO_DONE          = 1 << 18,      /* RW--I */
-       ZIO_STAGE_VDEV_IO_ASSESS        = 1 << 19,      /* RW--I */
+       ZIO_STAGE_VDEV_IO_START         = 1 << 19,      /* RW--I */
+       ZIO_STAGE_VDEV_IO_DONE          = 1 << 20,      /* RW--I */
+       ZIO_STAGE_VDEV_IO_ASSESS        = 1 << 21,      /* RW--I */
 
-       ZIO_STAGE_CHECKSUM_VERIFY       = 1 << 20,      /* R---- */
+       ZIO_STAGE_CHECKSUM_VERIFY       = 1 << 22,      /* R---- */
 
-       ZIO_STAGE_DONE                  = 1 << 21       /* RWFCI */
+       ZIO_STAGE_DONE                  = 1 << 23       /* RWFCI */
 };
 
 #define        ZIO_INTERLOCK_STAGES                    \
@@ -187,22 +189,27 @@ enum zio_stage {
 
 #define        ZIO_REWRITE_PIPELINE                    \
        (ZIO_WRITE_COMMON_STAGES |              \
+       ZIO_STAGE_WRITE_COMPRESS |              \
        ZIO_STAGE_WRITE_BP_INIT)
 
 #define        ZIO_WRITE_PIPELINE                      \
        (ZIO_WRITE_COMMON_STAGES |              \
        ZIO_STAGE_WRITE_BP_INIT |               \
+       ZIO_STAGE_WRITE_COMPRESS |              \
+       ZIO_STAGE_DVA_THROTTLE |                \
        ZIO_STAGE_DVA_ALLOCATE)
 
 #define        ZIO_DDT_CHILD_WRITE_PIPELINE            \
        (ZIO_INTERLOCK_STAGES |                 \
        ZIO_VDEV_IO_STAGES |                    \
+       ZIO_STAGE_DVA_THROTTLE |                \
        ZIO_STAGE_DVA_ALLOCATE)
 
 #define        ZIO_DDT_WRITE_PIPELINE                  \
        (ZIO_INTERLOCK_STAGES |                 \
-       ZIO_STAGE_ISSUE_ASYNC |                 \
        ZIO_STAGE_WRITE_BP_INIT |               \
+       ZIO_STAGE_ISSUE_ASYNC |                 \
+       ZIO_STAGE_WRITE_COMPRESS |              \
        ZIO_STAGE_CHECKSUM_GENERATE |           \
        ZIO_STAGE_DDT_WRITE)
 
index e247160149265146d12c36ccb431737a5943fa0c..932342cfda21eaccb6a0682b28cf1833720e0f23 100644 (file)
@@ -1097,6 +1097,18 @@ See the section "ZFS I/O SCHEDULER".
 Default value: \fB10\fR.
 .RE
 
+.sp
+.ne 2
+.na
+\fBzfs_vdev_queue_depth_pct\fR (int)
+.ad
+.RS 12n
+The queue depth percentage for each top-level virtual device.
+Used in conjunction with zfs_vdev_async_max_active.
+.sp
+Default value: \fB1000\fR.
+.RE
+
 .sp
 .ne 2
 .na
@@ -1840,6 +1852,18 @@ operations.
 Default value: \fB30,000\fR.
 .RE
 
+.sp
+.ne 2
+.na
+\fBzio_dva_throttle_enabled\fR (int)
+.ad
+.RS 12n
+Throttle block allocations in the ZIO pipeline. This allows for
+dynamic allocation distribution when devices are imbalanced.
+.sp
+Default value: \fB1\fR.
+.RE
+
 .sp
 .ne 2
 .na
index 9de65c86ea179576dfa94100fd3581517575ea55..e54eeeae266c2d7912d14f7660e50405c8e52edc 100644 (file)
 
 #define        WITH_DF_BLOCK_ALLOCATOR
 
-/*
- * Allow allocations to switch to gang blocks quickly. We do this to
- * avoid having to load lots of space_maps in a given txg. There are,
- * however, some cases where we want to avoid "fast" ganging and instead
- * we want to do an exhaustive search of all metaslabs on this device.
- * Currently we don't allow any gang, slog, or dump device related allocations
- * to "fast" gang.
- */
-#define        CAN_FASTGANG(flags) \
-       (!((flags) & (METASLAB_GANG_CHILD | METASLAB_GANG_HEADER | \
-       METASLAB_GANG_AVOID)))
+#define        GANG_ALLOCATION(flags) \
+       ((flags) & (METASLAB_GANG_CHILD | METASLAB_GANG_HEADER))
 
 #define        METASLAB_WEIGHT_PRIMARY         (1ULL << 63)
 #define        METASLAB_WEIGHT_SECONDARY       (1ULL << 62)
@@ -198,6 +189,8 @@ metaslab_class_create(spa_t *spa, metaslab_ops_t *ops)
        mc->mc_spa = spa;
        mc->mc_rotor = NULL;
        mc->mc_ops = ops;
+       mutex_init(&mc->mc_lock, NULL, MUTEX_DEFAULT, NULL);
+       refcount_create_tracked(&mc->mc_alloc_slots);
 
        return (mc);
 }
@@ -211,6 +204,8 @@ metaslab_class_destroy(metaslab_class_t *mc)
        ASSERT(mc->mc_space == 0);
        ASSERT(mc->mc_dspace == 0);
 
+       refcount_destroy(&mc->mc_alloc_slots);
+       mutex_destroy(&mc->mc_lock);
        kmem_free(mc, sizeof (metaslab_class_t));
 }
 
@@ -414,9 +409,10 @@ metaslab_compare(const void *x1, const void *x2)
 /*
  * Update the allocatable flag and the metaslab group's capacity.
  * The allocatable flag is set to true if the capacity is below
- * the zfs_mg_noalloc_threshold. If a metaslab group transitions
- * from allocatable to non-allocatable or vice versa then the metaslab
- * group's class is updated to reflect the transition.
+ * the zfs_mg_noalloc_threshold or has a fragmentation value that is
+ * greater than zfs_mg_fragmentation_threshold. If a metaslab group
+ * transitions from allocatable to non-allocatable or vice versa then the
+ * metaslab group's class is updated to reflect the transition.
  */
 static void
 metaslab_group_alloc_update(metaslab_group_t *mg)
@@ -425,22 +421,45 @@ metaslab_group_alloc_update(metaslab_group_t *mg)
        metaslab_class_t *mc = mg->mg_class;
        vdev_stat_t *vs = &vd->vdev_stat;
        boolean_t was_allocatable;
+       boolean_t was_initialized;
 
        ASSERT(vd == vd->vdev_top);
 
        mutex_enter(&mg->mg_lock);
        was_allocatable = mg->mg_allocatable;
+       was_initialized = mg->mg_initialized;
 
        mg->mg_free_capacity = ((vs->vs_space - vs->vs_alloc) * 100) /
            (vs->vs_space + 1);
 
+       mutex_enter(&mc->mc_lock);
+
+       /*
+        * If the metaslab group was just added then it won't
+        * have any space until we finish syncing out this txg.
+        * At that point we will consider it initialized and available
+        * for allocations.  We also don't consider non-activated
+        * metaslab groups (e.g. vdevs that are in the middle of being removed)
+        * to be initialized, because they can't be used for allocation.
+        */
+       mg->mg_initialized = metaslab_group_initialized(mg);
+       if (!was_initialized && mg->mg_initialized) {
+               mc->mc_groups++;
+       } else if (was_initialized && !mg->mg_initialized) {
+               ASSERT3U(mc->mc_groups, >, 0);
+               mc->mc_groups--;
+       }
+       if (mg->mg_initialized)
+               mg->mg_no_free_space = B_FALSE;
+
        /*
         * A metaslab group is considered allocatable if it has plenty
         * of free space or is not heavily fragmented. We only take
         * fragmentation into account if the metaslab group has a valid
         * fragmentation metric (i.e. a value between 0 and 100).
         */
-       mg->mg_allocatable = (mg->mg_free_capacity > zfs_mg_noalloc_threshold &&
+       mg->mg_allocatable = (mg->mg_activation_count > 0 &&
+           mg->mg_free_capacity > zfs_mg_noalloc_threshold &&
            (mg->mg_fragmentation == ZFS_FRAG_INVALID ||
            mg->mg_fragmentation <= zfs_mg_fragmentation_threshold));
 
@@ -463,6 +482,7 @@ metaslab_group_alloc_update(metaslab_group_t *mg)
                mc->mc_alloc_groups--;
        else if (!was_allocatable && mg->mg_allocatable)
                mc->mc_alloc_groups++;
+       mutex_exit(&mc->mc_lock);
 
        mutex_exit(&mg->mg_lock);
 }
@@ -479,6 +499,9 @@ metaslab_group_create(metaslab_class_t *mc, vdev_t *vd)
        mg->mg_vd = vd;
        mg->mg_class = mc;
        mg->mg_activation_count = 0;
+       mg->mg_initialized = B_FALSE;
+       mg->mg_no_free_space = B_TRUE;
+       refcount_create_tracked(&mg->mg_alloc_queue_depth);
 
        mg->mg_taskq = taskq_create("metaslab_group_taskq", metaslab_load_pct,
            maxclsyspri, 10, INT_MAX, TASKQ_THREADS_CPU_PCT | TASKQ_DYNAMIC);
@@ -501,6 +524,7 @@ metaslab_group_destroy(metaslab_group_t *mg)
        taskq_destroy(mg->mg_taskq);
        avl_destroy(&mg->mg_metaslab_tree);
        mutex_destroy(&mg->mg_lock);
+       refcount_destroy(&mg->mg_alloc_queue_depth);
        kmem_free(mg, sizeof (metaslab_group_t));
 }
 
@@ -570,6 +594,15 @@ metaslab_group_passivate(metaslab_group_t *mg)
        mg->mg_next = NULL;
 }
 
+boolean_t
+metaslab_group_initialized(metaslab_group_t *mg)
+{
+       vdev_t *vd = mg->mg_vd;
+       vdev_stat_t *vs = &vd->vdev_stat;
+
+       return (vs->vs_space != 0 && mg->mg_activation_count > 0);
+}
+
 uint64_t
 metaslab_group_get_space(metaslab_group_t *mg)
 {
@@ -742,30 +775,97 @@ metaslab_group_fragmentation(metaslab_group_t *mg)
  * group should avoid allocations if its free capacity is less than the
  * zfs_mg_noalloc_threshold or its fragmentation metric is greater than
  * zfs_mg_fragmentation_threshold and there is at least one metaslab group
- * that can still handle allocations.
+ * that can still handle allocations. If the allocation throttle is enabled
+ * then we skip allocations to devices that have reached their maximum
+ * allocation queue depth unless the selected metaslab group is the only
+ * eligible group remaining.
  */
 static boolean_t
-metaslab_group_allocatable(metaslab_group_t *mg)
+metaslab_group_allocatable(metaslab_group_t *mg, metaslab_group_t *rotor,
+    uint64_t psize)
 {
-       vdev_t *vd = mg->mg_vd;
-       spa_t *spa = vd->vdev_spa;
+       spa_t *spa = mg->mg_vd->vdev_spa;
        metaslab_class_t *mc = mg->mg_class;
 
        /*
-        * We use two key metrics to determine if a metaslab group is
-        * considered allocatable -- free space and fragmentation. If
-        * the free space is greater than the free space threshold and
-        * the fragmentation is less than the fragmentation threshold then
-        * consider the group allocatable. There are two case when we will
-        * not consider these key metrics. The first is if the group is
-        * associated with a slog device and the second is if all groups
-        * in this metaslab class have already been consider ineligible
+        * We can only consider skipping this metaslab group if it's
+        * in the normal metaslab class and there are other metaslab
+        * groups to select from. Otherwise, we always consider it eligible
         * for allocations.
         */
-       return ((mg->mg_free_capacity > zfs_mg_noalloc_threshold &&
-           (mg->mg_fragmentation == ZFS_FRAG_INVALID ||
-           mg->mg_fragmentation <= zfs_mg_fragmentation_threshold)) ||
-           mc != spa_normal_class(spa) || mc->mc_alloc_groups == 0);
+       if (mc != spa_normal_class(spa) || mc->mc_groups <= 1)
+               return (B_TRUE);
+
+       /*
+        * If the metaslab group's mg_allocatable flag is set (see comments
+        * in metaslab_group_alloc_update() for more information) and
+        * the allocation throttle is disabled then allow allocations to this
+        * device. However, if the allocation throttle is enabled then
+        * check if we have reached our allocation limit (mg_alloc_queue_depth)
+        * to determine if we should allow allocations to this metaslab group.
+        * If all metaslab groups are no longer considered allocatable
+        * (mc_alloc_groups == 0) or we're trying to allocate the smallest
+        * gang block size then we allow allocations on this metaslab group
+        * regardless of the mg_allocatable or throttle settings.
+        */
+       if (mg->mg_allocatable) {
+               metaslab_group_t *mgp;
+               int64_t qdepth;
+               uint64_t qmax = mg->mg_max_alloc_queue_depth;
+
+               if (!mc->mc_alloc_throttle_enabled)
+                       return (B_TRUE);
+
+               /*
+                * If this metaslab group does not have any free space, then
+                * there is no point in looking further.
+                */
+               if (mg->mg_no_free_space)
+                       return (B_FALSE);
+
+               qdepth = refcount_count(&mg->mg_alloc_queue_depth);
+
+               /*
+                * If this metaslab group is below its qmax or it's
+                * the only allocatable metasable group, then attempt
+                * to allocate from it.
+                */
+               if (qdepth < qmax || mc->mc_alloc_groups == 1)
+                       return (B_TRUE);
+               ASSERT3U(mc->mc_alloc_groups, >, 1);
+
+               /*
+                * Since this metaslab group is at or over its qmax, we
+                * need to determine if there are metaslab groups after this
+                * one that might be able to handle this allocation. This is
+                * racy since we can't hold the locks for all metaslab
+                * groups at the same time when we make this check.
+                */
+               for (mgp = mg->mg_next; mgp != rotor; mgp = mgp->mg_next) {
+                       qmax = mgp->mg_max_alloc_queue_depth;
+
+                       qdepth = refcount_count(&mgp->mg_alloc_queue_depth);
+
+                       /*
+                        * If there is another metaslab group that
+                        * might be able to handle the allocation, then
+                        * we return false so that we skip this group.
+                        */
+                       if (qdepth < qmax && !mgp->mg_no_free_space)
+                               return (B_FALSE);
+               }
+
+               /*
+                * We didn't find another group to handle the allocation
+                * so we can't skip this metaslab group even though
+                * we are at or over our qmax.
+                */
+               return (B_TRUE);
+
+       } else if (mc->mc_alloc_groups == 0 || psize == SPA_MINBLOCKSIZE) {
+               return (B_TRUE);
+       }
+       return (B_FALSE);
 }
 
 /*
@@ -2054,8 +2154,62 @@ metaslab_distance(metaslab_t *msp, dva_t *dva)
        return (0);
 }
 
+/*
+ * ==========================================================================
+ * Metaslab block operations
+ * ==========================================================================
+ */
+
+static void
+metaslab_group_alloc_increment(spa_t *spa, uint64_t vdev, void *tag, int flags)
+{
+       metaslab_group_t *mg;
+
+       if (!(flags & METASLAB_ASYNC_ALLOC) ||
+           flags & METASLAB_DONT_THROTTLE)
+               return;
+
+       mg = vdev_lookup_top(spa, vdev)->vdev_mg;
+       if (!mg->mg_class->mc_alloc_throttle_enabled)
+               return;
+
+       (void) refcount_add(&mg->mg_alloc_queue_depth, tag);
+}
+
+void
+metaslab_group_alloc_decrement(spa_t *spa, uint64_t vdev, void *tag, int flags)
+{
+       metaslab_group_t *mg;
+
+       if (!(flags & METASLAB_ASYNC_ALLOC) ||
+           flags & METASLAB_DONT_THROTTLE)
+               return;
+
+       mg = vdev_lookup_top(spa, vdev)->vdev_mg;
+       if (!mg->mg_class->mc_alloc_throttle_enabled)
+               return;
+
+       (void) refcount_remove(&mg->mg_alloc_queue_depth, tag);
+}
+
+void
+metaslab_group_alloc_verify(spa_t *spa, const blkptr_t *bp, void *tag)
+{
+#ifdef ZFS_DEBUG
+       const dva_t *dva = bp->blk_dva;
+       int ndvas = BP_GET_NDVAS(bp);
+       int d;
+
+       for (d = 0; d < ndvas; d++) {
+               uint64_t vdev = DVA_GET_VDEV(&dva[d]);
+               metaslab_group_t *mg = vdev_lookup_top(spa, vdev)->vdev_mg;
+               VERIFY(refcount_not_held(&mg->mg_alloc_queue_depth, tag));
+       }
+#endif
+}
+
 static uint64_t
-metaslab_group_alloc(metaslab_group_t *mg, uint64_t psize, uint64_t asize,
+metaslab_group_alloc(metaslab_group_t *mg, uint64_t asize,
     uint64_t txg, uint64_t min_distance, dva_t *dva, int d)
 {
        spa_t *spa = mg->mg_vd->vdev_spa;
@@ -2082,10 +2236,10 @@ metaslab_group_alloc(metaslab_group_t *mg, uint64_t psize, uint64_t asize,
                        if (msp->ms_weight < asize) {
                                spa_dbgmsg(spa, "%s: failed to meet weight "
                                    "requirement: vdev %llu, txg %llu, mg %p, "
-                                   "msp %p, psize %llu, asize %llu, "
+                                   "msp %p, asize %llu, "
                                    "weight %llu", spa_name(spa),
                                    mg->mg_vd->vdev_id, txg,
-                                   mg, msp, psize, asize, msp->ms_weight);
+                                   mg, msp, asize, msp->ms_weight);
                                mutex_exit(&mg->mg_lock);
                                return (-1ULL);
                        }
@@ -2167,7 +2321,6 @@ metaslab_group_alloc(metaslab_group_t *mg, uint64_t psize, uint64_t asize,
        msp->ms_access_txg = txg + metaslab_unload_delay;
 
        mutex_exit(&msp->ms_lock);
-
        return (offset);
 }
 
@@ -2184,7 +2337,6 @@ metaslab_alloc_dva(spa_t *spa, metaslab_class_t *mc, uint64_t psize,
        int all_zero;
        int zio_lock = B_FALSE;
        boolean_t allocatable;
-       uint64_t offset = -1ULL;
        uint64_t asize;
        uint64_t distance;
 
@@ -2262,8 +2414,9 @@ metaslab_alloc_dva(spa_t *spa, metaslab_class_t *mc, uint64_t psize,
 top:
        all_zero = B_TRUE;
        do {
-               ASSERT(mg->mg_activation_count == 1);
+               uint64_t offset;
 
+               ASSERT(mg->mg_activation_count == 1);
                vd = mg->mg_vd;
 
                /*
@@ -2279,24 +2432,23 @@ top:
 
                /*
                 * Determine if the selected metaslab group is eligible
-                * for allocations. If we're ganging or have requested
-                * an allocation for the smallest gang block size
-                * then we don't want to avoid allocating to the this
-                * metaslab group. If we're in this condition we should
-                * try to allocate from any device possible so that we
-                * don't inadvertently return ENOSPC and suspend the pool
+                * for allocations. If we're ganging then don't allow
+                * this metaslab group to skip allocations since that would
+                * inadvertently return ENOSPC and suspend the pool
                 * even though space is still available.
                 */
-               if (allocatable && CAN_FASTGANG(flags) &&
-                   psize > SPA_GANGBLOCKSIZE)
-                       allocatable = metaslab_group_allocatable(mg);
+               if (allocatable && !GANG_ALLOCATION(flags) && !zio_lock) {
+                       allocatable = metaslab_group_allocatable(mg, rotor,
+                           psize);
+               }
 
                if (!allocatable)
                        goto next;
 
+               ASSERT(mg->mg_initialized);
+
                /*
-                * Avoid writing single-copy data to a failing vdev
-                * unless the user instructs us that it is okay.
+                * Avoid writing single-copy data to a failing vdev.
                 */
                if ((vd->vdev_stat.vs_write_errors > 0 ||
                    vd->vdev_state < VDEV_STATE_HEALTHY) &&
@@ -2316,8 +2468,31 @@ top:
                asize = vdev_psize_to_asize(vd, psize);
                ASSERT(P2PHASE(asize, 1ULL << vd->vdev_ashift) == 0);
 
-               offset = metaslab_group_alloc(mg, psize, asize, txg, distance,
-                   dva, d);
+               offset = metaslab_group_alloc(mg, asize, txg, distance, dva, d);
+
+               mutex_enter(&mg->mg_lock);
+               if (offset == -1ULL) {
+                       mg->mg_failed_allocations++;
+                       if (asize == SPA_GANGBLOCKSIZE) {
+                               /*
+                                * This metaslab group was unable to allocate
+                                * the minimum gang block size so it must be
+                                * out of space. We must notify the allocation
+                                * throttle to start skipping allocation
+                                * attempts to this metaslab group until more
+                                * space becomes available.
+                                *
+                                * Note: this failure cannot be caused by the
+                                * allocation throttle since the allocation
+                                * throttle is only responsible for skipping
+                                * devices and not failing block allocations.
+                                */
+                               mg->mg_no_free_space = B_TRUE;
+                       }
+               }
+               mg->mg_allocations++;
+               mutex_exit(&mg->mg_lock);
+
                if (offset != -1ULL) {
                        /*
                         * If we've just selected this metaslab group,
@@ -2517,9 +2692,62 @@ metaslab_claim_dva(spa_t *spa, const dva_t *dva, uint64_t txg)
        return (0);
 }
 
+/*
+ * Reserve some allocation slots. The reservation system must be called
+ * before we call into the allocator. If there aren't any available slots
+ * then the I/O will be throttled until an I/O completes and its slots are
+ * freed up. The function returns true if it was successful in placing
+ * the reservation.
+ */
+boolean_t
+metaslab_class_throttle_reserve(metaslab_class_t *mc, int slots, zio_t *zio,
+    int flags)
+{
+       uint64_t available_slots = 0;
+       uint64_t reserved_slots;
+       boolean_t slot_reserved = B_FALSE;
+
+       ASSERT(mc->mc_alloc_throttle_enabled);
+       mutex_enter(&mc->mc_lock);
+
+       reserved_slots = refcount_count(&mc->mc_alloc_slots);
+       if (reserved_slots < mc->mc_alloc_max_slots)
+               available_slots = mc->mc_alloc_max_slots - reserved_slots;
+
+       if (slots <= available_slots || GANG_ALLOCATION(flags)) {
+               int d;
+
+               /*
+                * We reserve the slots individually so that we can unreserve
+                * them individually when an I/O completes.
+                */
+               for (d = 0; d < slots; d++) {
+                       reserved_slots = refcount_add(&mc->mc_alloc_slots, zio);
+               }
+               zio->io_flags |= ZIO_FLAG_IO_ALLOCATING;
+               slot_reserved = B_TRUE;
+       }
+
+       mutex_exit(&mc->mc_lock);
+       return (slot_reserved);
+}
+
+void
+metaslab_class_throttle_unreserve(metaslab_class_t *mc, int slots, zio_t *zio)
+{
+       int d;
+
+       ASSERT(mc->mc_alloc_throttle_enabled);
+       mutex_enter(&mc->mc_lock);
+       for (d = 0; d < slots; d++) {
+               (void) refcount_remove(&mc->mc_alloc_slots, zio);
+       }
+       mutex_exit(&mc->mc_lock);
+}
+
 int
 metaslab_alloc(spa_t *spa, metaslab_class_t *mc, uint64_t psize, blkptr_t *bp,
-    int ndvas, uint64_t txg, blkptr_t *hintbp, int flags)
+    int ndvas, uint64_t txg, blkptr_t *hintbp, int flags, zio_t *zio)
 {
        dva_t *dva = bp->blk_dva;
        dva_t *hintdva = hintbp->blk_dva;
@@ -2545,11 +2773,21 @@ metaslab_alloc(spa_t *spa, metaslab_class_t *mc, uint64_t psize, blkptr_t *bp,
                if (error != 0) {
                        for (d--; d >= 0; d--) {
                                metaslab_free_dva(spa, &dva[d], txg, B_TRUE);
+                               metaslab_group_alloc_decrement(spa,
+                                   DVA_GET_VDEV(&dva[d]), zio, flags);
                                bzero(&dva[d], sizeof (dva_t));
                        }
                        spa_config_exit(spa, SCL_ALLOC, FTAG);
                        return (error);
+               } else {
+                       /*
+                        * Update the metaslab group's queue depth
+                        * based on the newly allocated dva.
+                        */
+                       metaslab_group_alloc_increment(spa,
+                           DVA_GET_VDEV(&dva[d]), zio, flags);
                }
+
        }
        ASSERT(error == 0);
        ASSERT(BP_GET_NDVAS(bp) == ndvas);
index 1903c59540d368daf72fa1e8e99b6332dd2fed34..6f8f4db0891f06bad9c2703a19cf32629b52f01d 100644 (file)
@@ -20,7 +20,7 @@
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
- * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2012, 2015 by Delphix. All rights reserved.
  */
 
 #include <sys/zfs_context.h>
@@ -68,6 +68,13 @@ refcount_create(refcount_t *rc)
        rc->rc_tracked = reference_tracking_enable;
 }
 
+void
+refcount_create_tracked(refcount_t *rc)
+{
+       refcount_create(rc);
+       rc->rc_tracked = B_TRUE;
+}
+
 void
 refcount_create_untracked(refcount_t *rc)
 {
@@ -251,4 +258,60 @@ refcount_transfer_ownership(refcount_t *rc, void *current_holder,
        ASSERT(found);
        mutex_exit(&rc->rc_mtx);
 }
+
+/*
+ * If tracking is enabled, return true if a reference exists that matches
+ * the "holder" tag. If tracking is disabled, then return true if a reference
+ * might be held.
+ */
+boolean_t
+refcount_held(refcount_t *rc, void *holder)
+{
+       reference_t *ref;
+
+       mutex_enter(&rc->rc_mtx);
+
+       if (!rc->rc_tracked) {
+               mutex_exit(&rc->rc_mtx);
+               return (rc->rc_count > 0);
+       }
+
+       for (ref = list_head(&rc->rc_list); ref;
+           ref = list_next(&rc->rc_list, ref)) {
+               if (ref->ref_holder == holder) {
+                       mutex_exit(&rc->rc_mtx);
+                       return (B_TRUE);
+               }
+       }
+       mutex_exit(&rc->rc_mtx);
+       return (B_FALSE);
+}
+
+/*
+ * If tracking is enabled, return true if a reference does not exist that
+ * matches the "holder" tag. If tracking is disabled, always return true
+ * since the reference might not be held.
+ */
+boolean_t
+refcount_not_held(refcount_t *rc, void *holder)
+{
+       reference_t *ref;
+
+       mutex_enter(&rc->rc_mtx);
+
+       if (!rc->rc_tracked) {
+               mutex_exit(&rc->rc_mtx);
+               return (B_TRUE);
+       }
+
+       for (ref = list_head(&rc->rc_list); ref;
+           ref = list_next(&rc->rc_list, ref)) {
+               if (ref->ref_holder == holder) {
+                       mutex_exit(&rc->rc_mtx);
+                       return (B_FALSE);
+               }
+       }
+       mutex_exit(&rc->rc_mtx);
+       return (B_TRUE);
+}
 #endif /* ZFS_DEBUG */
index 9c29543b90cb58779044e195595e4e904e0ce90c..0cf07be9b4cfa753fd6cfa41ba3321a9b332e3ca 100644 (file)
@@ -1363,7 +1363,6 @@ spa_unload(spa_t *spa)
 
        ddt_unload(spa);
 
-
        /*
         * Drop and purge level 2 cache
         */
@@ -3813,6 +3812,7 @@ spa_create(const char *pool, nvlist_t *nvroot, nvlist_t *props,
        spa->spa_uberblock.ub_txg = txg - 1;
        spa->spa_uberblock.ub_version = version;
        spa->spa_ubsync = spa->spa_uberblock;
+       spa->spa_load_state = SPA_LOAD_CREATE;
 
        /*
         * Create "The Godfather" zio to hold all async IOs
@@ -3997,6 +3997,7 @@ spa_create(const char *pool, nvlist_t *nvroot, nvlist_t *props,
         */
        spa_evicting_os_wait(spa);
        spa->spa_minref = refcount_count(&spa->spa_refcount);
+       spa->spa_load_state = SPA_LOAD_NONE;
 
        mutex_exit(&spa_namespace_lock);
 
@@ -5312,7 +5313,7 @@ spa_nvlist_lookup_by_guid(nvlist_t **nvpp, int count, uint64_t target_guid)
 
 static void
 spa_vdev_remove_aux(nvlist_t *config, char *name, nvlist_t **dev, int count,
-       nvlist_t *dev_to_remove)
+    nvlist_t *dev_to_remove)
 {
        nvlist_t **newdev = NULL;
        int i, j;
@@ -6466,10 +6467,14 @@ spa_sync(spa_t *spa, uint64_t txg)
        dsl_pool_t *dp = spa->spa_dsl_pool;
        objset_t *mos = spa->spa_meta_objset;
        bplist_t *free_bpl = &spa->spa_free_bplist[txg & TXG_MASK];
+       metaslab_class_t *mc;
        vdev_t *rvd = spa->spa_root_vdev;
        vdev_t *vd;
        dmu_tx_t *tx;
        int error;
+       uint32_t max_queue_depth = zfs_vdev_async_write_max_active *
+           zfs_vdev_queue_depth_pct / 100;
+       uint64_t queue_depth_total;
        int c;
 
        VERIFY(spa_writeable(spa));
@@ -6482,6 +6487,10 @@ spa_sync(spa_t *spa, uint64_t txg)
        spa->spa_syncing_txg = txg;
        spa->spa_sync_pass = 0;
 
+       mutex_enter(&spa->spa_alloc_lock);
+       VERIFY0(avl_numnodes(&spa->spa_alloc_tree));
+       mutex_exit(&spa->spa_alloc_lock);
+
        /*
         * If there are any pending vdev state changes, convert them
         * into config changes that go out with this transaction group.
@@ -6535,6 +6544,38 @@ spa_sync(spa_t *spa, uint64_t txg)
                }
        }
 
+       /*
+        * Set the top-level vdev's max queue depth. Evaluate each
+        * top-level's async write queue depth in case it changed.
+        * The max queue depth will not change in the middle of syncing
+        * out this txg.
+        */
+       queue_depth_total = 0;
+       for (c = 0; c < rvd->vdev_children; c++) {
+               vdev_t *tvd = rvd->vdev_child[c];
+               metaslab_group_t *mg = tvd->vdev_mg;
+
+               if (mg == NULL || mg->mg_class != spa_normal_class(spa) ||
+                   !metaslab_group_initialized(mg))
+                       continue;
+
+               /*
+                * It is safe to do a lock-free check here because only async
+                * allocations look at mg_max_alloc_queue_depth, and async
+                * allocations all happen from spa_sync().
+                */
+               ASSERT0(refcount_count(&mg->mg_alloc_queue_depth));
+               mg->mg_max_alloc_queue_depth = max_queue_depth;
+               queue_depth_total += mg->mg_max_alloc_queue_depth;
+       }
+       mc = spa_normal_class(spa);
+       ASSERT0(refcount_count(&mc->mc_alloc_slots));
+       mc->mc_alloc_max_slots = queue_depth_total;
+       mc->mc_alloc_throttle_enabled = zio_dva_throttle_enabled;
+
+       ASSERT3U(mc->mc_alloc_max_slots, <=,
+           max_queue_depth * rvd->vdev_children);
+
        /*
         * Iterate to convergence.
         */
@@ -6689,6 +6730,10 @@ spa_sync(spa_t *spa, uint64_t txg)
 
        dsl_pool_sync_done(dp, txg);
 
+       mutex_enter(&spa->spa_alloc_lock);
+       VERIFY0(avl_numnodes(&spa->spa_alloc_tree));
+       mutex_exit(&spa->spa_alloc_lock);
+
        /*
         * Update usable space statistics.
         */
index 595e594ca97224c5e89cab97a65e5d6b8ce53adc..6ec05214ef134efa1cb9c03799f85c21af7caf9d 100644 (file)
@@ -564,6 +564,7 @@ spa_add(const char *name, nvlist_t *config, const char *altroot)
        mutex_init(&spa->spa_suspend_lock, NULL, MUTEX_DEFAULT, NULL);
        mutex_init(&spa->spa_vdev_top_lock, NULL, MUTEX_DEFAULT, NULL);
        mutex_init(&spa->spa_feat_stats_lock, NULL, MUTEX_DEFAULT, NULL);
+       mutex_init(&spa->spa_alloc_lock, NULL, MUTEX_DEFAULT, NULL);
 
        cv_init(&spa->spa_async_cv, NULL, CV_DEFAULT, NULL);
        cv_init(&spa->spa_evicting_os_cv, NULL, CV_DEFAULT, NULL);
@@ -596,6 +597,9 @@ spa_add(const char *name, nvlist_t *config, const char *altroot)
        if (altroot)
                spa->spa_root = spa_strdup(altroot);
 
+       avl_create(&spa->spa_alloc_tree, zio_timestamp_compare,
+           sizeof (zio_t), offsetof(zio_t, io_alloc_node));
+
        /*
         * Every pool starts with the default cachefile
         */
@@ -673,6 +677,7 @@ spa_remove(spa_t *spa)
                kmem_free(dp, sizeof (spa_config_dirent_t));
        }
 
+       avl_destroy(&spa->spa_alloc_tree);
        list_destroy(&spa->spa_config_list);
 
        nvlist_free(spa->spa_label_features);
@@ -696,6 +701,7 @@ spa_remove(spa_t *spa)
        cv_destroy(&spa->spa_scrub_io_cv);
        cv_destroy(&spa->spa_suspend_cv);
 
+       mutex_destroy(&spa->spa_alloc_lock);
        mutex_destroy(&spa->spa_async_lock);
        mutex_destroy(&spa->spa_errlist_lock);
        mutex_destroy(&spa->spa_errlog_lock);
index 104db3d153b1d3ca33171ecb406588b5fd24fdbf..5ff5cf3b1271b0d25e7697891bcec9ad913670fb 100644 (file)
@@ -351,6 +351,7 @@ vdev_alloc_common(spa_t *spa, uint_t id, uint64_t guid, vdev_ops_t *ops)
        mutex_init(&vd->vdev_dtl_lock, NULL, MUTEX_NOLOCKDEP, NULL);
        mutex_init(&vd->vdev_stat_lock, NULL, MUTEX_DEFAULT, NULL);
        mutex_init(&vd->vdev_probe_lock, NULL, MUTEX_DEFAULT, NULL);
+       mutex_init(&vd->vdev_queue_lock, NULL, MUTEX_DEFAULT, NULL);
        for (t = 0; t < DTL_TYPES; t++) {
                vd->vdev_dtl[t] = range_tree_create(NULL, NULL,
                    &vd->vdev_dtl_lock);
@@ -681,6 +682,7 @@ vdev_free(vdev_t *vd)
        }
        mutex_exit(&vd->vdev_dtl_lock);
 
+       mutex_destroy(&vd->vdev_queue_lock);
        mutex_destroy(&vd->vdev_dtl_lock);
        mutex_destroy(&vd->vdev_stat_lock);
        mutex_destroy(&vd->vdev_probe_lock);
@@ -990,6 +992,7 @@ vdev_probe_done(zio_t *zio)
                zio_buf_free(zio->io_data, zio->io_size);
        } else if (zio->io_type == ZIO_TYPE_NULL) {
                zio_t *pio;
+               zio_link_t *zl;
 
                vd->vdev_cant_read |= !vps->vps_readable;
                vd->vdev_cant_write |= !vps->vps_writeable;
@@ -1009,7 +1012,8 @@ vdev_probe_done(zio_t *zio)
                vd->vdev_probe_zio = NULL;
                mutex_exit(&vd->vdev_probe_lock);
 
-               while ((pio = zio_walk_parents(zio)) != NULL)
+               zl = NULL;
+               while ((pio = zio_walk_parents(zio, &zl)) != NULL)
                        if (!vdev_accessible(vd, pio))
                                pio->io_error = SET_ERROR(ENXIO);
 
@@ -2754,7 +2758,8 @@ vdev_allocatable(vdev_t *vd)
         * we're asking two separate questions about it.
         */
        return (!(state < VDEV_STATE_DEGRADED && state != VDEV_STATE_CLOSED) &&
-           !vd->vdev_cant_write && !vd->vdev_ishole);
+           !vd->vdev_cant_write && !vd->vdev_ishole &&
+           vd->vdev_mg->mg_initialized);
 }
 
 boolean_t
index d7de7c5c90dee94f6b090a9fa612c5041c5b9c70..321ea4a2f38c32419c36acf0aa6dfda1886cd52b 100644 (file)
@@ -23,7 +23,7 @@
  * Use is subject to license terms.
  */
 /*
- * Copyright (c) 2013 by Delphix. All rights reserved.
+ * Copyright (c) 2013, 2015 by Delphix. All rights reserved.
  */
 
 #include <sys/zfs_context.h>
@@ -214,6 +214,7 @@ vdev_cache_fill(zio_t *fio)
        vdev_cache_t *vc = &vd->vdev_cache;
        vdev_cache_entry_t *ve = fio->io_private;
        zio_t *pio;
+       zio_link_t *zl;
 
        ASSERT(fio->io_size == VCBS);
 
@@ -233,7 +234,8 @@ vdev_cache_fill(zio_t *fio)
         * any reads that were queued up before the missed update are still
         * valid, so we can satisfy them from this line before we evict it.
         */
-       while ((pio = zio_walk_parents(fio)) != NULL)
+       zl = NULL;
+       while ((pio = zio_walk_parents(fio, &zl)) != NULL)
                vdev_cache_hit(vc, ve, pio);
 
        if (fio->io_error || ve->ve_missed_update)
index d3dbdca79a427da1da608718c863440b97e25fa9..7803111954041f1a0850c2c6ba1e538623b3390d 100644 (file)
@@ -24,7 +24,7 @@
  */
 
 /*
- * Copyright (c) 2012, 2014 by Delphix. All rights reserved.
+ * Copyright (c) 2012, 2015 by Delphix. All rights reserved.
  */
 
 #include <sys/zfs_context.h>
@@ -266,9 +266,10 @@ vdev_mirror_scrub_done(zio_t *zio)
 
        if (zio->io_error == 0) {
                zio_t *pio;
+               zio_link_t *zl = NULL;
 
                mutex_enter(&zio->io_lock);
-               while ((pio = zio_walk_parents(zio)) != NULL) {
+               while ((pio = zio_walk_parents(zio, &zl)) != NULL) {
                        mutex_enter(&pio->io_lock);
                        ASSERT3U(zio->io_size, >=, pio->io_size);
                        bcopy(zio->io_data, pio->io_data, pio->io_size);
index 4cffa500b4b1a6d06518433ca42bd305de3c0eb3..8f394eef5b6589e430972472365c8fc8829619e8 100644 (file)
@@ -33,6 +33,7 @@
 #include <sys/zio.h>
 #include <sys/avl.h>
 #include <sys/dsl_pool.h>
+#include <sys/metaslab_impl.h>
 #include <sys/spa.h>
 #include <sys/spa_impl.h>
 #include <sys/kstat.h>
@@ -171,6 +172,23 @@ int zfs_vdev_aggregation_limit = SPA_OLD_MAXBLOCKSIZE;
 int zfs_vdev_read_gap_limit = 32 << 10;
 int zfs_vdev_write_gap_limit = 4 << 10;
 
+/*
+ * Define the queue depth percentage for each top-level. This percentage is
+ * used in conjunction with zfs_vdev_async_max_active to determine how many
+ * allocations a specific top-level vdev should handle. Once the queue depth
+ * reaches zfs_vdev_queue_depth_pct * zfs_vdev_async_write_max_active / 100
+ * then allocator will stop allocating blocks on that top-level device.
+ * The default kernel setting is 1000% which will yield 100 allocations per
+ * device. For userland testing, the default setting is 300% which equates
+ * to 30 allocations per device.
+ */
+#ifdef _KERNEL
+int zfs_vdev_queue_depth_pct = 1000;
+#else
+int zfs_vdev_queue_depth_pct = 300;
+#endif
+
+
 int
 vdev_queue_offset_compare(const void *x1, const void *x2)
 {
@@ -476,7 +494,8 @@ vdev_queue_agg_io_done(zio_t *aio)
 {
        if (aio->io_type == ZIO_TYPE_READ) {
                zio_t *pio;
-               while ((pio = zio_walk_parents(aio)) != NULL) {
+               zio_link_t *zl = NULL;
+               while ((pio = zio_walk_parents(aio, &zl)) != NULL) {
                        bcopy((char *)aio->io_data + (pio->io_offset -
                            aio->io_offset), pio->io_data, pio->io_size);
                }
@@ -856,4 +875,8 @@ MODULE_PARM_DESC(zfs_vdev_sync_write_max_active,
 module_param(zfs_vdev_sync_write_min_active, int, 0644);
 MODULE_PARM_DESC(zfs_vdev_sync_write_min_active,
        "Min active sync write I/Os per vdev");
+
+module_param(zfs_vdev_queue_depth_pct, int, 0644);
+MODULE_PARM_DESC(zfs_vdev_queue_depth_pct,
+       "Queue depth percentage for each top-level vdev");
 #endif
index 8a063ab7fc8c5dff1b19d525ed57ec39861a9650..0147cb17c1aed8192de84bede23a80d832cadca9 100644 (file)
@@ -39,6 +39,7 @@
 #include <sys/ddt.h>
 #include <sys/blkptr.h>
 #include <sys/zfeature.h>
+#include <sys/metaslab_impl.h>
 #include <sys/time.h>
 #include <sys/trace_zio.h>
 
  * ==========================================================================
  */
 const char *zio_type_name[ZIO_TYPES] = {
+       /*
+        * Note: Linux kernel thread name length is limited
+        * so these names will differ from upstream open zfs.
+        */
        "z_null", "z_rd", "z_wr", "z_fr", "z_cl", "z_ioctl"
 };
 
+int zio_dva_throttle_enabled = B_TRUE;
+
 /*
  * ==========================================================================
  * I/O kmem caches
@@ -100,6 +107,8 @@ int zio_buf_debug_limit = 0;
 
 static inline void __zio_execute(zio_t *zio);
 
+static void zio_taskq_dispatch(zio_t *, zio_taskq_type_t, boolean_t);
+
 void
 zio_init(void)
 {
@@ -368,52 +377,39 @@ zio_decompress(zio_t *zio, void *data, uint64_t size)
  * I/O parent/child relationships and pipeline interlocks
  * ==========================================================================
  */
-/*
- * NOTE - Callers to zio_walk_parents() and zio_walk_children must
- *        continue calling these functions until they return NULL.
- *        Otherwise, the next caller will pick up the list walk in
- *        some indeterminate state.  (Otherwise every caller would
- *        have to pass in a cookie to keep the state represented by
- *        io_walk_link, which gets annoying.)
- */
 zio_t *
-zio_walk_parents(zio_t *cio)
+zio_walk_parents(zio_t *cio, zio_link_t **zl)
 {
-       zio_link_t *zl = cio->io_walk_link;
        list_t *pl = &cio->io_parent_list;
 
-       zl = (zl == NULL) ? list_head(pl) : list_next(pl, zl);
-       cio->io_walk_link = zl;
-
-       if (zl == NULL)
+       *zl = (*zl == NULL) ? list_head(pl) : list_next(pl, *zl);
+       if (*zl == NULL)
                return (NULL);
 
-       ASSERT(zl->zl_child == cio);
-       return (zl->zl_parent);
+       ASSERT((*zl)->zl_child == cio);
+       return ((*zl)->zl_parent);
 }
 
 zio_t *
-zio_walk_children(zio_t *pio)
+zio_walk_children(zio_t *pio, zio_link_t **zl)
 {
-       zio_link_t *zl = pio->io_walk_link;
        list_t *cl = &pio->io_child_list;
 
-       zl = (zl == NULL) ? list_head(cl) : list_next(cl, zl);
-       pio->io_walk_link = zl;
-
-       if (zl == NULL)
+       *zl = (*zl == NULL) ? list_head(cl) : list_next(cl, *zl);
+       if (*zl == NULL)
                return (NULL);
 
-       ASSERT(zl->zl_parent == pio);
-       return (zl->zl_child);
+       ASSERT((*zl)->zl_parent == pio);
+       return ((*zl)->zl_child);
 }
 
 zio_t *
 zio_unique_parent(zio_t *cio)
 {
-       zio_t *pio = zio_walk_parents(cio);
+       zio_link_t *zl = NULL;
+       zio_t *pio = zio_walk_parents(cio, &zl);
 
-       VERIFY(zio_walk_parents(cio) == NULL);
+       VERIFY3P(zio_walk_parents(cio, &zl), ==, NULL);
        return (pio);
 }
 
@@ -469,7 +465,6 @@ zio_remove_child(zio_t *pio, zio_t *cio, zio_link_t *zl)
 
        mutex_exit(&pio->io_lock);
        mutex_exit(&cio->io_lock);
-
        kmem_cache_free(zio_link_cache, zl);
 }
 
@@ -483,6 +478,7 @@ zio_wait_for_children(zio_t *zio, enum zio_child child, enum zio_wait_type wait)
        ASSERT(zio->io_stall == NULL);
        if (*countp != 0) {
                zio->io_stage >>= 1;
+               ASSERT3U(zio->io_stage, !=, ZIO_STAGE_OPEN);
                zio->io_stall = countp;
                waiting = B_TRUE;
        }
@@ -507,9 +503,18 @@ zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait)
        (*countp)--;
 
        if (*countp == 0 && pio->io_stall == countp) {
+               zio_taskq_type_t type =
+                   pio->io_stage < ZIO_STAGE_VDEV_IO_START ? ZIO_TASKQ_ISSUE :
+                   ZIO_TASKQ_INTERRUPT;
                pio->io_stall = NULL;
                mutex_exit(&pio->io_lock);
-               __zio_execute(pio);
+               /*
+                * Dispatch the parent zio in its own taskq so that
+                * the child can continue to make progress. This also
+                * prevents overflowing the stack when we have deeply nested
+                * parent-child relationships.
+                */
+               zio_taskq_dispatch(pio, type, B_FALSE);
        } else {
                mutex_exit(&pio->io_lock);
        }
@@ -522,6 +527,24 @@ zio_inherit_child_errors(zio_t *zio, enum zio_child c)
                zio->io_error = zio->io_child_error[c];
 }
 
+int
+zio_timestamp_compare(const void *x1, const void *x2)
+{
+       const zio_t *z1 = x1;
+       const zio_t *z2 = x2;
+       int cmp;
+
+       cmp = AVL_CMP(z1->io_queued_timestamp, z2->io_queued_timestamp);
+       if (likely(cmp))
+               return (cmp);
+
+       cmp = AVL_CMP(z1->io_offset, z2->io_offset);
+       if (likely(cmp))
+               return (cmp);
+
+       return (AVL_PCMP(z1, z2));
+}
+
 /*
  * ==========================================================================
  * Create the various types of I/O (read, write, free, etc)
@@ -594,6 +617,7 @@ zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
        zio->io_orig_flags = zio->io_flags = flags;
        zio->io_orig_stage = zio->io_stage = stage;
        zio->io_orig_pipeline = zio->io_pipeline = pipeline;
+       zio->io_pipeline_trace = ZIO_STAGE_OPEN;
 
        zio->io_state[ZIO_WAIT_READY] = (stage >= ZIO_STAGE_READY);
        zio->io_state[ZIO_WAIT_DONE] = (stage >= ZIO_STAGE_DONE);
@@ -797,7 +821,7 @@ zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, void *data,
        zio_t *zio;
 
        zio = zio_create(pio, spa, txg, bp, data, size, size, done, private,
-           ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
+           ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_IO_REWRITE, NULL, 0, zb,
            ZIO_STAGE_OPEN, ZIO_REWRITE_PIPELINE);
 
        return (zio);
@@ -912,6 +936,7 @@ zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
        zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
            BP_GET_PSIZE(bp), done, private, ZIO_TYPE_CLAIM, ZIO_PRIORITY_NOW,
            flags, NULL, 0, NULL, ZIO_STAGE_OPEN, ZIO_CLAIM_PIPELINE);
+       ASSERT0(zio->io_queued_timestamp);
 
        return (zio);
 }
@@ -1031,9 +1056,31 @@ zio_vdev_child_io(zio_t *pio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
        if (flags & ZIO_FLAG_IO_REPAIR)
                flags &= ~ZIO_FLAG_SPECULATIVE;
 
+       /*
+        * If we're creating a child I/O that is not associated with a
+        * top-level vdev, then the child zio is not an allocating I/O.
+        * If this is a retried I/O then we ignore it since we will
+        * have already processed the original allocating I/O.
+        */
+       if (flags & ZIO_FLAG_IO_ALLOCATING &&
+           (vd != vd->vdev_top || (flags & ZIO_FLAG_IO_RETRY))) {
+               metaslab_class_t *mc = spa_normal_class(pio->io_spa);
+
+               ASSERT(mc->mc_alloc_throttle_enabled);
+               ASSERT(type == ZIO_TYPE_WRITE);
+               ASSERT(priority == ZIO_PRIORITY_ASYNC_WRITE);
+               ASSERT(!(flags & ZIO_FLAG_IO_REPAIR));
+               ASSERT(!(pio->io_flags & ZIO_FLAG_IO_REWRITE) ||
+                   pio->io_child_type == ZIO_CHILD_GANG);
+
+               flags &= ~ZIO_FLAG_IO_ALLOCATING;
+       }
+
+
        zio = zio_create(pio, pio->io_spa, pio->io_txg, bp, data, size, size,
            done, private, type, priority, flags, vd, offset, &pio->io_bookmark,
            ZIO_STAGE_VDEV_IO_START >> 1, pipeline);
+       ASSERT3U(zio->io_child_type, ==, ZIO_CHILD_VDEV);
 
        zio->io_physdone = pio->io_physdone;
        if (vd->vdev_ops->vdev_op_leaf && zio->io_logical != NULL)
@@ -1131,40 +1178,16 @@ zio_read_bp_init(zio_t *zio)
 static int
 zio_write_bp_init(zio_t *zio)
 {
-       spa_t *spa = zio->io_spa;
-       zio_prop_t *zp = &zio->io_prop;
-       enum zio_compress compress = zp->zp_compress;
-       blkptr_t *bp = zio->io_bp;
-       uint64_t lsize = zio->io_lsize;
-       uint64_t psize = zio->io_size;
-       int pass = 1;
-
-       EQUIV(lsize != psize, (zio->io_flags & ZIO_FLAG_RAW) != 0);
-
-       /*
-        * If our children haven't all reached the ready stage,
-        * wait for them and then repeat this pipeline stage.
-        */
-       if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
-           zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_READY))
-               return (ZIO_PIPELINE_STOP);
 
        if (!IO_IS_ALLOCATING(zio))
                return (ZIO_PIPELINE_CONTINUE);
 
-       if (zio->io_children_ready != NULL) {
-               /*
-                * Now that all our children are ready, run the callback
-                * associated with this zio in case it wants to modify the
-                * data to be written.
-                */
-               ASSERT3U(zp->zp_level, >, 0);
-               zio->io_children_ready(zio);
-       }
-
        ASSERT(zio->io_child_type != ZIO_CHILD_DDT);
 
        if (zio->io_bp_override) {
+               blkptr_t *bp = zio->io_bp;
+               zio_prop_t *zp = &zio->io_prop;
+
                ASSERT(bp->blk_birth != zio->io_txg);
                ASSERT(BP_GET_DEDUP(zio->io_bp_override) == 0);
 
@@ -1181,6 +1204,7 @@ zio_write_bp_init(zio_t *zio)
                 */
                if (!BP_IS_HOLE(bp) && zp->zp_nopwrite) {
                        ASSERT(!zp->zp_dedup);
+                       ASSERT3U(BP_GET_CHECKSUM(bp), ==, zp->zp_checksum);
                        zio->io_flags |= ZIO_FLAG_NOPWRITE;
                        return (ZIO_PIPELINE_CONTINUE);
                }
@@ -1198,10 +1222,56 @@ zio_write_bp_init(zio_t *zio)
                        zio->io_pipeline |= ZIO_STAGE_DDT_WRITE;
                        return (ZIO_PIPELINE_CONTINUE);
                }
+
+               /*
+                * We were unable to handle this as an override bp, treat
+                * it as a regular write I/O.
+                */
                zio->io_bp_override = NULL;
-               BP_ZERO(bp);
+               *bp = zio->io_bp_orig;
+               zio->io_pipeline = zio->io_orig_pipeline;
+       }
+
+       return (ZIO_PIPELINE_CONTINUE);
+}
+
+static int
+zio_write_compress(zio_t *zio)
+{
+       spa_t *spa = zio->io_spa;
+       zio_prop_t *zp = &zio->io_prop;
+       enum zio_compress compress = zp->zp_compress;
+       blkptr_t *bp = zio->io_bp;
+       uint64_t lsize = zio->io_lsize;
+       uint64_t psize = zio->io_size;
+       int pass = 1;
+
+       EQUIV(lsize != psize, (zio->io_flags & ZIO_FLAG_RAW) != 0);
+
+       /*
+        * If our children haven't all reached the ready stage,
+        * wait for them and then repeat this pipeline stage.
+        */
+       if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
+           zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_READY))
+               return (ZIO_PIPELINE_STOP);
+
+       if (!IO_IS_ALLOCATING(zio))
+               return (ZIO_PIPELINE_CONTINUE);
+
+       if (zio->io_children_ready != NULL) {
+               /*
+                * Now that all our children are ready, run the callback
+                * associated with this zio in case it wants to modify the
+                * data to be written.
+                */
+               ASSERT3U(zp->zp_level, >, 0);
+               zio->io_children_ready(zio);
        }
 
+       ASSERT(zio->io_child_type != ZIO_CHILD_DDT);
+       ASSERT(zio->io_bp_override == NULL);
+
        if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg) {
                /*
                 * We're rewriting an existing block, which means we're
@@ -1273,6 +1343,15 @@ zio_write_bp_init(zio_t *zio)
                                    psize, lsize, NULL);
                        }
                }
+
+               /*
+                * We were unable to handle this as an override bp, treat
+                * it as a regular write I/O.
+                */
+               zio->io_bp_override = NULL;
+               *bp = zio->io_bp_orig;
+               zio->io_pipeline = zio->io_orig_pipeline;
+
        } else {
                ASSERT3U(psize, !=, 0);
 
@@ -1328,7 +1407,6 @@ zio_write_bp_init(zio_t *zio)
                        zio->io_pipeline |= ZIO_STAGE_NOP_WRITE;
                }
        }
-
        return (ZIO_PIPELINE_CONTINUE);
 }
 
@@ -1559,6 +1637,8 @@ __zio_execute(zio_t *zio)
 {
        zio->io_executor = curthread;
 
+       ASSERT3U(zio->io_queued_timestamp, >, 0);
+
        while (zio->io_stage < ZIO_STAGE_DONE) {
                enum zio_stage pipeline = zio->io_pipeline;
                enum zio_stage stage = zio->io_stage;
@@ -1603,6 +1683,7 @@ __zio_execute(zio_t *zio)
                }
 
                zio->io_stage = stage;
+               zio->io_pipeline_trace |= zio->io_stage;
                rv = zio_pipeline[highbit64(stage) - 1](zio);
 
                if (rv == ZIO_PIPELINE_STOP)
@@ -1627,6 +1708,8 @@ zio_wait(zio_t *zio)
        ASSERT(zio->io_executor == NULL);
 
        zio->io_waiter = curthread;
+       ASSERT0(zio->io_queued_timestamp);
+       zio->io_queued_timestamp = gethrtime();
 
        __zio_execute(zio);
 
@@ -1663,6 +1746,8 @@ zio_nowait(zio_t *zio)
                zio_add_child(pio, zio);
        }
 
+       ASSERT0(zio->io_queued_timestamp);
+       zio->io_queued_timestamp = gethrtime();
        __zio_execute(zio);
 }
 
@@ -1677,6 +1762,7 @@ zio_reexecute(zio_t *pio)
 {
        zio_t *cio, *cio_next;
        int c, w;
+       zio_link_t *zl = NULL;
 
        ASSERT(pio->io_child_type == ZIO_CHILD_LOGICAL);
        ASSERT(pio->io_orig_stage == ZIO_STAGE_OPEN);
@@ -1688,6 +1774,7 @@ zio_reexecute(zio_t *pio)
        pio->io_pipeline = pio->io_orig_pipeline;
        pio->io_reexecute = 0;
        pio->io_flags |= ZIO_FLAG_REEXECUTED;
+       pio->io_pipeline_trace = 0;
        pio->io_error = 0;
        for (w = 0; w < ZIO_WAIT_TYPES; w++)
                pio->io_state[w] = 0;
@@ -1704,8 +1791,8 @@ zio_reexecute(zio_t *pio)
         * the remainder of pio's io_child_list, from 'cio_next' onward,
         * cannot be affected by any side effects of reexecuting 'cio'.
         */
-       for (cio = zio_walk_children(pio); cio != NULL; cio = cio_next) {
-               cio_next = zio_walk_children(pio);
+       for (cio = zio_walk_children(pio, &zl); cio != NULL; cio = cio_next) {
+               cio_next = zio_walk_children(pio, &zl);
                mutex_enter(&pio->io_lock);
                for (w = 0; w < ZIO_WAIT_TYPES; w++)
                        pio->io_children[cio->io_child_type][w]++;
@@ -1718,8 +1805,10 @@ zio_reexecute(zio_t *pio)
         * We don't reexecute "The Godfather" I/O here as it's the
         * responsibility of the caller to wait on him.
         */
-       if (!(pio->io_flags & ZIO_FLAG_GODFATHER))
+       if (!(pio->io_flags & ZIO_FLAG_GODFATHER)) {
+               pio->io_queued_timestamp = gethrtime();
                __zio_execute(pio);
+       }
 }
 
 void
@@ -2120,6 +2209,7 @@ static int
 zio_write_gang_block(zio_t *pio)
 {
        spa_t *spa = pio->io_spa;
+       metaslab_class_t *mc = spa_normal_class(spa);
        blkptr_t *bp = pio->io_bp;
        zio_t *gio = pio->io_gang_leader;
        zio_t *zio;
@@ -2133,10 +2223,44 @@ zio_write_gang_block(zio_t *pio)
        zio_prop_t zp;
        int g, error;
 
-       error = metaslab_alloc(spa, spa_normal_class(spa), SPA_GANGBLOCKSIZE,
-           bp, gbh_copies, txg, pio == gio ? NULL : gio->io_bp,
-           METASLAB_HINTBP_FAVOR | METASLAB_GANG_HEADER);
+       int flags = METASLAB_HINTBP_FAVOR | METASLAB_GANG_HEADER;
+       if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
+               ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
+               ASSERT(!(pio->io_flags & ZIO_FLAG_NODATA));
+
+               flags |= METASLAB_ASYNC_ALLOC;
+               VERIFY(refcount_held(&mc->mc_alloc_slots, pio));
+
+               /*
+                * The logical zio has already placed a reservation for
+                * 'copies' allocation slots but gang blocks may require
+                * additional copies. These additional copies
+                * (i.e. gbh_copies - copies) are guaranteed to succeed
+                * since metaslab_class_throttle_reserve() always allows
+                * additional reservations for gang blocks.
+                */
+               VERIFY(metaslab_class_throttle_reserve(mc, gbh_copies - copies,
+                   pio, flags));
+       }
+
+       error = metaslab_alloc(spa, mc, SPA_GANGBLOCKSIZE,
+           bp, gbh_copies, txg, pio == gio ? NULL : gio->io_bp, flags, pio);
        if (error) {
+               if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
+                       ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
+                       ASSERT(!(pio->io_flags & ZIO_FLAG_NODATA));
+
+                       /*
+                        * If we failed to allocate the gang block header then
+                        * we remove any additional allocation reservations that
+                        * we placed here. The original reservation will
+                        * be removed when the logical I/O goes to the ready
+                        * stage.
+                        */
+                       metaslab_class_throttle_unreserve(mc,
+                           gbh_copies - copies, pio);
+               }
+
                pio->io_error = error;
                return (ZIO_PIPELINE_CONTINUE);
        }
@@ -2162,6 +2286,8 @@ zio_write_gang_block(zio_t *pio)
         * Create and nowait the gang children.
         */
        for (g = 0; resid != 0; resid -= lsize, g++) {
+               zio_t *cio;
+
                lsize = P2ROUNDUP(resid / (SPA_GBH_NBLKPTRS - g),
                    SPA_MINBLOCKSIZE);
                ASSERT(lsize >= SPA_MINBLOCKSIZE && lsize <= resid);
@@ -2175,11 +2301,26 @@ zio_write_gang_block(zio_t *pio)
                zp.zp_dedup_verify = B_FALSE;
                zp.zp_nopwrite = B_FALSE;
 
-               zio_nowait(zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
+               cio = zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
                    (char *)pio->io_data + (pio->io_size - resid), lsize,
                    lsize, &zp, zio_write_gang_member_ready, NULL, NULL, NULL,
                    &gn->gn_child[g], pio->io_priority,
-                   ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark));
+                   ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
+
+               if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
+                       ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
+                       ASSERT(!(pio->io_flags & ZIO_FLAG_NODATA));
+
+                       /*
+                        * Gang children won't throttle but we should
+                        * account for their work, so reserve an allocation
+                        * slot for them here.
+                        */
+                       VERIFY(metaslab_class_throttle_reserve(mc,
+                           zp.zp_copies, cio, flags));
+               }
+               zio_nowait(cio);
+
        }
 
        /*
@@ -2478,6 +2619,7 @@ zio_ddt_child_write_ready(zio_t *zio)
        ddt_entry_t *dde = zio->io_private;
        ddt_phys_t *ddp = &dde->dde_phys[p];
        zio_t *pio;
+       zio_link_t *zl;
 
        if (zio->io_error)
                return;
@@ -2488,7 +2630,8 @@ zio_ddt_child_write_ready(zio_t *zio)
 
        ddt_phys_fill(ddp, zio->io_bp);
 
-       while ((pio = zio_walk_parents(zio)) != NULL)
+       zl = NULL;
+       while ((pio = zio_walk_parents(zio, &zl)) != NULL)
                ddt_bp_fill(ddp, pio->io_bp, zio->io_txg);
 
        ddt_exit(ddt);
@@ -2509,7 +2652,8 @@ zio_ddt_child_write_done(zio_t *zio)
        dde->dde_lead_zio[p] = NULL;
 
        if (zio->io_error == 0) {
-               while (zio_walk_parents(zio) != NULL)
+               zio_link_t *zl = NULL;
+               while (zio_walk_parents(zio, &zl) != NULL)
                        ddt_phys_addref(ddp);
        } else {
                ddt_phys_clear(ddp);
@@ -2691,6 +2835,97 @@ zio_ddt_free(zio_t *zio)
  * Allocate and free blocks
  * ==========================================================================
  */
+
+static zio_t *
+zio_io_to_allocate(spa_t *spa)
+{
+       zio_t *zio;
+
+       ASSERT(MUTEX_HELD(&spa->spa_alloc_lock));
+
+       zio = avl_first(&spa->spa_alloc_tree);
+       if (zio == NULL)
+               return (NULL);
+
+       ASSERT(IO_IS_ALLOCATING(zio));
+
+       /*
+        * Try to place a reservation for this zio. If we're unable to
+        * reserve then we throttle.
+        */
+       if (!metaslab_class_throttle_reserve(spa_normal_class(spa),
+           zio->io_prop.zp_copies, zio, 0)) {
+               return (NULL);
+       }
+
+       avl_remove(&spa->spa_alloc_tree, zio);
+       ASSERT3U(zio->io_stage, <, ZIO_STAGE_DVA_ALLOCATE);
+
+       return (zio);
+}
+
+static int
+zio_dva_throttle(zio_t *zio)
+{
+       spa_t *spa = zio->io_spa;
+       zio_t *nio;
+
+       if (zio->io_priority == ZIO_PRIORITY_SYNC_WRITE ||
+           !spa_normal_class(zio->io_spa)->mc_alloc_throttle_enabled ||
+           zio->io_child_type == ZIO_CHILD_GANG ||
+           zio->io_flags & ZIO_FLAG_NODATA) {
+               return (ZIO_PIPELINE_CONTINUE);
+       }
+
+       ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
+
+       ASSERT3U(zio->io_queued_timestamp, >, 0);
+       ASSERT(zio->io_stage == ZIO_STAGE_DVA_THROTTLE);
+
+       mutex_enter(&spa->spa_alloc_lock);
+
+       ASSERT(zio->io_type == ZIO_TYPE_WRITE);
+       avl_add(&spa->spa_alloc_tree, zio);
+
+       nio = zio_io_to_allocate(zio->io_spa);
+       mutex_exit(&spa->spa_alloc_lock);
+
+       if (nio == zio)
+               return (ZIO_PIPELINE_CONTINUE);
+
+       if (nio != NULL) {
+               ASSERT3U(nio->io_queued_timestamp, <=,
+                   zio->io_queued_timestamp);
+               ASSERT(nio->io_stage == ZIO_STAGE_DVA_THROTTLE);
+               /*
+                * We are passing control to a new zio so make sure that
+                * it is processed by a different thread. We do this to
+                * avoid stack overflows that can occur when parents are
+                * throttled and children are making progress. We allow
+                * it to go to the head of the taskq since it's already
+                * been waiting.
+                */
+               zio_taskq_dispatch(nio, ZIO_TASKQ_ISSUE, B_TRUE);
+       }
+       return (ZIO_PIPELINE_STOP);
+}
+
+void
+zio_allocate_dispatch(spa_t *spa)
+{
+       zio_t *zio;
+
+       mutex_enter(&spa->spa_alloc_lock);
+       zio = zio_io_to_allocate(spa);
+       mutex_exit(&spa->spa_alloc_lock);
+       if (zio == NULL)
+               return;
+
+       ASSERT3U(zio->io_stage, ==, ZIO_STAGE_DVA_THROTTLE);
+       ASSERT0(zio->io_error);
+       zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_TRUE);
+}
+
 static int
 zio_dva_allocate(zio_t *zio)
 {
@@ -2711,19 +2946,18 @@ zio_dva_allocate(zio_t *zio)
        ASSERT3U(zio->io_prop.zp_copies, <=, spa_max_replication(spa));
        ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
 
-       /*
-        * The dump device does not support gang blocks so allocation on
-        * behalf of the dump device (i.e. ZIO_FLAG_NODATA) must avoid
-        * the "fast" gang feature.
-        */
-       flags |= (zio->io_flags & ZIO_FLAG_NODATA) ? METASLAB_GANG_AVOID : 0;
-       flags |= (zio->io_flags & ZIO_FLAG_GANG_CHILD) ?
-           METASLAB_GANG_CHILD : 0;
        flags |= (zio->io_flags & ZIO_FLAG_FASTWRITE) ? METASLAB_FASTWRITE : 0;
+       if (zio->io_flags & ZIO_FLAG_NODATA)
+               flags |= METASLAB_DONT_THROTTLE;
+       if (zio->io_flags & ZIO_FLAG_GANG_CHILD)
+               flags |= METASLAB_GANG_CHILD;
+       if (zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE)
+               flags |= METASLAB_ASYNC_ALLOC;
+
        error = metaslab_alloc(spa, mc, zio->io_size, bp,
-           zio->io_prop.zp_copies, zio->io_txg, NULL, flags);
+           zio->io_prop.zp_copies, zio->io_txg, NULL, flags, zio);
 
-       if (error) {
+       if (error != 0) {
                spa_dbgmsg(spa, "%s: metaslab allocation failure: zio %p, "
                    "size %llu, error %d", spa_name(spa), zio, zio->io_size,
                    error);
@@ -2790,21 +3024,14 @@ zio_alloc_zil(spa_t *spa, uint64_t txg, blkptr_t *new_bp, uint64_t size,
 
        ASSERT(txg > spa_syncing_txg(spa));
 
-       /*
-        * ZIL blocks are always contiguous (i.e. not gang blocks) so we
-        * set the METASLAB_GANG_AVOID flag so that they don't "fast gang"
-        * when allocating them.
-        */
        if (use_slog) {
                error = metaslab_alloc(spa, spa_log_class(spa), size,
-                   new_bp, 1, txg, NULL,
-                   METASLAB_FASTWRITE | METASLAB_GANG_AVOID);
+                   new_bp, 1, txg, NULL, METASLAB_FASTWRITE, NULL);
        }
 
        if (error) {
                error = metaslab_alloc(spa, spa_normal_class(spa), size,
-                   new_bp, 1, txg, NULL,
-                   METASLAB_FASTWRITE);
+                   new_bp, 1, txg, NULL, METASLAB_FASTWRITE, NULL);
        }
 
        if (error == 0) {
@@ -2875,6 +3102,8 @@ zio_vdev_io_start(zio_t *zio)
                return (ZIO_PIPELINE_STOP);
        }
 
+       ASSERT3P(zio->io_logical, !=, zio);
+
        /*
         * We keep track of time-sensitive I/Os so that the scan thread
         * can quickly react to certain workloads.  In particular, we care
@@ -3252,6 +3481,7 @@ zio_ready(zio_t *zio)
 {
        blkptr_t *bp = zio->io_bp;
        zio_t *pio, *pio_next;
+       zio_link_t *zl = NULL;
 
        if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
            zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_READY))
@@ -3269,12 +3499,26 @@ zio_ready(zio_t *zio)
        if (bp != NULL && bp != &zio->io_bp_copy)
                zio->io_bp_copy = *bp;
 
-       if (zio->io_error)
+       if (zio->io_error != 0) {
                zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
 
+               if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
+                       ASSERT(IO_IS_ALLOCATING(zio));
+                       ASSERT(zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
+                       /*
+                        * We were unable to allocate anything, unreserve and
+                        * issue the next I/O to allocate.
+                        */
+                       metaslab_class_throttle_unreserve(
+                           spa_normal_class(zio->io_spa),
+                           zio->io_prop.zp_copies, zio);
+                       zio_allocate_dispatch(zio->io_spa);
+               }
+       }
+
        mutex_enter(&zio->io_lock);
        zio->io_state[ZIO_WAIT_READY] = 1;
-       pio = zio_walk_parents(zio);
+       pio = zio_walk_parents(zio, &zl);
        mutex_exit(&zio->io_lock);
 
        /*
@@ -3285,7 +3529,7 @@ zio_ready(zio_t *zio)
         * all parents must wait for us to be done before they can be done.
         */
        for (; pio != NULL; pio = pio_next) {
-               pio_next = zio_walk_parents(zio);
+               pio_next = zio_walk_parents(zio, &zl);
                zio_notify_parent(pio, zio, ZIO_WAIT_READY);
        }
 
@@ -3305,11 +3549,76 @@ zio_ready(zio_t *zio)
        return (ZIO_PIPELINE_CONTINUE);
 }
 
+/*
+ * Update the allocation throttle accounting.
+ */
+static void
+zio_dva_throttle_done(zio_t *zio)
+{
+       zio_t *lio = zio->io_logical;
+       zio_t *pio = zio_unique_parent(zio);
+       vdev_t *vd = zio->io_vd;
+       int flags = METASLAB_ASYNC_ALLOC;
+
+       ASSERT3P(zio->io_bp, !=, NULL);
+       ASSERT3U(zio->io_type, ==, ZIO_TYPE_WRITE);
+       ASSERT3U(zio->io_priority, ==, ZIO_PRIORITY_ASYNC_WRITE);
+       ASSERT3U(zio->io_child_type, ==, ZIO_CHILD_VDEV);
+       ASSERT(vd != NULL);
+       ASSERT3P(vd, ==, vd->vdev_top);
+       ASSERT(!(zio->io_flags & (ZIO_FLAG_IO_REPAIR | ZIO_FLAG_IO_RETRY)));
+       ASSERT(zio->io_flags & ZIO_FLAG_IO_ALLOCATING);
+       ASSERT(!(lio->io_flags & ZIO_FLAG_IO_REWRITE));
+       ASSERT(!(lio->io_orig_flags & ZIO_FLAG_NODATA));
+
+       /*
+        * Parents of gang children can have two flavors -- ones that
+        * allocated the gang header (will have ZIO_FLAG_IO_REWRITE set)
+        * and ones that allocated the constituent blocks. The allocation
+        * throttle needs to know the allocating parent zio so we must find
+        * it here.
+        */
+       if (pio->io_child_type == ZIO_CHILD_GANG) {
+               /*
+                * If our parent is a rewrite gang child then our grandparent
+                * would have been the one that performed the allocation.
+                */
+               if (pio->io_flags & ZIO_FLAG_IO_REWRITE)
+                       pio = zio_unique_parent(pio);
+               flags |= METASLAB_GANG_CHILD;
+       }
+
+       ASSERT(IO_IS_ALLOCATING(pio));
+       ASSERT3P(zio, !=, zio->io_logical);
+       ASSERT(zio->io_logical != NULL);
+       ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REPAIR));
+       ASSERT0(zio->io_flags & ZIO_FLAG_NOPWRITE);
+
+       mutex_enter(&pio->io_lock);
+       metaslab_group_alloc_decrement(zio->io_spa, vd->vdev_id, pio, flags);
+       mutex_exit(&pio->io_lock);
+
+       metaslab_class_throttle_unreserve(spa_normal_class(zio->io_spa),
+           1, pio);
+
+       /*
+        * Call into the pipeline to see if there is more work that
+        * needs to be done. If there is work to be done it will be
+        * dispatched to another taskq thread.
+        */
+       zio_allocate_dispatch(zio->io_spa);
+}
+
 static int
 zio_done(zio_t *zio)
 {
+       /*
+        * Always attempt to keep stack usage minimal here since
+        * we can be called recurisvely up to 19 levels deep.
+        */
        zio_t *pio, *pio_next;
        int c, w;
+       zio_link_t *zl = NULL;
 
        /*
         * If our children haven't all completed,
@@ -3321,6 +3630,33 @@ zio_done(zio_t *zio)
            zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_DONE))
                return (ZIO_PIPELINE_STOP);
 
+       /*
+        * If the allocation throttle is enabled, then update the accounting.
+        * We only track child I/Os that are part of an allocating async
+        * write. We must do this since the allocation is performed
+        * by the logical I/O but the actual write is done by child I/Os.
+        */
+       if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING &&
+           zio->io_child_type == ZIO_CHILD_VDEV) {
+               ASSERT(spa_normal_class(
+                   zio->io_spa)->mc_alloc_throttle_enabled);
+               zio_dva_throttle_done(zio);
+       }
+
+       /*
+        * If the allocation throttle is enabled, verify that
+        * we have decremented the refcounts for every I/O that was throttled.
+        */
+       if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
+               ASSERT(zio->io_type == ZIO_TYPE_WRITE);
+               ASSERT(zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
+               ASSERT(zio->io_bp != NULL);
+               metaslab_group_alloc_verify(zio->io_spa, zio->io_bp, zio);
+               VERIFY(refcount_not_held(
+                   &(spa_normal_class(zio->io_spa)->mc_alloc_slots), zio));
+       }
+
+
        for (c = 0; c < ZIO_CHILD_TYPES; c++)
                for (w = 0; w < ZIO_WAIT_TYPES; w++)
                        ASSERT(zio->io_children[c][w] == 0);
@@ -3506,13 +3842,15 @@ zio_done(zio_t *zio)
                 * trouble (e.g. suspended). This allows "The Godfather"
                 * I/O to return status without blocking.
                 */
-               for (pio = zio_walk_parents(zio); pio != NULL; pio = pio_next) {
-                       zio_link_t *zl = zio->io_walk_link;
-                       pio_next = zio_walk_parents(zio);
+               zl = NULL;
+               for (pio = zio_walk_parents(zio, &zl); pio != NULL;
+                   pio = pio_next) {
+                       zio_link_t *remove_zl = zl;
+                       pio_next = zio_walk_parents(zio, &zl);
 
                        if ((pio->io_flags & ZIO_FLAG_GODFATHER) &&
                            (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND)) {
-                               zio_remove_child(pio, zio, zl);
+                               zio_remove_child(pio, zio, remove_zl);
                                zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
                        }
                }
@@ -3579,10 +3917,11 @@ zio_done(zio_t *zio)
        zio->io_state[ZIO_WAIT_DONE] = 1;
        mutex_exit(&zio->io_lock);
 
-       for (pio = zio_walk_parents(zio); pio != NULL; pio = pio_next) {
-               zio_link_t *zl = zio->io_walk_link;
-               pio_next = zio_walk_parents(zio);
-               zio_remove_child(pio, zio, zl);
+       zl = NULL;
+       for (pio = zio_walk_parents(zio, &zl); pio != NULL; pio = pio_next) {
+               zio_link_t *remove_zl = zl;
+               pio_next = zio_walk_parents(zio, &zl);
+               zio_remove_child(pio, zio, remove_zl);
                zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
        }
 
@@ -3606,9 +3945,10 @@ zio_done(zio_t *zio)
 static zio_pipe_stage_t *zio_pipeline[] = {
        NULL,
        zio_read_bp_init,
+       zio_write_bp_init,
        zio_free_bp_init,
        zio_issue_async,
-       zio_write_bp_init,
+       zio_write_compress,
        zio_checksum_generate,
        zio_nop_write,
        zio_ddt_read_start,
@@ -3617,6 +3957,7 @@ static zio_pipe_stage_t *zio_pipeline[] = {
        zio_ddt_free,
        zio_gang_assemble,
        zio_gang_issue,
+       zio_dva_throttle,
        zio_dva_allocate,
        zio_dva_free,
        zio_dva_claim,
@@ -3778,4 +4119,8 @@ MODULE_PARM_DESC(zfs_sync_pass_dont_compress,
 module_param(zfs_sync_pass_rewrite, int, 0644);
 MODULE_PARM_DESC(zfs_sync_pass_rewrite,
        "Rewrite new bps starting in this pass");
+
+module_param(zio_dva_throttle_enabled, int, 0644);
+MODULE_PARM_DESC(zio_dva_throttle_enabled,
+       "Throttle block allocations in the ZIO pipeline");
 #endif