]> granicus.if.org Git - zfs/blob - module/zfs/zio.c
OpenZFS 8026 - retire zfs_throttle_delay and zfs_throttle_resolution
[zfs] / module / zfs / zio.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright (c) 2011, 2017 by Delphix. All rights reserved.
24  * Copyright (c) 2011 Nexenta Systems, Inc. All rights reserved.
25  */
26
27 #include <sys/sysmacros.h>
28 #include <sys/zfs_context.h>
29 #include <sys/fm/fs/zfs.h>
30 #include <sys/spa.h>
31 #include <sys/txg.h>
32 #include <sys/spa_impl.h>
33 #include <sys/vdev_impl.h>
34 #include <sys/zio_impl.h>
35 #include <sys/zio_compress.h>
36 #include <sys/zio_checksum.h>
37 #include <sys/dmu_objset.h>
38 #include <sys/arc.h>
39 #include <sys/ddt.h>
40 #include <sys/blkptr.h>
41 #include <sys/zfeature.h>
42 #include <sys/metaslab_impl.h>
43 #include <sys/time.h>
44 #include <sys/trace_zio.h>
45 #include <sys/abd.h>
46
47 /*
48  * ==========================================================================
49  * I/O type descriptions
50  * ==========================================================================
51  */
52 const char *zio_type_name[ZIO_TYPES] = {
53         /*
54          * Note: Linux kernel thread name length is limited
55          * so these names will differ from upstream open zfs.
56          */
57         "z_null", "z_rd", "z_wr", "z_fr", "z_cl", "z_ioctl"
58 };
59
60 int zio_dva_throttle_enabled = B_TRUE;
61
62 /*
63  * ==========================================================================
64  * I/O kmem caches
65  * ==========================================================================
66  */
67 kmem_cache_t *zio_cache;
68 kmem_cache_t *zio_link_cache;
69 kmem_cache_t *zio_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
70 kmem_cache_t *zio_data_buf_cache[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
71 #if defined(ZFS_DEBUG) && !defined(_KERNEL)
72 uint64_t zio_buf_cache_allocs[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
73 uint64_t zio_buf_cache_frees[SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT];
74 #endif
75
76 int zio_delay_max = ZIO_DELAY_MAX;
77
78 #define ZIO_PIPELINE_CONTINUE           0x100
79 #define ZIO_PIPELINE_STOP               0x101
80
81 #define BP_SPANB(indblkshift, level) \
82         (((uint64_t)1) << ((level) * ((indblkshift) - SPA_BLKPTRSHIFT)))
83 #define COMPARE_META_LEVEL      0x80000000ul
84 /*
85  * The following actions directly effect the spa's sync-to-convergence logic.
86  * The values below define the sync pass when we start performing the action.
87  * Care should be taken when changing these values as they directly impact
88  * spa_sync() performance. Tuning these values may introduce subtle performance
89  * pathologies and should only be done in the context of performance analysis.
90  * These tunables will eventually be removed and replaced with #defines once
91  * enough analysis has been done to determine optimal values.
92  *
93  * The 'zfs_sync_pass_deferred_free' pass must be greater than 1 to ensure that
94  * regular blocks are not deferred.
95  */
96 int zfs_sync_pass_deferred_free = 2; /* defer frees starting in this pass */
97 int zfs_sync_pass_dont_compress = 5; /* don't compress starting in this pass */
98 int zfs_sync_pass_rewrite = 2; /* rewrite new bps starting in this pass */
99
100 /*
101  * An allocating zio is one that either currently has the DVA allocate
102  * stage set or will have it later in its lifetime.
103  */
104 #define IO_IS_ALLOCATING(zio) ((zio)->io_orig_pipeline & ZIO_STAGE_DVA_ALLOCATE)
105
106 int zio_requeue_io_start_cut_in_line = 1;
107
108 #ifdef ZFS_DEBUG
109 int zio_buf_debug_limit = 16384;
110 #else
111 int zio_buf_debug_limit = 0;
112 #endif
113
114 static inline void __zio_execute(zio_t *zio);
115
116 static void zio_taskq_dispatch(zio_t *, zio_taskq_type_t, boolean_t);
117
118 void
119 zio_init(void)
120 {
121         size_t c;
122         vmem_t *data_alloc_arena = NULL;
123
124         zio_cache = kmem_cache_create("zio_cache",
125             sizeof (zio_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
126         zio_link_cache = kmem_cache_create("zio_link_cache",
127             sizeof (zio_link_t), 0, NULL, NULL, NULL, NULL, NULL, 0);
128
129         /*
130          * For small buffers, we want a cache for each multiple of
131          * SPA_MINBLOCKSIZE.  For larger buffers, we want a cache
132          * for each quarter-power of 2.
133          */
134         for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
135                 size_t size = (c + 1) << SPA_MINBLOCKSHIFT;
136                 size_t p2 = size;
137                 size_t align = 0;
138                 size_t cflags = (size > zio_buf_debug_limit) ? KMC_NODEBUG : 0;
139
140 #if defined(_ILP32) && defined(_KERNEL)
141                 /*
142                  * Cache size limited to 1M on 32-bit platforms until ARC
143                  * buffers no longer require virtual address space.
144                  */
145                 if (size > zfs_max_recordsize)
146                         break;
147 #endif
148
149                 while (!ISP2(p2))
150                         p2 &= p2 - 1;
151
152 #ifndef _KERNEL
153                 /*
154                  * If we are using watchpoints, put each buffer on its own page,
155                  * to eliminate the performance overhead of trapping to the
156                  * kernel when modifying a non-watched buffer that shares the
157                  * page with a watched buffer.
158                  */
159                 if (arc_watch && !IS_P2ALIGNED(size, PAGESIZE))
160                         continue;
161                 /*
162                  * Here's the problem - on 4K native devices in userland on
163                  * Linux using O_DIRECT, buffers must be 4K aligned or I/O
164                  * will fail with EINVAL, causing zdb (and others) to coredump.
165                  * Since userland probably doesn't need optimized buffer caches,
166                  * we just force 4K alignment on everything.
167                  */
168                 align = 8 * SPA_MINBLOCKSIZE;
169 #else
170                 if (size <= 4 * SPA_MINBLOCKSIZE) {
171                         align = SPA_MINBLOCKSIZE;
172                 } else if (IS_P2ALIGNED(size, p2 >> 2)) {
173                         align = MIN(p2 >> 2, PAGESIZE);
174                 }
175 #endif
176
177                 if (align != 0) {
178                         char name[36];
179                         (void) sprintf(name, "zio_buf_%lu", (ulong_t)size);
180                         zio_buf_cache[c] = kmem_cache_create(name, size,
181                             align, NULL, NULL, NULL, NULL, NULL, cflags);
182
183                         (void) sprintf(name, "zio_data_buf_%lu", (ulong_t)size);
184                         zio_data_buf_cache[c] = kmem_cache_create(name, size,
185                             align, NULL, NULL, NULL, NULL,
186                             data_alloc_arena, cflags);
187                 }
188         }
189
190         while (--c != 0) {
191                 ASSERT(zio_buf_cache[c] != NULL);
192                 if (zio_buf_cache[c - 1] == NULL)
193                         zio_buf_cache[c - 1] = zio_buf_cache[c];
194
195                 ASSERT(zio_data_buf_cache[c] != NULL);
196                 if (zio_data_buf_cache[c - 1] == NULL)
197                         zio_data_buf_cache[c - 1] = zio_data_buf_cache[c];
198         }
199
200         zio_inject_init();
201
202         lz4_init();
203 }
204
205 void
206 zio_fini(void)
207 {
208         size_t c;
209         kmem_cache_t *last_cache = NULL;
210         kmem_cache_t *last_data_cache = NULL;
211
212         for (c = 0; c < SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT; c++) {
213 #ifdef _ILP32
214                 /*
215                  * Cache size limited to 1M on 32-bit platforms until ARC
216                  * buffers no longer require virtual address space.
217                  */
218                 if (((c + 1) << SPA_MINBLOCKSHIFT) > zfs_max_recordsize)
219                         break;
220 #endif
221 #if defined(ZFS_DEBUG) && !defined(_KERNEL)
222                 if (zio_buf_cache_allocs[c] != zio_buf_cache_frees[c])
223                         (void) printf("zio_fini: [%d] %llu != %llu\n",
224                             (int)((c + 1) << SPA_MINBLOCKSHIFT),
225                             (long long unsigned)zio_buf_cache_allocs[c],
226                             (long long unsigned)zio_buf_cache_frees[c]);
227 #endif
228                 if (zio_buf_cache[c] != last_cache) {
229                         last_cache = zio_buf_cache[c];
230                         kmem_cache_destroy(zio_buf_cache[c]);
231                 }
232                 zio_buf_cache[c] = NULL;
233
234                 if (zio_data_buf_cache[c] != last_data_cache) {
235                         last_data_cache = zio_data_buf_cache[c];
236                         kmem_cache_destroy(zio_data_buf_cache[c]);
237                 }
238                 zio_data_buf_cache[c] = NULL;
239         }
240
241         kmem_cache_destroy(zio_link_cache);
242         kmem_cache_destroy(zio_cache);
243
244         zio_inject_fini();
245
246         lz4_fini();
247 }
248
249 /*
250  * ==========================================================================
251  * Allocate and free I/O buffers
252  * ==========================================================================
253  */
254
255 /*
256  * Use zio_buf_alloc to allocate ZFS metadata.  This data will appear in a
257  * crashdump if the kernel panics, so use it judiciously.  Obviously, it's
258  * useful to inspect ZFS metadata, but if possible, we should avoid keeping
259  * excess / transient data in-core during a crashdump.
260  */
261 void *
262 zio_buf_alloc(size_t size)
263 {
264         size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
265
266         VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
267 #if defined(ZFS_DEBUG) && !defined(_KERNEL)
268         atomic_add_64(&zio_buf_cache_allocs[c], 1);
269 #endif
270
271         return (kmem_cache_alloc(zio_buf_cache[c], KM_PUSHPAGE));
272 }
273
274 /*
275  * Use zio_data_buf_alloc to allocate data.  The data will not appear in a
276  * crashdump if the kernel panics.  This exists so that we will limit the amount
277  * of ZFS data that shows up in a kernel crashdump.  (Thus reducing the amount
278  * of kernel heap dumped to disk when the kernel panics)
279  */
280 void *
281 zio_data_buf_alloc(size_t size)
282 {
283         size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
284
285         VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
286
287         return (kmem_cache_alloc(zio_data_buf_cache[c], KM_PUSHPAGE));
288 }
289
290 void
291 zio_buf_free(void *buf, size_t size)
292 {
293         size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
294
295         VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
296 #if defined(ZFS_DEBUG) && !defined(_KERNEL)
297         atomic_add_64(&zio_buf_cache_frees[c], 1);
298 #endif
299
300         kmem_cache_free(zio_buf_cache[c], buf);
301 }
302
303 void
304 zio_data_buf_free(void *buf, size_t size)
305 {
306         size_t c = (size - 1) >> SPA_MINBLOCKSHIFT;
307
308         VERIFY3U(c, <, SPA_MAXBLOCKSIZE >> SPA_MINBLOCKSHIFT);
309
310         kmem_cache_free(zio_data_buf_cache[c], buf);
311 }
312
313 static void
314 zio_abd_free(void *abd, size_t size)
315 {
316         abd_free((abd_t *)abd);
317 }
318
319 /*
320  * ==========================================================================
321  * Push and pop I/O transform buffers
322  * ==========================================================================
323  */
324 void
325 zio_push_transform(zio_t *zio, abd_t *data, uint64_t size, uint64_t bufsize,
326     zio_transform_func_t *transform)
327 {
328         zio_transform_t *zt = kmem_alloc(sizeof (zio_transform_t), KM_SLEEP);
329
330         /*
331          * Ensure that anyone expecting this zio to contain a linear ABD isn't
332          * going to get a nasty surprise when they try to access the data.
333          */
334         IMPLY(abd_is_linear(zio->io_abd), abd_is_linear(data));
335
336         zt->zt_orig_abd = zio->io_abd;
337         zt->zt_orig_size = zio->io_size;
338         zt->zt_bufsize = bufsize;
339         zt->zt_transform = transform;
340
341         zt->zt_next = zio->io_transform_stack;
342         zio->io_transform_stack = zt;
343
344         zio->io_abd = data;
345         zio->io_size = size;
346 }
347
348 void
349 zio_pop_transforms(zio_t *zio)
350 {
351         zio_transform_t *zt;
352
353         while ((zt = zio->io_transform_stack) != NULL) {
354                 if (zt->zt_transform != NULL)
355                         zt->zt_transform(zio,
356                             zt->zt_orig_abd, zt->zt_orig_size);
357
358                 if (zt->zt_bufsize != 0)
359                         abd_free(zio->io_abd);
360
361                 zio->io_abd = zt->zt_orig_abd;
362                 zio->io_size = zt->zt_orig_size;
363                 zio->io_transform_stack = zt->zt_next;
364
365                 kmem_free(zt, sizeof (zio_transform_t));
366         }
367 }
368
369 /*
370  * ==========================================================================
371  * I/O transform callbacks for subblocks and decompression
372  * ==========================================================================
373  */
374 static void
375 zio_subblock(zio_t *zio, abd_t *data, uint64_t size)
376 {
377         ASSERT(zio->io_size > size);
378
379         if (zio->io_type == ZIO_TYPE_READ)
380                 abd_copy(data, zio->io_abd, size);
381 }
382
383 static void
384 zio_decompress(zio_t *zio, abd_t *data, uint64_t size)
385 {
386         if (zio->io_error == 0) {
387                 void *tmp = abd_borrow_buf(data, size);
388                 int ret = zio_decompress_data(BP_GET_COMPRESS(zio->io_bp),
389                     zio->io_abd, tmp, zio->io_size, size);
390                 abd_return_buf_copy(data, tmp, size);
391
392                 if (ret != 0)
393                         zio->io_error = SET_ERROR(EIO);
394         }
395 }
396
397 /*
398  * ==========================================================================
399  * I/O parent/child relationships and pipeline interlocks
400  * ==========================================================================
401  */
402 zio_t *
403 zio_walk_parents(zio_t *cio, zio_link_t **zl)
404 {
405         list_t *pl = &cio->io_parent_list;
406
407         *zl = (*zl == NULL) ? list_head(pl) : list_next(pl, *zl);
408         if (*zl == NULL)
409                 return (NULL);
410
411         ASSERT((*zl)->zl_child == cio);
412         return ((*zl)->zl_parent);
413 }
414
415 zio_t *
416 zio_walk_children(zio_t *pio, zio_link_t **zl)
417 {
418         list_t *cl = &pio->io_child_list;
419
420         *zl = (*zl == NULL) ? list_head(cl) : list_next(cl, *zl);
421         if (*zl == NULL)
422                 return (NULL);
423
424         ASSERT((*zl)->zl_parent == pio);
425         return ((*zl)->zl_child);
426 }
427
428 zio_t *
429 zio_unique_parent(zio_t *cio)
430 {
431         zio_link_t *zl = NULL;
432         zio_t *pio = zio_walk_parents(cio, &zl);
433
434         VERIFY3P(zio_walk_parents(cio, &zl), ==, NULL);
435         return (pio);
436 }
437
438 void
439 zio_add_child(zio_t *pio, zio_t *cio)
440 {
441         zio_link_t *zl = kmem_cache_alloc(zio_link_cache, KM_SLEEP);
442         int w;
443
444         /*
445          * Logical I/Os can have logical, gang, or vdev children.
446          * Gang I/Os can have gang or vdev children.
447          * Vdev I/Os can only have vdev children.
448          * The following ASSERT captures all of these constraints.
449          */
450         ASSERT(cio->io_child_type <= pio->io_child_type);
451
452         zl->zl_parent = pio;
453         zl->zl_child = cio;
454
455         mutex_enter(&cio->io_lock);
456         mutex_enter(&pio->io_lock);
457
458         ASSERT(pio->io_state[ZIO_WAIT_DONE] == 0);
459
460         for (w = 0; w < ZIO_WAIT_TYPES; w++)
461                 pio->io_children[cio->io_child_type][w] += !cio->io_state[w];
462
463         list_insert_head(&pio->io_child_list, zl);
464         list_insert_head(&cio->io_parent_list, zl);
465
466         pio->io_child_count++;
467         cio->io_parent_count++;
468
469         mutex_exit(&pio->io_lock);
470         mutex_exit(&cio->io_lock);
471 }
472
473 static void
474 zio_remove_child(zio_t *pio, zio_t *cio, zio_link_t *zl)
475 {
476         ASSERT(zl->zl_parent == pio);
477         ASSERT(zl->zl_child == cio);
478
479         mutex_enter(&cio->io_lock);
480         mutex_enter(&pio->io_lock);
481
482         list_remove(&pio->io_child_list, zl);
483         list_remove(&cio->io_parent_list, zl);
484
485         pio->io_child_count--;
486         cio->io_parent_count--;
487
488         mutex_exit(&pio->io_lock);
489         mutex_exit(&cio->io_lock);
490         kmem_cache_free(zio_link_cache, zl);
491 }
492
493 static boolean_t
494 zio_wait_for_children(zio_t *zio, enum zio_child child, enum zio_wait_type wait)
495 {
496         uint64_t *countp = &zio->io_children[child][wait];
497         boolean_t waiting = B_FALSE;
498
499         mutex_enter(&zio->io_lock);
500         ASSERT(zio->io_stall == NULL);
501         if (*countp != 0) {
502                 zio->io_stage >>= 1;
503                 ASSERT3U(zio->io_stage, !=, ZIO_STAGE_OPEN);
504                 zio->io_stall = countp;
505                 waiting = B_TRUE;
506         }
507         mutex_exit(&zio->io_lock);
508
509         return (waiting);
510 }
511
512 __attribute__((always_inline))
513 static inline void
514 zio_notify_parent(zio_t *pio, zio_t *zio, enum zio_wait_type wait)
515 {
516         uint64_t *countp = &pio->io_children[zio->io_child_type][wait];
517         int *errorp = &pio->io_child_error[zio->io_child_type];
518
519         mutex_enter(&pio->io_lock);
520         if (zio->io_error && !(zio->io_flags & ZIO_FLAG_DONT_PROPAGATE))
521                 *errorp = zio_worst_error(*errorp, zio->io_error);
522         pio->io_reexecute |= zio->io_reexecute;
523         ASSERT3U(*countp, >, 0);
524
525         (*countp)--;
526
527         if (*countp == 0 && pio->io_stall == countp) {
528                 zio_taskq_type_t type =
529                     pio->io_stage < ZIO_STAGE_VDEV_IO_START ? ZIO_TASKQ_ISSUE :
530                     ZIO_TASKQ_INTERRUPT;
531                 pio->io_stall = NULL;
532                 mutex_exit(&pio->io_lock);
533                 /*
534                  * Dispatch the parent zio in its own taskq so that
535                  * the child can continue to make progress. This also
536                  * prevents overflowing the stack when we have deeply nested
537                  * parent-child relationships.
538                  */
539                 zio_taskq_dispatch(pio, type, B_FALSE);
540         } else {
541                 mutex_exit(&pio->io_lock);
542         }
543 }
544
545 static void
546 zio_inherit_child_errors(zio_t *zio, enum zio_child c)
547 {
548         if (zio->io_child_error[c] != 0 && zio->io_error == 0)
549                 zio->io_error = zio->io_child_error[c];
550 }
551
552 int
553 zio_bookmark_compare(const void *x1, const void *x2)
554 {
555         const zio_t *z1 = x1;
556         const zio_t *z2 = x2;
557
558         if (z1->io_bookmark.zb_objset < z2->io_bookmark.zb_objset)
559                 return (-1);
560         if (z1->io_bookmark.zb_objset > z2->io_bookmark.zb_objset)
561                 return (1);
562
563         if (z1->io_bookmark.zb_object < z2->io_bookmark.zb_object)
564                 return (-1);
565         if (z1->io_bookmark.zb_object > z2->io_bookmark.zb_object)
566                 return (1);
567
568         if (z1->io_bookmark.zb_level < z2->io_bookmark.zb_level)
569                 return (-1);
570         if (z1->io_bookmark.zb_level > z2->io_bookmark.zb_level)
571                 return (1);
572
573         if (z1->io_bookmark.zb_blkid < z2->io_bookmark.zb_blkid)
574                 return (-1);
575         if (z1->io_bookmark.zb_blkid > z2->io_bookmark.zb_blkid)
576                 return (1);
577
578         if (z1 < z2)
579                 return (-1);
580         if (z1 > z2)
581                 return (1);
582
583         return (0);
584 }
585
586 /*
587  * ==========================================================================
588  * Create the various types of I/O (read, write, free, etc)
589  * ==========================================================================
590  */
591 static zio_t *
592 zio_create(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
593     abd_t *data, uint64_t lsize, uint64_t psize, zio_done_func_t *done,
594     void *private, zio_type_t type, zio_priority_t priority,
595     enum zio_flag flags, vdev_t *vd, uint64_t offset,
596     const zbookmark_phys_t *zb, enum zio_stage stage,
597     enum zio_stage pipeline)
598 {
599         zio_t *zio;
600
601         ASSERT3U(psize, <=, SPA_MAXBLOCKSIZE);
602         ASSERT(P2PHASE(psize, SPA_MINBLOCKSIZE) == 0);
603         ASSERT(P2PHASE(offset, SPA_MINBLOCKSIZE) == 0);
604
605         ASSERT(!vd || spa_config_held(spa, SCL_STATE_ALL, RW_READER));
606         ASSERT(!bp || !(flags & ZIO_FLAG_CONFIG_WRITER));
607         ASSERT(vd || stage == ZIO_STAGE_OPEN);
608
609         IMPLY(lsize != psize, (flags & ZIO_FLAG_RAW) != 0);
610
611         zio = kmem_cache_alloc(zio_cache, KM_SLEEP);
612         bzero(zio, sizeof (zio_t));
613
614         mutex_init(&zio->io_lock, NULL, MUTEX_NOLOCKDEP, NULL);
615         cv_init(&zio->io_cv, NULL, CV_DEFAULT, NULL);
616
617         list_create(&zio->io_parent_list, sizeof (zio_link_t),
618             offsetof(zio_link_t, zl_parent_node));
619         list_create(&zio->io_child_list, sizeof (zio_link_t),
620             offsetof(zio_link_t, zl_child_node));
621         metaslab_trace_init(&zio->io_alloc_list);
622
623         if (vd != NULL)
624                 zio->io_child_type = ZIO_CHILD_VDEV;
625         else if (flags & ZIO_FLAG_GANG_CHILD)
626                 zio->io_child_type = ZIO_CHILD_GANG;
627         else if (flags & ZIO_FLAG_DDT_CHILD)
628                 zio->io_child_type = ZIO_CHILD_DDT;
629         else
630                 zio->io_child_type = ZIO_CHILD_LOGICAL;
631
632         if (bp != NULL) {
633                 zio->io_bp = (blkptr_t *)bp;
634                 zio->io_bp_copy = *bp;
635                 zio->io_bp_orig = *bp;
636                 if (type != ZIO_TYPE_WRITE ||
637                     zio->io_child_type == ZIO_CHILD_DDT)
638                         zio->io_bp = &zio->io_bp_copy;  /* so caller can free */
639                 if (zio->io_child_type == ZIO_CHILD_LOGICAL)
640                         zio->io_logical = zio;
641                 if (zio->io_child_type > ZIO_CHILD_GANG && BP_IS_GANG(bp))
642                         pipeline |= ZIO_GANG_STAGES;
643         }
644
645         zio->io_spa = spa;
646         zio->io_txg = txg;
647         zio->io_done = done;
648         zio->io_private = private;
649         zio->io_type = type;
650         zio->io_priority = priority;
651         zio->io_vd = vd;
652         zio->io_offset = offset;
653         zio->io_orig_abd = zio->io_abd = data;
654         zio->io_orig_size = zio->io_size = psize;
655         zio->io_lsize = lsize;
656         zio->io_orig_flags = zio->io_flags = flags;
657         zio->io_orig_stage = zio->io_stage = stage;
658         zio->io_orig_pipeline = zio->io_pipeline = pipeline;
659         zio->io_pipeline_trace = ZIO_STAGE_OPEN;
660
661         zio->io_state[ZIO_WAIT_READY] = (stage >= ZIO_STAGE_READY);
662         zio->io_state[ZIO_WAIT_DONE] = (stage >= ZIO_STAGE_DONE);
663
664         if (zb != NULL)
665                 zio->io_bookmark = *zb;
666
667         if (pio != NULL) {
668                 if (zio->io_logical == NULL)
669                         zio->io_logical = pio->io_logical;
670                 if (zio->io_child_type == ZIO_CHILD_GANG)
671                         zio->io_gang_leader = pio->io_gang_leader;
672                 zio_add_child(pio, zio);
673         }
674
675         taskq_init_ent(&zio->io_tqent);
676
677         return (zio);
678 }
679
680 static void
681 zio_destroy(zio_t *zio)
682 {
683         metaslab_trace_fini(&zio->io_alloc_list);
684         list_destroy(&zio->io_parent_list);
685         list_destroy(&zio->io_child_list);
686         mutex_destroy(&zio->io_lock);
687         cv_destroy(&zio->io_cv);
688         kmem_cache_free(zio_cache, zio);
689 }
690
691 zio_t *
692 zio_null(zio_t *pio, spa_t *spa, vdev_t *vd, zio_done_func_t *done,
693     void *private, enum zio_flag flags)
694 {
695         zio_t *zio;
696
697         zio = zio_create(pio, spa, 0, NULL, NULL, 0, 0, done, private,
698             ZIO_TYPE_NULL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
699             ZIO_STAGE_OPEN, ZIO_INTERLOCK_PIPELINE);
700
701         return (zio);
702 }
703
704 zio_t *
705 zio_root(spa_t *spa, zio_done_func_t *done, void *private, enum zio_flag flags)
706 {
707         return (zio_null(NULL, spa, NULL, done, private, flags));
708 }
709
710 void
711 zfs_blkptr_verify(spa_t *spa, const blkptr_t *bp)
712 {
713         int i;
714
715         if (!DMU_OT_IS_VALID(BP_GET_TYPE(bp))) {
716                 zfs_panic_recover("blkptr at %p has invalid TYPE %llu",
717                     bp, (longlong_t)BP_GET_TYPE(bp));
718         }
719         if (BP_GET_CHECKSUM(bp) >= ZIO_CHECKSUM_FUNCTIONS ||
720             BP_GET_CHECKSUM(bp) <= ZIO_CHECKSUM_ON) {
721                 zfs_panic_recover("blkptr at %p has invalid CHECKSUM %llu",
722                     bp, (longlong_t)BP_GET_CHECKSUM(bp));
723         }
724         if (BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_FUNCTIONS ||
725             BP_GET_COMPRESS(bp) <= ZIO_COMPRESS_ON) {
726                 zfs_panic_recover("blkptr at %p has invalid COMPRESS %llu",
727                     bp, (longlong_t)BP_GET_COMPRESS(bp));
728         }
729         if (BP_GET_LSIZE(bp) > SPA_MAXBLOCKSIZE) {
730                 zfs_panic_recover("blkptr at %p has invalid LSIZE %llu",
731                     bp, (longlong_t)BP_GET_LSIZE(bp));
732         }
733         if (BP_GET_PSIZE(bp) > SPA_MAXBLOCKSIZE) {
734                 zfs_panic_recover("blkptr at %p has invalid PSIZE %llu",
735                     bp, (longlong_t)BP_GET_PSIZE(bp));
736         }
737
738         if (BP_IS_EMBEDDED(bp)) {
739                 if (BPE_GET_ETYPE(bp) > NUM_BP_EMBEDDED_TYPES) {
740                         zfs_panic_recover("blkptr at %p has invalid ETYPE %llu",
741                             bp, (longlong_t)BPE_GET_ETYPE(bp));
742                 }
743         }
744
745         /*
746          * Pool-specific checks.
747          *
748          * Note: it would be nice to verify that the blk_birth and
749          * BP_PHYSICAL_BIRTH() are not too large.  However, spa_freeze()
750          * allows the birth time of log blocks (and dmu_sync()-ed blocks
751          * that are in the log) to be arbitrarily large.
752          */
753         for (i = 0; i < BP_GET_NDVAS(bp); i++) {
754                 uint64_t vdevid = DVA_GET_VDEV(&bp->blk_dva[i]);
755                 vdev_t *vd;
756                 uint64_t offset, asize;
757                 if (vdevid >= spa->spa_root_vdev->vdev_children) {
758                         zfs_panic_recover("blkptr at %p DVA %u has invalid "
759                             "VDEV %llu",
760                             bp, i, (longlong_t)vdevid);
761                         continue;
762                 }
763                 vd = spa->spa_root_vdev->vdev_child[vdevid];
764                 if (vd == NULL) {
765                         zfs_panic_recover("blkptr at %p DVA %u has invalid "
766                             "VDEV %llu",
767                             bp, i, (longlong_t)vdevid);
768                         continue;
769                 }
770                 if (vd->vdev_ops == &vdev_hole_ops) {
771                         zfs_panic_recover("blkptr at %p DVA %u has hole "
772                             "VDEV %llu",
773                             bp, i, (longlong_t)vdevid);
774                         continue;
775                 }
776                 if (vd->vdev_ops == &vdev_missing_ops) {
777                         /*
778                          * "missing" vdevs are valid during import, but we
779                          * don't have their detailed info (e.g. asize), so
780                          * we can't perform any more checks on them.
781                          */
782                         continue;
783                 }
784                 offset = DVA_GET_OFFSET(&bp->blk_dva[i]);
785                 asize = DVA_GET_ASIZE(&bp->blk_dva[i]);
786                 if (BP_IS_GANG(bp))
787                         asize = vdev_psize_to_asize(vd, SPA_GANGBLOCKSIZE);
788                 if (offset + asize > vd->vdev_asize) {
789                         zfs_panic_recover("blkptr at %p DVA %u has invalid "
790                             "OFFSET %llu",
791                             bp, i, (longlong_t)offset);
792                 }
793         }
794 }
795
796 zio_t *
797 zio_read(zio_t *pio, spa_t *spa, const blkptr_t *bp,
798     abd_t *data, uint64_t size, zio_done_func_t *done, void *private,
799     zio_priority_t priority, enum zio_flag flags, const zbookmark_phys_t *zb)
800 {
801         zio_t *zio;
802
803         zfs_blkptr_verify(spa, bp);
804
805         zio = zio_create(pio, spa, BP_PHYSICAL_BIRTH(bp), bp,
806             data, size, size, done, private,
807             ZIO_TYPE_READ, priority, flags, NULL, 0, zb,
808             ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
809             ZIO_DDT_CHILD_READ_PIPELINE : ZIO_READ_PIPELINE);
810
811         return (zio);
812 }
813
814 zio_t *
815 zio_write(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp,
816     abd_t *data, uint64_t lsize, uint64_t psize, const zio_prop_t *zp,
817     zio_done_func_t *ready, zio_done_func_t *children_ready,
818     zio_done_func_t *physdone, zio_done_func_t *done,
819     void *private, zio_priority_t priority, enum zio_flag flags,
820     const zbookmark_phys_t *zb)
821 {
822         zio_t *zio;
823
824         ASSERT(zp->zp_checksum >= ZIO_CHECKSUM_OFF &&
825             zp->zp_checksum < ZIO_CHECKSUM_FUNCTIONS &&
826             zp->zp_compress >= ZIO_COMPRESS_OFF &&
827             zp->zp_compress < ZIO_COMPRESS_FUNCTIONS &&
828             DMU_OT_IS_VALID(zp->zp_type) &&
829             zp->zp_level < 32 &&
830             zp->zp_copies > 0 &&
831             zp->zp_copies <= spa_max_replication(spa));
832
833         zio = zio_create(pio, spa, txg, bp, data, lsize, psize, done, private,
834             ZIO_TYPE_WRITE, priority, flags, NULL, 0, zb,
835             ZIO_STAGE_OPEN, (flags & ZIO_FLAG_DDT_CHILD) ?
836             ZIO_DDT_CHILD_WRITE_PIPELINE : ZIO_WRITE_PIPELINE);
837
838         zio->io_ready = ready;
839         zio->io_children_ready = children_ready;
840         zio->io_physdone = physdone;
841         zio->io_prop = *zp;
842
843         /*
844          * Data can be NULL if we are going to call zio_write_override() to
845          * provide the already-allocated BP.  But we may need the data to
846          * verify a dedup hit (if requested).  In this case, don't try to
847          * dedup (just take the already-allocated BP verbatim).
848          */
849         if (data == NULL && zio->io_prop.zp_dedup_verify) {
850                 zio->io_prop.zp_dedup = zio->io_prop.zp_dedup_verify = B_FALSE;
851         }
852
853         return (zio);
854 }
855
856 zio_t *
857 zio_rewrite(zio_t *pio, spa_t *spa, uint64_t txg, blkptr_t *bp, abd_t *data,
858     uint64_t size, zio_done_func_t *done, void *private,
859     zio_priority_t priority, enum zio_flag flags, zbookmark_phys_t *zb)
860 {
861         zio_t *zio;
862
863         zio = zio_create(pio, spa, txg, bp, data, size, size, done, private,
864             ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_IO_REWRITE, NULL, 0, zb,
865             ZIO_STAGE_OPEN, ZIO_REWRITE_PIPELINE);
866
867         return (zio);
868 }
869
870 void
871 zio_write_override(zio_t *zio, blkptr_t *bp, int copies, boolean_t nopwrite)
872 {
873         ASSERT(zio->io_type == ZIO_TYPE_WRITE);
874         ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
875         ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
876         ASSERT(zio->io_txg == spa_syncing_txg(zio->io_spa));
877
878         /*
879          * We must reset the io_prop to match the values that existed
880          * when the bp was first written by dmu_sync() keeping in mind
881          * that nopwrite and dedup are mutually exclusive.
882          */
883         zio->io_prop.zp_dedup = nopwrite ? B_FALSE : zio->io_prop.zp_dedup;
884         zio->io_prop.zp_nopwrite = nopwrite;
885         zio->io_prop.zp_copies = copies;
886         zio->io_bp_override = bp;
887 }
888
889 void
890 zio_free(spa_t *spa, uint64_t txg, const blkptr_t *bp)
891 {
892
893         /*
894          * The check for EMBEDDED is a performance optimization.  We
895          * process the free here (by ignoring it) rather than
896          * putting it on the list and then processing it in zio_free_sync().
897          */
898         if (BP_IS_EMBEDDED(bp))
899                 return;
900         metaslab_check_free(spa, bp);
901
902         /*
903          * Frees that are for the currently-syncing txg, are not going to be
904          * deferred, and which will not need to do a read (i.e. not GANG or
905          * DEDUP), can be processed immediately.  Otherwise, put them on the
906          * in-memory list for later processing.
907          */
908         if (BP_IS_GANG(bp) || BP_GET_DEDUP(bp) ||
909             txg != spa->spa_syncing_txg ||
910             spa_sync_pass(spa) >= zfs_sync_pass_deferred_free) {
911                 bplist_append(&spa->spa_free_bplist[txg & TXG_MASK], bp);
912         } else {
913                 VERIFY0(zio_wait(zio_free_sync(NULL, spa, txg, bp, 0)));
914         }
915 }
916
917 zio_t *
918 zio_free_sync(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
919     enum zio_flag flags)
920 {
921         zio_t *zio;
922         enum zio_stage stage = ZIO_FREE_PIPELINE;
923
924         ASSERT(!BP_IS_HOLE(bp));
925         ASSERT(spa_syncing_txg(spa) == txg);
926         ASSERT(spa_sync_pass(spa) < zfs_sync_pass_deferred_free);
927
928         if (BP_IS_EMBEDDED(bp))
929                 return (zio_null(pio, spa, NULL, NULL, NULL, 0));
930
931         metaslab_check_free(spa, bp);
932         arc_freed(spa, bp);
933
934         /*
935          * GANG and DEDUP blocks can induce a read (for the gang block header,
936          * or the DDT), so issue them asynchronously so that this thread is
937          * not tied up.
938          */
939         if (BP_IS_GANG(bp) || BP_GET_DEDUP(bp))
940                 stage |= ZIO_STAGE_ISSUE_ASYNC;
941
942         zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
943             BP_GET_PSIZE(bp), NULL, NULL, ZIO_TYPE_FREE, ZIO_PRIORITY_NOW,
944             flags, NULL, 0, NULL, ZIO_STAGE_OPEN, stage);
945
946         return (zio);
947 }
948
949 zio_t *
950 zio_claim(zio_t *pio, spa_t *spa, uint64_t txg, const blkptr_t *bp,
951     zio_done_func_t *done, void *private, enum zio_flag flags)
952 {
953         zio_t *zio;
954
955         dprintf_bp(bp, "claiming in txg %llu", txg);
956
957         if (BP_IS_EMBEDDED(bp))
958                 return (zio_null(pio, spa, NULL, NULL, NULL, 0));
959
960         /*
961          * A claim is an allocation of a specific block.  Claims are needed
962          * to support immediate writes in the intent log.  The issue is that
963          * immediate writes contain committed data, but in a txg that was
964          * *not* committed.  Upon opening the pool after an unclean shutdown,
965          * the intent log claims all blocks that contain immediate write data
966          * so that the SPA knows they're in use.
967          *
968          * All claims *must* be resolved in the first txg -- before the SPA
969          * starts allocating blocks -- so that nothing is allocated twice.
970          * If txg == 0 we just verify that the block is claimable.
971          */
972         ASSERT3U(spa->spa_uberblock.ub_rootbp.blk_birth, <, spa_first_txg(spa));
973         ASSERT(txg == spa_first_txg(spa) || txg == 0);
974         ASSERT(!BP_GET_DEDUP(bp) || !spa_writeable(spa));       /* zdb(1M) */
975
976         zio = zio_create(pio, spa, txg, bp, NULL, BP_GET_PSIZE(bp),
977             BP_GET_PSIZE(bp), done, private, ZIO_TYPE_CLAIM, ZIO_PRIORITY_NOW,
978             flags, NULL, 0, NULL, ZIO_STAGE_OPEN, ZIO_CLAIM_PIPELINE);
979         ASSERT0(zio->io_queued_timestamp);
980
981         return (zio);
982 }
983
984 zio_t *
985 zio_ioctl(zio_t *pio, spa_t *spa, vdev_t *vd, int cmd,
986     zio_done_func_t *done, void *private, enum zio_flag flags)
987 {
988         zio_t *zio;
989         int c;
990
991         if (vd->vdev_children == 0) {
992                 zio = zio_create(pio, spa, 0, NULL, NULL, 0, 0, done, private,
993                     ZIO_TYPE_IOCTL, ZIO_PRIORITY_NOW, flags, vd, 0, NULL,
994                     ZIO_STAGE_OPEN, ZIO_IOCTL_PIPELINE);
995
996                 zio->io_cmd = cmd;
997         } else {
998                 zio = zio_null(pio, spa, NULL, NULL, NULL, flags);
999
1000                 for (c = 0; c < vd->vdev_children; c++)
1001                         zio_nowait(zio_ioctl(zio, spa, vd->vdev_child[c], cmd,
1002                             done, private, flags));
1003         }
1004
1005         return (zio);
1006 }
1007
1008 zio_t *
1009 zio_read_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
1010     abd_t *data, int checksum, zio_done_func_t *done, void *private,
1011     zio_priority_t priority, enum zio_flag flags, boolean_t labels)
1012 {
1013         zio_t *zio;
1014
1015         ASSERT(vd->vdev_children == 0);
1016         ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
1017             offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
1018         ASSERT3U(offset + size, <=, vd->vdev_psize);
1019
1020         zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, size, done,
1021             private, ZIO_TYPE_READ, priority, flags | ZIO_FLAG_PHYSICAL, vd,
1022             offset, NULL, ZIO_STAGE_OPEN, ZIO_READ_PHYS_PIPELINE);
1023
1024         zio->io_prop.zp_checksum = checksum;
1025
1026         return (zio);
1027 }
1028
1029 zio_t *
1030 zio_write_phys(zio_t *pio, vdev_t *vd, uint64_t offset, uint64_t size,
1031     abd_t *data, int checksum, zio_done_func_t *done, void *private,
1032     zio_priority_t priority, enum zio_flag flags, boolean_t labels)
1033 {
1034         zio_t *zio;
1035
1036         ASSERT(vd->vdev_children == 0);
1037         ASSERT(!labels || offset + size <= VDEV_LABEL_START_SIZE ||
1038             offset >= vd->vdev_psize - VDEV_LABEL_END_SIZE);
1039         ASSERT3U(offset + size, <=, vd->vdev_psize);
1040
1041         zio = zio_create(pio, vd->vdev_spa, 0, NULL, data, size, size, done,
1042             private, ZIO_TYPE_WRITE, priority, flags | ZIO_FLAG_PHYSICAL, vd,
1043             offset, NULL, ZIO_STAGE_OPEN, ZIO_WRITE_PHYS_PIPELINE);
1044
1045         zio->io_prop.zp_checksum = checksum;
1046
1047         if (zio_checksum_table[checksum].ci_flags & ZCHECKSUM_FLAG_EMBEDDED) {
1048                 /*
1049                  * zec checksums are necessarily destructive -- they modify
1050                  * the end of the write buffer to hold the verifier/checksum.
1051                  * Therefore, we must make a local copy in case the data is
1052                  * being written to multiple places in parallel.
1053                  */
1054                 abd_t *wbuf = abd_alloc_sametype(data, size);
1055                 abd_copy(wbuf, data, size);
1056
1057                 zio_push_transform(zio, wbuf, size, size, NULL);
1058         }
1059
1060         return (zio);
1061 }
1062
1063 /*
1064  * Create a child I/O to do some work for us.
1065  */
1066 zio_t *
1067 zio_vdev_child_io(zio_t *pio, blkptr_t *bp, vdev_t *vd, uint64_t offset,
1068     abd_t *data, uint64_t size, int type, zio_priority_t priority,
1069     enum zio_flag flags, zio_done_func_t *done, void *private)
1070 {
1071         enum zio_stage pipeline = ZIO_VDEV_CHILD_PIPELINE;
1072         zio_t *zio;
1073
1074         ASSERT(vd->vdev_parent ==
1075             (pio->io_vd ? pio->io_vd : pio->io_spa->spa_root_vdev));
1076
1077         if (type == ZIO_TYPE_READ && bp != NULL) {
1078                 /*
1079                  * If we have the bp, then the child should perform the
1080                  * checksum and the parent need not.  This pushes error
1081                  * detection as close to the leaves as possible and
1082                  * eliminates redundant checksums in the interior nodes.
1083                  */
1084                 pipeline |= ZIO_STAGE_CHECKSUM_VERIFY;
1085                 pio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
1086         }
1087
1088         if (vd->vdev_children == 0)
1089                 offset += VDEV_LABEL_START_SIZE;
1090
1091         flags |= ZIO_VDEV_CHILD_FLAGS(pio) | ZIO_FLAG_DONT_PROPAGATE;
1092
1093         /*
1094          * If we've decided to do a repair, the write is not speculative --
1095          * even if the original read was.
1096          */
1097         if (flags & ZIO_FLAG_IO_REPAIR)
1098                 flags &= ~ZIO_FLAG_SPECULATIVE;
1099
1100         /*
1101          * If we're creating a child I/O that is not associated with a
1102          * top-level vdev, then the child zio is not an allocating I/O.
1103          * If this is a retried I/O then we ignore it since we will
1104          * have already processed the original allocating I/O.
1105          */
1106         if (flags & ZIO_FLAG_IO_ALLOCATING &&
1107             (vd != vd->vdev_top || (flags & ZIO_FLAG_IO_RETRY))) {
1108                 ASSERTV(metaslab_class_t *mc = spa_normal_class(pio->io_spa));
1109
1110                 ASSERT(mc->mc_alloc_throttle_enabled);
1111                 ASSERT(type == ZIO_TYPE_WRITE);
1112                 ASSERT(priority == ZIO_PRIORITY_ASYNC_WRITE);
1113                 ASSERT(!(flags & ZIO_FLAG_IO_REPAIR));
1114                 ASSERT(!(pio->io_flags & ZIO_FLAG_IO_REWRITE) ||
1115                     pio->io_child_type == ZIO_CHILD_GANG);
1116
1117                 flags &= ~ZIO_FLAG_IO_ALLOCATING;
1118         }
1119
1120
1121         zio = zio_create(pio, pio->io_spa, pio->io_txg, bp, data, size, size,
1122             done, private, type, priority, flags, vd, offset, &pio->io_bookmark,
1123             ZIO_STAGE_VDEV_IO_START >> 1, pipeline);
1124         ASSERT3U(zio->io_child_type, ==, ZIO_CHILD_VDEV);
1125
1126         zio->io_physdone = pio->io_physdone;
1127         if (vd->vdev_ops->vdev_op_leaf && zio->io_logical != NULL)
1128                 zio->io_logical->io_phys_children++;
1129
1130         return (zio);
1131 }
1132
1133 zio_t *
1134 zio_vdev_delegated_io(vdev_t *vd, uint64_t offset, abd_t *data, uint64_t size,
1135     int type, zio_priority_t priority, enum zio_flag flags,
1136     zio_done_func_t *done, void *private)
1137 {
1138         zio_t *zio;
1139
1140         ASSERT(vd->vdev_ops->vdev_op_leaf);
1141
1142         zio = zio_create(NULL, vd->vdev_spa, 0, NULL,
1143             data, size, size, done, private, type, priority,
1144             flags | ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_RETRY | ZIO_FLAG_DELEGATED,
1145             vd, offset, NULL,
1146             ZIO_STAGE_VDEV_IO_START >> 1, ZIO_VDEV_CHILD_PIPELINE);
1147
1148         return (zio);
1149 }
1150
1151 void
1152 zio_flush(zio_t *zio, vdev_t *vd)
1153 {
1154         zio_nowait(zio_ioctl(zio, zio->io_spa, vd, DKIOCFLUSHWRITECACHE,
1155             NULL, NULL,
1156             ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE | ZIO_FLAG_DONT_RETRY));
1157 }
1158
1159 void
1160 zio_shrink(zio_t *zio, uint64_t size)
1161 {
1162         ASSERT(zio->io_executor == NULL);
1163         ASSERT(zio->io_orig_size == zio->io_size);
1164         ASSERT(size <= zio->io_size);
1165
1166         /*
1167          * We don't shrink for raidz because of problems with the
1168          * reconstruction when reading back less than the block size.
1169          * Note, BP_IS_RAIDZ() assumes no compression.
1170          */
1171         ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
1172         if (!BP_IS_RAIDZ(zio->io_bp)) {
1173                 /* we are not doing a raw write */
1174                 ASSERT3U(zio->io_size, ==, zio->io_lsize);
1175                 zio->io_orig_size = zio->io_size = zio->io_lsize = size;
1176         }
1177 }
1178
1179 /*
1180  * ==========================================================================
1181  * Prepare to read and write logical blocks
1182  * ==========================================================================
1183  */
1184
1185 static int
1186 zio_read_bp_init(zio_t *zio)
1187 {
1188         blkptr_t *bp = zio->io_bp;
1189
1190         if (BP_GET_COMPRESS(bp) != ZIO_COMPRESS_OFF &&
1191             zio->io_child_type == ZIO_CHILD_LOGICAL &&
1192             !(zio->io_flags & ZIO_FLAG_RAW)) {
1193                 uint64_t psize =
1194                     BP_IS_EMBEDDED(bp) ? BPE_GET_PSIZE(bp) : BP_GET_PSIZE(bp);
1195                 zio_push_transform(zio, abd_alloc_sametype(zio->io_abd, psize),
1196                     psize, psize, zio_decompress);
1197         }
1198
1199         if (BP_IS_EMBEDDED(bp) && BPE_GET_ETYPE(bp) == BP_EMBEDDED_TYPE_DATA) {
1200                 int psize = BPE_GET_PSIZE(bp);
1201                 void *data = abd_borrow_buf(zio->io_abd, psize);
1202
1203                 zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1204                 decode_embedded_bp_compressed(bp, data);
1205                 abd_return_buf_copy(zio->io_abd, data, psize);
1206         } else {
1207                 ASSERT(!BP_IS_EMBEDDED(bp));
1208         }
1209
1210         if (!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)) && BP_GET_LEVEL(bp) == 0)
1211                 zio->io_flags |= ZIO_FLAG_DONT_CACHE;
1212
1213         if (BP_GET_TYPE(bp) == DMU_OT_DDT_ZAP)
1214                 zio->io_flags |= ZIO_FLAG_DONT_CACHE;
1215
1216         if (BP_GET_DEDUP(bp) && zio->io_child_type == ZIO_CHILD_LOGICAL)
1217                 zio->io_pipeline = ZIO_DDT_READ_PIPELINE;
1218
1219         return (ZIO_PIPELINE_CONTINUE);
1220 }
1221
1222 static int
1223 zio_write_bp_init(zio_t *zio)
1224 {
1225
1226         if (!IO_IS_ALLOCATING(zio))
1227                 return (ZIO_PIPELINE_CONTINUE);
1228
1229         ASSERT(zio->io_child_type != ZIO_CHILD_DDT);
1230
1231         if (zio->io_bp_override) {
1232                 blkptr_t *bp = zio->io_bp;
1233                 zio_prop_t *zp = &zio->io_prop;
1234
1235                 ASSERT(bp->blk_birth != zio->io_txg);
1236                 ASSERT(BP_GET_DEDUP(zio->io_bp_override) == 0);
1237
1238                 *bp = *zio->io_bp_override;
1239                 zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1240
1241                 if (BP_IS_EMBEDDED(bp))
1242                         return (ZIO_PIPELINE_CONTINUE);
1243
1244                 /*
1245                  * If we've been overridden and nopwrite is set then
1246                  * set the flag accordingly to indicate that a nopwrite
1247                  * has already occurred.
1248                  */
1249                 if (!BP_IS_HOLE(bp) && zp->zp_nopwrite) {
1250                         ASSERT(!zp->zp_dedup);
1251                         ASSERT3U(BP_GET_CHECKSUM(bp), ==, zp->zp_checksum);
1252                         zio->io_flags |= ZIO_FLAG_NOPWRITE;
1253                         return (ZIO_PIPELINE_CONTINUE);
1254                 }
1255
1256                 ASSERT(!zp->zp_nopwrite);
1257
1258                 if (BP_IS_HOLE(bp) || !zp->zp_dedup)
1259                         return (ZIO_PIPELINE_CONTINUE);
1260
1261                 ASSERT((zio_checksum_table[zp->zp_checksum].ci_flags &
1262                     ZCHECKSUM_FLAG_DEDUP) || zp->zp_dedup_verify);
1263
1264                 if (BP_GET_CHECKSUM(bp) == zp->zp_checksum) {
1265                         BP_SET_DEDUP(bp, 1);
1266                         zio->io_pipeline |= ZIO_STAGE_DDT_WRITE;
1267                         return (ZIO_PIPELINE_CONTINUE);
1268                 }
1269
1270                 /*
1271                  * We were unable to handle this as an override bp, treat
1272                  * it as a regular write I/O.
1273                  */
1274                 zio->io_bp_override = NULL;
1275                 *bp = zio->io_bp_orig;
1276                 zio->io_pipeline = zio->io_orig_pipeline;
1277         }
1278
1279         return (ZIO_PIPELINE_CONTINUE);
1280 }
1281
1282 static int
1283 zio_write_compress(zio_t *zio)
1284 {
1285         spa_t *spa = zio->io_spa;
1286         zio_prop_t *zp = &zio->io_prop;
1287         enum zio_compress compress = zp->zp_compress;
1288         blkptr_t *bp = zio->io_bp;
1289         uint64_t lsize = zio->io_lsize;
1290         uint64_t psize = zio->io_size;
1291         int pass = 1;
1292
1293         EQUIV(lsize != psize, (zio->io_flags & ZIO_FLAG_RAW) != 0);
1294
1295         /*
1296          * If our children haven't all reached the ready stage,
1297          * wait for them and then repeat this pipeline stage.
1298          */
1299         if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
1300             zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_READY))
1301                 return (ZIO_PIPELINE_STOP);
1302
1303         if (!IO_IS_ALLOCATING(zio))
1304                 return (ZIO_PIPELINE_CONTINUE);
1305
1306         if (zio->io_children_ready != NULL) {
1307                 /*
1308                  * Now that all our children are ready, run the callback
1309                  * associated with this zio in case it wants to modify the
1310                  * data to be written.
1311                  */
1312                 ASSERT3U(zp->zp_level, >, 0);
1313                 zio->io_children_ready(zio);
1314         }
1315
1316         ASSERT(zio->io_child_type != ZIO_CHILD_DDT);
1317         ASSERT(zio->io_bp_override == NULL);
1318
1319         if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg) {
1320                 /*
1321                  * We're rewriting an existing block, which means we're
1322                  * working on behalf of spa_sync().  For spa_sync() to
1323                  * converge, it must eventually be the case that we don't
1324                  * have to allocate new blocks.  But compression changes
1325                  * the blocksize, which forces a reallocate, and makes
1326                  * convergence take longer.  Therefore, after the first
1327                  * few passes, stop compressing to ensure convergence.
1328                  */
1329                 pass = spa_sync_pass(spa);
1330
1331                 ASSERT(zio->io_txg == spa_syncing_txg(spa));
1332                 ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1333                 ASSERT(!BP_GET_DEDUP(bp));
1334
1335                 if (pass >= zfs_sync_pass_dont_compress)
1336                         compress = ZIO_COMPRESS_OFF;
1337
1338                 /* Make sure someone doesn't change their mind on overwrites */
1339                 ASSERT(BP_IS_EMBEDDED(bp) || MIN(zp->zp_copies + BP_IS_GANG(bp),
1340                     spa_max_replication(spa)) == BP_GET_NDVAS(bp));
1341         }
1342
1343         /* If it's a compressed write that is not raw, compress the buffer. */
1344         if (compress != ZIO_COMPRESS_OFF && psize == lsize) {
1345                 void *cbuf = zio_buf_alloc(lsize);
1346                 psize = zio_compress_data(compress, zio->io_abd, cbuf, lsize);
1347                 if (psize == 0 || psize == lsize) {
1348                         compress = ZIO_COMPRESS_OFF;
1349                         zio_buf_free(cbuf, lsize);
1350                 } else if (!zp->zp_dedup && psize <= BPE_PAYLOAD_SIZE &&
1351                     zp->zp_level == 0 && !DMU_OT_HAS_FILL(zp->zp_type) &&
1352                     spa_feature_is_enabled(spa, SPA_FEATURE_EMBEDDED_DATA)) {
1353                         encode_embedded_bp_compressed(bp,
1354                             cbuf, compress, lsize, psize);
1355                         BPE_SET_ETYPE(bp, BP_EMBEDDED_TYPE_DATA);
1356                         BP_SET_TYPE(bp, zio->io_prop.zp_type);
1357                         BP_SET_LEVEL(bp, zio->io_prop.zp_level);
1358                         zio_buf_free(cbuf, lsize);
1359                         bp->blk_birth = zio->io_txg;
1360                         zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1361                         ASSERT(spa_feature_is_active(spa,
1362                             SPA_FEATURE_EMBEDDED_DATA));
1363                         return (ZIO_PIPELINE_CONTINUE);
1364                 } else {
1365                         /*
1366                          * Round up compressed size up to the ashift
1367                          * of the smallest-ashift device, and zero the tail.
1368                          * This ensures that the compressed size of the BP
1369                          * (and thus compressratio property) are correct,
1370                          * in that we charge for the padding used to fill out
1371                          * the last sector.
1372                          */
1373                         size_t rounded;
1374
1375                         ASSERT3U(spa->spa_min_ashift, >=, SPA_MINBLOCKSHIFT);
1376
1377                         rounded = (size_t)P2ROUNDUP(psize,
1378                             1ULL << spa->spa_min_ashift);
1379                         if (rounded >= lsize) {
1380                                 compress = ZIO_COMPRESS_OFF;
1381                                 zio_buf_free(cbuf, lsize);
1382                                 psize = lsize;
1383                         } else {
1384                                 abd_t *cdata = abd_get_from_buf(cbuf, lsize);
1385                                 abd_take_ownership_of_buf(cdata, B_TRUE);
1386                                 abd_zero_off(cdata, psize, rounded - psize);
1387                                 psize = rounded;
1388                                 zio_push_transform(zio, cdata,
1389                                     psize, lsize, NULL);
1390                         }
1391                 }
1392
1393                 /*
1394                  * We were unable to handle this as an override bp, treat
1395                  * it as a regular write I/O.
1396                  */
1397                 zio->io_bp_override = NULL;
1398                 *bp = zio->io_bp_orig;
1399                 zio->io_pipeline = zio->io_orig_pipeline;
1400
1401         } else {
1402                 ASSERT3U(psize, !=, 0);
1403
1404         }
1405
1406         /*
1407          * The final pass of spa_sync() must be all rewrites, but the first
1408          * few passes offer a trade-off: allocating blocks defers convergence,
1409          * but newly allocated blocks are sequential, so they can be written
1410          * to disk faster.  Therefore, we allow the first few passes of
1411          * spa_sync() to allocate new blocks, but force rewrites after that.
1412          * There should only be a handful of blocks after pass 1 in any case.
1413          */
1414         if (!BP_IS_HOLE(bp) && bp->blk_birth == zio->io_txg &&
1415             BP_GET_PSIZE(bp) == psize &&
1416             pass >= zfs_sync_pass_rewrite) {
1417                 enum zio_stage gang_stages = zio->io_pipeline & ZIO_GANG_STAGES;
1418                 ASSERT(psize != 0);
1419                 zio->io_pipeline = ZIO_REWRITE_PIPELINE | gang_stages;
1420                 zio->io_flags |= ZIO_FLAG_IO_REWRITE;
1421         } else {
1422                 BP_ZERO(bp);
1423                 zio->io_pipeline = ZIO_WRITE_PIPELINE;
1424         }
1425
1426         if (psize == 0) {
1427                 if (zio->io_bp_orig.blk_birth != 0 &&
1428                     spa_feature_is_active(spa, SPA_FEATURE_HOLE_BIRTH)) {
1429                         BP_SET_LSIZE(bp, lsize);
1430                         BP_SET_TYPE(bp, zp->zp_type);
1431                         BP_SET_LEVEL(bp, zp->zp_level);
1432                         BP_SET_BIRTH(bp, zio->io_txg, 0);
1433                 }
1434                 zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
1435         } else {
1436                 ASSERT(zp->zp_checksum != ZIO_CHECKSUM_GANG_HEADER);
1437                 BP_SET_LSIZE(bp, lsize);
1438                 BP_SET_TYPE(bp, zp->zp_type);
1439                 BP_SET_LEVEL(bp, zp->zp_level);
1440                 BP_SET_PSIZE(bp, psize);
1441                 BP_SET_COMPRESS(bp, compress);
1442                 BP_SET_CHECKSUM(bp, zp->zp_checksum);
1443                 BP_SET_DEDUP(bp, zp->zp_dedup);
1444                 BP_SET_BYTEORDER(bp, ZFS_HOST_BYTEORDER);
1445                 if (zp->zp_dedup) {
1446                         ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1447                         ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
1448                         zio->io_pipeline = ZIO_DDT_WRITE_PIPELINE;
1449                 }
1450                 if (zp->zp_nopwrite) {
1451                         ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1452                         ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
1453                         zio->io_pipeline |= ZIO_STAGE_NOP_WRITE;
1454                 }
1455         }
1456         return (ZIO_PIPELINE_CONTINUE);
1457 }
1458
1459 static int
1460 zio_free_bp_init(zio_t *zio)
1461 {
1462         blkptr_t *bp = zio->io_bp;
1463
1464         if (zio->io_child_type == ZIO_CHILD_LOGICAL) {
1465                 if (BP_GET_DEDUP(bp))
1466                         zio->io_pipeline = ZIO_DDT_FREE_PIPELINE;
1467         }
1468
1469         return (ZIO_PIPELINE_CONTINUE);
1470 }
1471
1472 /*
1473  * ==========================================================================
1474  * Execute the I/O pipeline
1475  * ==========================================================================
1476  */
1477
1478 static void
1479 zio_taskq_dispatch(zio_t *zio, zio_taskq_type_t q, boolean_t cutinline)
1480 {
1481         spa_t *spa = zio->io_spa;
1482         zio_type_t t = zio->io_type;
1483         int flags = (cutinline ? TQ_FRONT : 0);
1484
1485         /*
1486          * If we're a config writer or a probe, the normal issue and
1487          * interrupt threads may all be blocked waiting for the config lock.
1488          * In this case, select the otherwise-unused taskq for ZIO_TYPE_NULL.
1489          */
1490         if (zio->io_flags & (ZIO_FLAG_CONFIG_WRITER | ZIO_FLAG_PROBE))
1491                 t = ZIO_TYPE_NULL;
1492
1493         /*
1494          * A similar issue exists for the L2ARC write thread until L2ARC 2.0.
1495          */
1496         if (t == ZIO_TYPE_WRITE && zio->io_vd && zio->io_vd->vdev_aux)
1497                 t = ZIO_TYPE_NULL;
1498
1499         /*
1500          * If this is a high priority I/O, then use the high priority taskq if
1501          * available.
1502          */
1503         if (zio->io_priority == ZIO_PRIORITY_NOW &&
1504             spa->spa_zio_taskq[t][q + 1].stqs_count != 0)
1505                 q++;
1506
1507         ASSERT3U(q, <, ZIO_TASKQ_TYPES);
1508
1509         /*
1510          * NB: We are assuming that the zio can only be dispatched
1511          * to a single taskq at a time.  It would be a grievous error
1512          * to dispatch the zio to another taskq at the same time.
1513          */
1514         ASSERT(taskq_empty_ent(&zio->io_tqent));
1515         spa_taskq_dispatch_ent(spa, t, q, (task_func_t *)zio_execute, zio,
1516             flags, &zio->io_tqent);
1517 }
1518
1519 static boolean_t
1520 zio_taskq_member(zio_t *zio, zio_taskq_type_t q)
1521 {
1522         kthread_t *executor = zio->io_executor;
1523         spa_t *spa = zio->io_spa;
1524         zio_type_t t;
1525
1526         for (t = 0; t < ZIO_TYPES; t++) {
1527                 spa_taskqs_t *tqs = &spa->spa_zio_taskq[t][q];
1528                 uint_t i;
1529                 for (i = 0; i < tqs->stqs_count; i++) {
1530                         if (taskq_member(tqs->stqs_taskq[i], executor))
1531                                 return (B_TRUE);
1532                 }
1533         }
1534
1535         return (B_FALSE);
1536 }
1537
1538 static int
1539 zio_issue_async(zio_t *zio)
1540 {
1541         zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
1542
1543         return (ZIO_PIPELINE_STOP);
1544 }
1545
1546 void
1547 zio_interrupt(zio_t *zio)
1548 {
1549         zio_taskq_dispatch(zio, ZIO_TASKQ_INTERRUPT, B_FALSE);
1550 }
1551
1552 void
1553 zio_delay_interrupt(zio_t *zio)
1554 {
1555         /*
1556          * The timeout_generic() function isn't defined in userspace, so
1557          * rather than trying to implement the function, the zio delay
1558          * functionality has been disabled for userspace builds.
1559          */
1560
1561 #ifdef _KERNEL
1562         /*
1563          * If io_target_timestamp is zero, then no delay has been registered
1564          * for this IO, thus jump to the end of this function and "skip" the
1565          * delay; issuing it directly to the zio layer.
1566          */
1567         if (zio->io_target_timestamp != 0) {
1568                 hrtime_t now = gethrtime();
1569
1570                 if (now >= zio->io_target_timestamp) {
1571                         /*
1572                          * This IO has already taken longer than the target
1573                          * delay to complete, so we don't want to delay it
1574                          * any longer; we "miss" the delay and issue it
1575                          * directly to the zio layer. This is likely due to
1576                          * the target latency being set to a value less than
1577                          * the underlying hardware can satisfy (e.g. delay
1578                          * set to 1ms, but the disks take 10ms to complete an
1579                          * IO request).
1580                          */
1581
1582                         DTRACE_PROBE2(zio__delay__miss, zio_t *, zio,
1583                             hrtime_t, now);
1584
1585                         zio_interrupt(zio);
1586                 } else {
1587                         taskqid_t tid;
1588                         hrtime_t diff = zio->io_target_timestamp - now;
1589                         clock_t expire_at_tick = ddi_get_lbolt() +
1590                             NSEC_TO_TICK(diff);
1591
1592                         DTRACE_PROBE3(zio__delay__hit, zio_t *, zio,
1593                             hrtime_t, now, hrtime_t, diff);
1594
1595                         if (NSEC_TO_TICK(diff) == 0) {
1596                                 /* Our delay is less than a jiffy - just spin */
1597                                 zfs_sleep_until(zio->io_target_timestamp);
1598                         } else {
1599                                 /*
1600                                  * Use taskq_dispatch_delay() in the place of
1601                                  * OpenZFS's timeout_generic().
1602                                  */
1603                                 tid = taskq_dispatch_delay(system_taskq,
1604                                     (task_func_t *)zio_interrupt,
1605                                     zio, TQ_NOSLEEP, expire_at_tick);
1606                                 if (tid == TASKQID_INVALID) {
1607                                         /*
1608                                          * Couldn't allocate a task.  Just
1609                                          * finish the zio without a delay.
1610                                          */
1611                                         zio_interrupt(zio);
1612                                 }
1613                         }
1614                 }
1615                 return;
1616         }
1617 #endif
1618         DTRACE_PROBE1(zio__delay__skip, zio_t *, zio);
1619         zio_interrupt(zio);
1620 }
1621
1622 /*
1623  * Execute the I/O pipeline until one of the following occurs:
1624  * (1) the I/O completes; (2) the pipeline stalls waiting for
1625  * dependent child I/Os; (3) the I/O issues, so we're waiting
1626  * for an I/O completion interrupt; (4) the I/O is delegated by
1627  * vdev-level caching or aggregation; (5) the I/O is deferred
1628  * due to vdev-level queueing; (6) the I/O is handed off to
1629  * another thread.  In all cases, the pipeline stops whenever
1630  * there's no CPU work; it never burns a thread in cv_wait_io().
1631  *
1632  * There's no locking on io_stage because there's no legitimate way
1633  * for multiple threads to be attempting to process the same I/O.
1634  */
1635 static zio_pipe_stage_t *zio_pipeline[];
1636
1637 /*
1638  * zio_execute() is a wrapper around the static function
1639  * __zio_execute() so that we can force  __zio_execute() to be
1640  * inlined.  This reduces stack overhead which is important
1641  * because __zio_execute() is called recursively in several zio
1642  * code paths.  zio_execute() itself cannot be inlined because
1643  * it is externally visible.
1644  */
1645 void
1646 zio_execute(zio_t *zio)
1647 {
1648         fstrans_cookie_t cookie;
1649
1650         cookie = spl_fstrans_mark();
1651         __zio_execute(zio);
1652         spl_fstrans_unmark(cookie);
1653 }
1654
1655 /*
1656  * Used to determine if in the current context the stack is sized large
1657  * enough to allow zio_execute() to be called recursively.  A minimum
1658  * stack size of 16K is required to avoid needing to re-dispatch the zio.
1659  */
1660 boolean_t
1661 zio_execute_stack_check(zio_t *zio)
1662 {
1663 #if !defined(HAVE_LARGE_STACKS)
1664         dsl_pool_t *dp = spa_get_dsl(zio->io_spa);
1665
1666         /* Executing in txg_sync_thread() context. */
1667         if (dp && curthread == dp->dp_tx.tx_sync_thread)
1668                 return (B_TRUE);
1669
1670         /* Pool initialization outside of zio_taskq context. */
1671         if (dp && spa_is_initializing(dp->dp_spa) &&
1672             !zio_taskq_member(zio, ZIO_TASKQ_ISSUE) &&
1673             !zio_taskq_member(zio, ZIO_TASKQ_ISSUE_HIGH))
1674                 return (B_TRUE);
1675 #endif /* HAVE_LARGE_STACKS */
1676
1677         return (B_FALSE);
1678 }
1679
1680 __attribute__((always_inline))
1681 static inline void
1682 __zio_execute(zio_t *zio)
1683 {
1684         zio->io_executor = curthread;
1685
1686         ASSERT3U(zio->io_queued_timestamp, >, 0);
1687
1688         while (zio->io_stage < ZIO_STAGE_DONE) {
1689                 enum zio_stage pipeline = zio->io_pipeline;
1690                 enum zio_stage stage = zio->io_stage;
1691                 int rv;
1692
1693                 ASSERT(!MUTEX_HELD(&zio->io_lock));
1694                 ASSERT(ISP2(stage));
1695                 ASSERT(zio->io_stall == NULL);
1696
1697                 do {
1698                         stage <<= 1;
1699                 } while ((stage & pipeline) == 0);
1700
1701                 ASSERT(stage <= ZIO_STAGE_DONE);
1702
1703                 /*
1704                  * If we are in interrupt context and this pipeline stage
1705                  * will grab a config lock that is held across I/O,
1706                  * or may wait for an I/O that needs an interrupt thread
1707                  * to complete, issue async to avoid deadlock.
1708                  *
1709                  * For VDEV_IO_START, we cut in line so that the io will
1710                  * be sent to disk promptly.
1711                  */
1712                 if ((stage & ZIO_BLOCKING_STAGES) && zio->io_vd == NULL &&
1713                     zio_taskq_member(zio, ZIO_TASKQ_INTERRUPT)) {
1714                         boolean_t cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
1715                             zio_requeue_io_start_cut_in_line : B_FALSE;
1716                         zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
1717                         return;
1718                 }
1719
1720                 /*
1721                  * If the current context doesn't have large enough stacks
1722                  * the zio must be issued asynchronously to prevent overflow.
1723                  */
1724                 if (zio_execute_stack_check(zio)) {
1725                         boolean_t cut = (stage == ZIO_STAGE_VDEV_IO_START) ?
1726                             zio_requeue_io_start_cut_in_line : B_FALSE;
1727                         zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, cut);
1728                         return;
1729                 }
1730
1731                 zio->io_stage = stage;
1732                 zio->io_pipeline_trace |= zio->io_stage;
1733                 rv = zio_pipeline[highbit64(stage) - 1](zio);
1734
1735                 if (rv == ZIO_PIPELINE_STOP)
1736                         return;
1737
1738                 ASSERT(rv == ZIO_PIPELINE_CONTINUE);
1739         }
1740 }
1741
1742
1743 /*
1744  * ==========================================================================
1745  * Initiate I/O, either sync or async
1746  * ==========================================================================
1747  */
1748 int
1749 zio_wait(zio_t *zio)
1750 {
1751         int error;
1752
1753         ASSERT(zio->io_stage == ZIO_STAGE_OPEN);
1754         ASSERT(zio->io_executor == NULL);
1755
1756         zio->io_waiter = curthread;
1757         ASSERT0(zio->io_queued_timestamp);
1758         zio->io_queued_timestamp = gethrtime();
1759
1760         __zio_execute(zio);
1761
1762         mutex_enter(&zio->io_lock);
1763         while (zio->io_executor != NULL)
1764                 cv_wait_io(&zio->io_cv, &zio->io_lock);
1765         mutex_exit(&zio->io_lock);
1766
1767         error = zio->io_error;
1768         zio_destroy(zio);
1769
1770         return (error);
1771 }
1772
1773 void
1774 zio_nowait(zio_t *zio)
1775 {
1776         ASSERT(zio->io_executor == NULL);
1777
1778         if (zio->io_child_type == ZIO_CHILD_LOGICAL &&
1779             zio_unique_parent(zio) == NULL) {
1780                 zio_t *pio;
1781
1782                 /*
1783                  * This is a logical async I/O with no parent to wait for it.
1784                  * We add it to the spa_async_root_zio "Godfather" I/O which
1785                  * will ensure they complete prior to unloading the pool.
1786                  */
1787                 spa_t *spa = zio->io_spa;
1788                 kpreempt_disable();
1789                 pio = spa->spa_async_zio_root[CPU_SEQID];
1790                 kpreempt_enable();
1791
1792                 zio_add_child(pio, zio);
1793         }
1794
1795         ASSERT0(zio->io_queued_timestamp);
1796         zio->io_queued_timestamp = gethrtime();
1797         __zio_execute(zio);
1798 }
1799
1800 /*
1801  * ==========================================================================
1802  * Reexecute or suspend/resume failed I/O
1803  * ==========================================================================
1804  */
1805
1806 static void
1807 zio_reexecute(zio_t *pio)
1808 {
1809         zio_t *cio, *cio_next;
1810         int c, w;
1811         zio_link_t *zl = NULL;
1812
1813         ASSERT(pio->io_child_type == ZIO_CHILD_LOGICAL);
1814         ASSERT(pio->io_orig_stage == ZIO_STAGE_OPEN);
1815         ASSERT(pio->io_gang_leader == NULL);
1816         ASSERT(pio->io_gang_tree == NULL);
1817
1818         pio->io_flags = pio->io_orig_flags;
1819         pio->io_stage = pio->io_orig_stage;
1820         pio->io_pipeline = pio->io_orig_pipeline;
1821         pio->io_reexecute = 0;
1822         pio->io_flags |= ZIO_FLAG_REEXECUTED;
1823         pio->io_pipeline_trace = 0;
1824         pio->io_error = 0;
1825         for (w = 0; w < ZIO_WAIT_TYPES; w++)
1826                 pio->io_state[w] = 0;
1827         for (c = 0; c < ZIO_CHILD_TYPES; c++)
1828                 pio->io_child_error[c] = 0;
1829
1830         if (IO_IS_ALLOCATING(pio))
1831                 BP_ZERO(pio->io_bp);
1832
1833         /*
1834          * As we reexecute pio's children, new children could be created.
1835          * New children go to the head of pio's io_child_list, however,
1836          * so we will (correctly) not reexecute them.  The key is that
1837          * the remainder of pio's io_child_list, from 'cio_next' onward,
1838          * cannot be affected by any side effects of reexecuting 'cio'.
1839          */
1840         for (cio = zio_walk_children(pio, &zl); cio != NULL; cio = cio_next) {
1841                 cio_next = zio_walk_children(pio, &zl);
1842                 mutex_enter(&pio->io_lock);
1843                 for (w = 0; w < ZIO_WAIT_TYPES; w++)
1844                         pio->io_children[cio->io_child_type][w]++;
1845                 mutex_exit(&pio->io_lock);
1846                 zio_reexecute(cio);
1847         }
1848
1849         /*
1850          * Now that all children have been reexecuted, execute the parent.
1851          * We don't reexecute "The Godfather" I/O here as it's the
1852          * responsibility of the caller to wait on it.
1853          */
1854         if (!(pio->io_flags & ZIO_FLAG_GODFATHER)) {
1855                 pio->io_queued_timestamp = gethrtime();
1856                 __zio_execute(pio);
1857         }
1858 }
1859
1860 void
1861 zio_suspend(spa_t *spa, zio_t *zio)
1862 {
1863         if (spa_get_failmode(spa) == ZIO_FAILURE_MODE_PANIC)
1864                 fm_panic("Pool '%s' has encountered an uncorrectable I/O "
1865                     "failure and the failure mode property for this pool "
1866                     "is set to panic.", spa_name(spa));
1867
1868         cmn_err(CE_WARN, "Pool '%s' has encountered an uncorrectable I/O "
1869             "failure and has been suspended.\n", spa_name(spa));
1870
1871         zfs_ereport_post(FM_EREPORT_ZFS_IO_FAILURE, spa, NULL, NULL, 0, 0);
1872
1873         mutex_enter(&spa->spa_suspend_lock);
1874
1875         if (spa->spa_suspend_zio_root == NULL)
1876                 spa->spa_suspend_zio_root = zio_root(spa, NULL, NULL,
1877                     ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE |
1878                     ZIO_FLAG_GODFATHER);
1879
1880         spa->spa_suspended = B_TRUE;
1881
1882         if (zio != NULL) {
1883                 ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
1884                 ASSERT(zio != spa->spa_suspend_zio_root);
1885                 ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
1886                 ASSERT(zio_unique_parent(zio) == NULL);
1887                 ASSERT(zio->io_stage == ZIO_STAGE_DONE);
1888                 zio_add_child(spa->spa_suspend_zio_root, zio);
1889         }
1890
1891         mutex_exit(&spa->spa_suspend_lock);
1892 }
1893
1894 int
1895 zio_resume(spa_t *spa)
1896 {
1897         zio_t *pio;
1898
1899         /*
1900          * Reexecute all previously suspended i/o.
1901          */
1902         mutex_enter(&spa->spa_suspend_lock);
1903         spa->spa_suspended = B_FALSE;
1904         cv_broadcast(&spa->spa_suspend_cv);
1905         pio = spa->spa_suspend_zio_root;
1906         spa->spa_suspend_zio_root = NULL;
1907         mutex_exit(&spa->spa_suspend_lock);
1908
1909         if (pio == NULL)
1910                 return (0);
1911
1912         zio_reexecute(pio);
1913         return (zio_wait(pio));
1914 }
1915
1916 void
1917 zio_resume_wait(spa_t *spa)
1918 {
1919         mutex_enter(&spa->spa_suspend_lock);
1920         while (spa_suspended(spa))
1921                 cv_wait(&spa->spa_suspend_cv, &spa->spa_suspend_lock);
1922         mutex_exit(&spa->spa_suspend_lock);
1923 }
1924
1925 /*
1926  * ==========================================================================
1927  * Gang blocks.
1928  *
1929  * A gang block is a collection of small blocks that looks to the DMU
1930  * like one large block.  When zio_dva_allocate() cannot find a block
1931  * of the requested size, due to either severe fragmentation or the pool
1932  * being nearly full, it calls zio_write_gang_block() to construct the
1933  * block from smaller fragments.
1934  *
1935  * A gang block consists of a gang header (zio_gbh_phys_t) and up to
1936  * three (SPA_GBH_NBLKPTRS) gang members.  The gang header is just like
1937  * an indirect block: it's an array of block pointers.  It consumes
1938  * only one sector and hence is allocatable regardless of fragmentation.
1939  * The gang header's bps point to its gang members, which hold the data.
1940  *
1941  * Gang blocks are self-checksumming, using the bp's <vdev, offset, txg>
1942  * as the verifier to ensure uniqueness of the SHA256 checksum.
1943  * Critically, the gang block bp's blk_cksum is the checksum of the data,
1944  * not the gang header.  This ensures that data block signatures (needed for
1945  * deduplication) are independent of how the block is physically stored.
1946  *
1947  * Gang blocks can be nested: a gang member may itself be a gang block.
1948  * Thus every gang block is a tree in which root and all interior nodes are
1949  * gang headers, and the leaves are normal blocks that contain user data.
1950  * The root of the gang tree is called the gang leader.
1951  *
1952  * To perform any operation (read, rewrite, free, claim) on a gang block,
1953  * zio_gang_assemble() first assembles the gang tree (minus data leaves)
1954  * in the io_gang_tree field of the original logical i/o by recursively
1955  * reading the gang leader and all gang headers below it.  This yields
1956  * an in-core tree containing the contents of every gang header and the
1957  * bps for every constituent of the gang block.
1958  *
1959  * With the gang tree now assembled, zio_gang_issue() just walks the gang tree
1960  * and invokes a callback on each bp.  To free a gang block, zio_gang_issue()
1961  * calls zio_free_gang() -- a trivial wrapper around zio_free() -- for each bp.
1962  * zio_claim_gang() provides a similarly trivial wrapper for zio_claim().
1963  * zio_read_gang() is a wrapper around zio_read() that omits reading gang
1964  * headers, since we already have those in io_gang_tree.  zio_rewrite_gang()
1965  * performs a zio_rewrite() of the data or, for gang headers, a zio_rewrite()
1966  * of the gang header plus zio_checksum_compute() of the data to update the
1967  * gang header's blk_cksum as described above.
1968  *
1969  * The two-phase assemble/issue model solves the problem of partial failure --
1970  * what if you'd freed part of a gang block but then couldn't read the
1971  * gang header for another part?  Assembling the entire gang tree first
1972  * ensures that all the necessary gang header I/O has succeeded before
1973  * starting the actual work of free, claim, or write.  Once the gang tree
1974  * is assembled, free and claim are in-memory operations that cannot fail.
1975  *
1976  * In the event that a gang write fails, zio_dva_unallocate() walks the
1977  * gang tree to immediately free (i.e. insert back into the space map)
1978  * everything we've allocated.  This ensures that we don't get ENOSPC
1979  * errors during repeated suspend/resume cycles due to a flaky device.
1980  *
1981  * Gang rewrites only happen during sync-to-convergence.  If we can't assemble
1982  * the gang tree, we won't modify the block, so we can safely defer the free
1983  * (knowing that the block is still intact).  If we *can* assemble the gang
1984  * tree, then even if some of the rewrites fail, zio_dva_unallocate() will free
1985  * each constituent bp and we can allocate a new block on the next sync pass.
1986  *
1987  * In all cases, the gang tree allows complete recovery from partial failure.
1988  * ==========================================================================
1989  */
1990
1991 static void
1992 zio_gang_issue_func_done(zio_t *zio)
1993 {
1994         abd_put(zio->io_abd);
1995 }
1996
1997 static zio_t *
1998 zio_read_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
1999     uint64_t offset)
2000 {
2001         if (gn != NULL)
2002                 return (pio);
2003
2004         return (zio_read(pio, pio->io_spa, bp, abd_get_offset(data, offset),
2005             BP_GET_PSIZE(bp), zio_gang_issue_func_done,
2006             NULL, pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
2007             &pio->io_bookmark));
2008 }
2009
2010 static zio_t *
2011 zio_rewrite_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
2012     uint64_t offset)
2013 {
2014         zio_t *zio;
2015
2016         if (gn != NULL) {
2017                 abd_t *gbh_abd =
2018                     abd_get_from_buf(gn->gn_gbh, SPA_GANGBLOCKSIZE);
2019                 zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
2020                     gbh_abd, SPA_GANGBLOCKSIZE, zio_gang_issue_func_done, NULL,
2021                     pio->io_priority, ZIO_GANG_CHILD_FLAGS(pio),
2022                     &pio->io_bookmark);
2023                 /*
2024                  * As we rewrite each gang header, the pipeline will compute
2025                  * a new gang block header checksum for it; but no one will
2026                  * compute a new data checksum, so we do that here.  The one
2027                  * exception is the gang leader: the pipeline already computed
2028                  * its data checksum because that stage precedes gang assembly.
2029                  * (Presently, nothing actually uses interior data checksums;
2030                  * this is just good hygiene.)
2031                  */
2032                 if (gn != pio->io_gang_leader->io_gang_tree) {
2033                         abd_t *buf = abd_get_offset(data, offset);
2034
2035                         zio_checksum_compute(zio, BP_GET_CHECKSUM(bp),
2036                             buf, BP_GET_PSIZE(bp));
2037
2038                         abd_put(buf);
2039                 }
2040                 /*
2041                  * If we are here to damage data for testing purposes,
2042                  * leave the GBH alone so that we can detect the damage.
2043                  */
2044                 if (pio->io_gang_leader->io_flags & ZIO_FLAG_INDUCE_DAMAGE)
2045                         zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
2046         } else {
2047                 zio = zio_rewrite(pio, pio->io_spa, pio->io_txg, bp,
2048                     abd_get_offset(data, offset), BP_GET_PSIZE(bp),
2049                     zio_gang_issue_func_done, NULL, pio->io_priority,
2050                     ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
2051         }
2052
2053         return (zio);
2054 }
2055
2056 /* ARGSUSED */
2057 static zio_t *
2058 zio_free_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
2059     uint64_t offset)
2060 {
2061         return (zio_free_sync(pio, pio->io_spa, pio->io_txg, bp,
2062             ZIO_GANG_CHILD_FLAGS(pio)));
2063 }
2064
2065 /* ARGSUSED */
2066 static zio_t *
2067 zio_claim_gang(zio_t *pio, blkptr_t *bp, zio_gang_node_t *gn, abd_t *data,
2068     uint64_t offset)
2069 {
2070         return (zio_claim(pio, pio->io_spa, pio->io_txg, bp,
2071             NULL, NULL, ZIO_GANG_CHILD_FLAGS(pio)));
2072 }
2073
2074 static zio_gang_issue_func_t *zio_gang_issue_func[ZIO_TYPES] = {
2075         NULL,
2076         zio_read_gang,
2077         zio_rewrite_gang,
2078         zio_free_gang,
2079         zio_claim_gang,
2080         NULL
2081 };
2082
2083 static void zio_gang_tree_assemble_done(zio_t *zio);
2084
2085 static zio_gang_node_t *
2086 zio_gang_node_alloc(zio_gang_node_t **gnpp)
2087 {
2088         zio_gang_node_t *gn;
2089
2090         ASSERT(*gnpp == NULL);
2091
2092         gn = kmem_zalloc(sizeof (*gn), KM_SLEEP);
2093         gn->gn_gbh = zio_buf_alloc(SPA_GANGBLOCKSIZE);
2094         *gnpp = gn;
2095
2096         return (gn);
2097 }
2098
2099 static void
2100 zio_gang_node_free(zio_gang_node_t **gnpp)
2101 {
2102         zio_gang_node_t *gn = *gnpp;
2103         int g;
2104
2105         for (g = 0; g < SPA_GBH_NBLKPTRS; g++)
2106                 ASSERT(gn->gn_child[g] == NULL);
2107
2108         zio_buf_free(gn->gn_gbh, SPA_GANGBLOCKSIZE);
2109         kmem_free(gn, sizeof (*gn));
2110         *gnpp = NULL;
2111 }
2112
2113 static void
2114 zio_gang_tree_free(zio_gang_node_t **gnpp)
2115 {
2116         zio_gang_node_t *gn = *gnpp;
2117         int g;
2118
2119         if (gn == NULL)
2120                 return;
2121
2122         for (g = 0; g < SPA_GBH_NBLKPTRS; g++)
2123                 zio_gang_tree_free(&gn->gn_child[g]);
2124
2125         zio_gang_node_free(gnpp);
2126 }
2127
2128 static void
2129 zio_gang_tree_assemble(zio_t *gio, blkptr_t *bp, zio_gang_node_t **gnpp)
2130 {
2131         zio_gang_node_t *gn = zio_gang_node_alloc(gnpp);
2132         abd_t *gbh_abd = abd_get_from_buf(gn->gn_gbh, SPA_GANGBLOCKSIZE);
2133
2134         ASSERT(gio->io_gang_leader == gio);
2135         ASSERT(BP_IS_GANG(bp));
2136
2137         zio_nowait(zio_read(gio, gio->io_spa, bp, gbh_abd, SPA_GANGBLOCKSIZE,
2138             zio_gang_tree_assemble_done, gn, gio->io_priority,
2139             ZIO_GANG_CHILD_FLAGS(gio), &gio->io_bookmark));
2140 }
2141
2142 static void
2143 zio_gang_tree_assemble_done(zio_t *zio)
2144 {
2145         zio_t *gio = zio->io_gang_leader;
2146         zio_gang_node_t *gn = zio->io_private;
2147         blkptr_t *bp = zio->io_bp;
2148         int g;
2149
2150         ASSERT(gio == zio_unique_parent(zio));
2151         ASSERT(zio->io_child_count == 0);
2152
2153         if (zio->io_error)
2154                 return;
2155
2156         /* this ABD was created from a linear buf in zio_gang_tree_assemble */
2157         if (BP_SHOULD_BYTESWAP(bp))
2158                 byteswap_uint64_array(abd_to_buf(zio->io_abd), zio->io_size);
2159
2160         ASSERT3P(abd_to_buf(zio->io_abd), ==, gn->gn_gbh);
2161         ASSERT(zio->io_size == SPA_GANGBLOCKSIZE);
2162         ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
2163
2164         abd_put(zio->io_abd);
2165
2166         for (g = 0; g < SPA_GBH_NBLKPTRS; g++) {
2167                 blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
2168                 if (!BP_IS_GANG(gbp))
2169                         continue;
2170                 zio_gang_tree_assemble(gio, gbp, &gn->gn_child[g]);
2171         }
2172 }
2173
2174 static void
2175 zio_gang_tree_issue(zio_t *pio, zio_gang_node_t *gn, blkptr_t *bp, abd_t *data,
2176     uint64_t offset)
2177 {
2178         zio_t *gio = pio->io_gang_leader;
2179         zio_t *zio;
2180         int g;
2181
2182         ASSERT(BP_IS_GANG(bp) == !!gn);
2183         ASSERT(BP_GET_CHECKSUM(bp) == BP_GET_CHECKSUM(gio->io_bp));
2184         ASSERT(BP_GET_LSIZE(bp) == BP_GET_PSIZE(bp) || gn == gio->io_gang_tree);
2185
2186         /*
2187          * If you're a gang header, your data is in gn->gn_gbh.
2188          * If you're a gang member, your data is in 'data' and gn == NULL.
2189          */
2190         zio = zio_gang_issue_func[gio->io_type](pio, bp, gn, data, offset);
2191
2192         if (gn != NULL) {
2193                 ASSERT(gn->gn_gbh->zg_tail.zec_magic == ZEC_MAGIC);
2194
2195                 for (g = 0; g < SPA_GBH_NBLKPTRS; g++) {
2196                         blkptr_t *gbp = &gn->gn_gbh->zg_blkptr[g];
2197                         if (BP_IS_HOLE(gbp))
2198                                 continue;
2199                         zio_gang_tree_issue(zio, gn->gn_child[g], gbp, data,
2200                             offset);
2201                         offset += BP_GET_PSIZE(gbp);
2202                 }
2203         }
2204
2205         if (gn == gio->io_gang_tree)
2206                 ASSERT3U(gio->io_size, ==, offset);
2207
2208         if (zio != pio)
2209                 zio_nowait(zio);
2210 }
2211
2212 static int
2213 zio_gang_assemble(zio_t *zio)
2214 {
2215         blkptr_t *bp = zio->io_bp;
2216
2217         ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == NULL);
2218         ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
2219
2220         zio->io_gang_leader = zio;
2221
2222         zio_gang_tree_assemble(zio, bp, &zio->io_gang_tree);
2223
2224         return (ZIO_PIPELINE_CONTINUE);
2225 }
2226
2227 static int
2228 zio_gang_issue(zio_t *zio)
2229 {
2230         blkptr_t *bp = zio->io_bp;
2231
2232         if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE))
2233                 return (ZIO_PIPELINE_STOP);
2234
2235         ASSERT(BP_IS_GANG(bp) && zio->io_gang_leader == zio);
2236         ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
2237
2238         if (zio->io_child_error[ZIO_CHILD_GANG] == 0)
2239                 zio_gang_tree_issue(zio, zio->io_gang_tree, bp, zio->io_abd,
2240                     0);
2241         else
2242                 zio_gang_tree_free(&zio->io_gang_tree);
2243
2244         zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2245
2246         return (ZIO_PIPELINE_CONTINUE);
2247 }
2248
2249 static void
2250 zio_write_gang_member_ready(zio_t *zio)
2251 {
2252         zio_t *pio = zio_unique_parent(zio);
2253         dva_t *cdva = zio->io_bp->blk_dva;
2254         dva_t *pdva = pio->io_bp->blk_dva;
2255         uint64_t asize;
2256         int d;
2257         ASSERTV(zio_t *gio = zio->io_gang_leader);
2258
2259         if (BP_IS_HOLE(zio->io_bp))
2260                 return;
2261
2262         ASSERT(BP_IS_HOLE(&zio->io_bp_orig));
2263
2264         ASSERT(zio->io_child_type == ZIO_CHILD_GANG);
2265         ASSERT3U(zio->io_prop.zp_copies, ==, gio->io_prop.zp_copies);
2266         ASSERT3U(zio->io_prop.zp_copies, <=, BP_GET_NDVAS(zio->io_bp));
2267         ASSERT3U(pio->io_prop.zp_copies, <=, BP_GET_NDVAS(pio->io_bp));
2268         ASSERT3U(BP_GET_NDVAS(zio->io_bp), <=, BP_GET_NDVAS(pio->io_bp));
2269
2270         mutex_enter(&pio->io_lock);
2271         for (d = 0; d < BP_GET_NDVAS(zio->io_bp); d++) {
2272                 ASSERT(DVA_GET_GANG(&pdva[d]));
2273                 asize = DVA_GET_ASIZE(&pdva[d]);
2274                 asize += DVA_GET_ASIZE(&cdva[d]);
2275                 DVA_SET_ASIZE(&pdva[d], asize);
2276         }
2277         mutex_exit(&pio->io_lock);
2278 }
2279
2280 static void
2281 zio_write_gang_done(zio_t *zio)
2282 {
2283         abd_put(zio->io_abd);
2284 }
2285
2286 static int
2287 zio_write_gang_block(zio_t *pio)
2288 {
2289         spa_t *spa = pio->io_spa;
2290         metaslab_class_t *mc = spa_normal_class(spa);
2291         blkptr_t *bp = pio->io_bp;
2292         zio_t *gio = pio->io_gang_leader;
2293         zio_t *zio;
2294         zio_gang_node_t *gn, **gnpp;
2295         zio_gbh_phys_t *gbh;
2296         abd_t *gbh_abd;
2297         uint64_t txg = pio->io_txg;
2298         uint64_t resid = pio->io_size;
2299         uint64_t lsize;
2300         int copies = gio->io_prop.zp_copies;
2301         int gbh_copies = MIN(copies + 1, spa_max_replication(spa));
2302         zio_prop_t zp;
2303         int g, error;
2304
2305         int flags = METASLAB_HINTBP_FAVOR | METASLAB_GANG_HEADER;
2306         if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
2307                 ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
2308                 ASSERT(!(pio->io_flags & ZIO_FLAG_NODATA));
2309
2310                 flags |= METASLAB_ASYNC_ALLOC;
2311                 VERIFY(refcount_held(&mc->mc_alloc_slots, pio));
2312
2313                 /*
2314                  * The logical zio has already placed a reservation for
2315                  * 'copies' allocation slots but gang blocks may require
2316                  * additional copies. These additional copies
2317                  * (i.e. gbh_copies - copies) are guaranteed to succeed
2318                  * since metaslab_class_throttle_reserve() always allows
2319                  * additional reservations for gang blocks.
2320                  */
2321                 VERIFY(metaslab_class_throttle_reserve(mc, gbh_copies - copies,
2322                     pio, flags));
2323         }
2324
2325         error = metaslab_alloc(spa, mc, SPA_GANGBLOCKSIZE,
2326             bp, gbh_copies, txg, pio == gio ? NULL : gio->io_bp, flags,
2327             &pio->io_alloc_list, pio);
2328         if (error) {
2329                 if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
2330                         ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
2331                         ASSERT(!(pio->io_flags & ZIO_FLAG_NODATA));
2332
2333                         /*
2334                          * If we failed to allocate the gang block header then
2335                          * we remove any additional allocation reservations that
2336                          * we placed here. The original reservation will
2337                          * be removed when the logical I/O goes to the ready
2338                          * stage.
2339                          */
2340                         metaslab_class_throttle_unreserve(mc,
2341                             gbh_copies - copies, pio);
2342                 }
2343
2344                 pio->io_error = error;
2345                 return (ZIO_PIPELINE_CONTINUE);
2346         }
2347
2348         if (pio == gio) {
2349                 gnpp = &gio->io_gang_tree;
2350         } else {
2351                 gnpp = pio->io_private;
2352                 ASSERT(pio->io_ready == zio_write_gang_member_ready);
2353         }
2354
2355         gn = zio_gang_node_alloc(gnpp);
2356         gbh = gn->gn_gbh;
2357         bzero(gbh, SPA_GANGBLOCKSIZE);
2358         gbh_abd = abd_get_from_buf(gbh, SPA_GANGBLOCKSIZE);
2359
2360         /*
2361          * Create the gang header.
2362          */
2363         zio = zio_rewrite(pio, spa, txg, bp, gbh_abd, SPA_GANGBLOCKSIZE,
2364             zio_write_gang_done, NULL, pio->io_priority,
2365             ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
2366
2367         /*
2368          * Create and nowait the gang children.
2369          */
2370         for (g = 0; resid != 0; resid -= lsize, g++) {
2371                 zio_t *cio;
2372
2373                 lsize = P2ROUNDUP(resid / (SPA_GBH_NBLKPTRS - g),
2374                     SPA_MINBLOCKSIZE);
2375                 ASSERT(lsize >= SPA_MINBLOCKSIZE && lsize <= resid);
2376
2377                 zp.zp_checksum = gio->io_prop.zp_checksum;
2378                 zp.zp_compress = ZIO_COMPRESS_OFF;
2379                 zp.zp_type = DMU_OT_NONE;
2380                 zp.zp_level = 0;
2381                 zp.zp_copies = gio->io_prop.zp_copies;
2382                 zp.zp_dedup = B_FALSE;
2383                 zp.zp_dedup_verify = B_FALSE;
2384                 zp.zp_nopwrite = B_FALSE;
2385
2386                 cio = zio_write(zio, spa, txg, &gbh->zg_blkptr[g],
2387                     abd_get_offset(pio->io_abd, pio->io_size - resid), lsize,
2388                     lsize, &zp, zio_write_gang_member_ready, NULL, NULL,
2389                     zio_write_gang_done, &gn->gn_child[g], pio->io_priority,
2390                     ZIO_GANG_CHILD_FLAGS(pio), &pio->io_bookmark);
2391
2392                 if (pio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
2393                         ASSERT(pio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
2394                         ASSERT(!(pio->io_flags & ZIO_FLAG_NODATA));
2395
2396                         /*
2397                          * Gang children won't throttle but we should
2398                          * account for their work, so reserve an allocation
2399                          * slot for them here.
2400                          */
2401                         VERIFY(metaslab_class_throttle_reserve(mc,
2402                             zp.zp_copies, cio, flags));
2403                 }
2404                 zio_nowait(cio);
2405         }
2406
2407         /*
2408          * Set pio's pipeline to just wait for zio to finish.
2409          */
2410         pio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2411
2412         /*
2413          * We didn't allocate this bp, so make sure it doesn't get unmarked.
2414          */
2415         pio->io_flags &= ~ZIO_FLAG_FASTWRITE;
2416
2417         zio_nowait(zio);
2418
2419         return (ZIO_PIPELINE_CONTINUE);
2420 }
2421
2422 /*
2423  * The zio_nop_write stage in the pipeline determines if allocating a
2424  * new bp is necessary.  The nopwrite feature can handle writes in
2425  * either syncing or open context (i.e. zil writes) and as a result is
2426  * mutually exclusive with dedup.
2427  *
2428  * By leveraging a cryptographically secure checksum, such as SHA256, we
2429  * can compare the checksums of the new data and the old to determine if
2430  * allocating a new block is required.  Note that our requirements for
2431  * cryptographic strength are fairly weak: there can't be any accidental
2432  * hash collisions, but we don't need to be secure against intentional
2433  * (malicious) collisions.  To trigger a nopwrite, you have to be able
2434  * to write the file to begin with, and triggering an incorrect (hash
2435  * collision) nopwrite is no worse than simply writing to the file.
2436  * That said, there are no known attacks against the checksum algorithms
2437  * used for nopwrite, assuming that the salt and the checksums
2438  * themselves remain secret.
2439  */
2440 static int
2441 zio_nop_write(zio_t *zio)
2442 {
2443         blkptr_t *bp = zio->io_bp;
2444         blkptr_t *bp_orig = &zio->io_bp_orig;
2445         zio_prop_t *zp = &zio->io_prop;
2446
2447         ASSERT(BP_GET_LEVEL(bp) == 0);
2448         ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REWRITE));
2449         ASSERT(zp->zp_nopwrite);
2450         ASSERT(!zp->zp_dedup);
2451         ASSERT(zio->io_bp_override == NULL);
2452         ASSERT(IO_IS_ALLOCATING(zio));
2453
2454         /*
2455          * Check to see if the original bp and the new bp have matching
2456          * characteristics (i.e. same checksum, compression algorithms, etc).
2457          * If they don't then just continue with the pipeline which will
2458          * allocate a new bp.
2459          */
2460         if (BP_IS_HOLE(bp_orig) ||
2461             !(zio_checksum_table[BP_GET_CHECKSUM(bp)].ci_flags &
2462             ZCHECKSUM_FLAG_NOPWRITE) ||
2463             BP_GET_CHECKSUM(bp) != BP_GET_CHECKSUM(bp_orig) ||
2464             BP_GET_COMPRESS(bp) != BP_GET_COMPRESS(bp_orig) ||
2465             BP_GET_DEDUP(bp) != BP_GET_DEDUP(bp_orig) ||
2466             zp->zp_copies != BP_GET_NDVAS(bp_orig))
2467                 return (ZIO_PIPELINE_CONTINUE);
2468
2469         /*
2470          * If the checksums match then reset the pipeline so that we
2471          * avoid allocating a new bp and issuing any I/O.
2472          */
2473         if (ZIO_CHECKSUM_EQUAL(bp->blk_cksum, bp_orig->blk_cksum)) {
2474                 ASSERT(zio_checksum_table[zp->zp_checksum].ci_flags &
2475                     ZCHECKSUM_FLAG_NOPWRITE);
2476                 ASSERT3U(BP_GET_PSIZE(bp), ==, BP_GET_PSIZE(bp_orig));
2477                 ASSERT3U(BP_GET_LSIZE(bp), ==, BP_GET_LSIZE(bp_orig));
2478                 ASSERT(zp->zp_compress != ZIO_COMPRESS_OFF);
2479                 ASSERT(bcmp(&bp->blk_prop, &bp_orig->blk_prop,
2480                     sizeof (uint64_t)) == 0);
2481
2482                 *bp = *bp_orig;
2483                 zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
2484                 zio->io_flags |= ZIO_FLAG_NOPWRITE;
2485         }
2486
2487         return (ZIO_PIPELINE_CONTINUE);
2488 }
2489
2490 /*
2491  * ==========================================================================
2492  * Dedup
2493  * ==========================================================================
2494  */
2495 static void
2496 zio_ddt_child_read_done(zio_t *zio)
2497 {
2498         blkptr_t *bp = zio->io_bp;
2499         ddt_entry_t *dde = zio->io_private;
2500         ddt_phys_t *ddp;
2501         zio_t *pio = zio_unique_parent(zio);
2502
2503         mutex_enter(&pio->io_lock);
2504         ddp = ddt_phys_select(dde, bp);
2505         if (zio->io_error == 0)
2506                 ddt_phys_clear(ddp);    /* this ddp doesn't need repair */
2507
2508         if (zio->io_error == 0 && dde->dde_repair_abd == NULL)
2509                 dde->dde_repair_abd = zio->io_abd;
2510         else
2511                 abd_free(zio->io_abd);
2512         mutex_exit(&pio->io_lock);
2513 }
2514
2515 static int
2516 zio_ddt_read_start(zio_t *zio)
2517 {
2518         blkptr_t *bp = zio->io_bp;
2519         int p;
2520
2521         ASSERT(BP_GET_DEDUP(bp));
2522         ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
2523         ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2524
2525         if (zio->io_child_error[ZIO_CHILD_DDT]) {
2526                 ddt_t *ddt = ddt_select(zio->io_spa, bp);
2527                 ddt_entry_t *dde = ddt_repair_start(ddt, bp);
2528                 ddt_phys_t *ddp = dde->dde_phys;
2529                 ddt_phys_t *ddp_self = ddt_phys_select(dde, bp);
2530                 blkptr_t blk;
2531
2532                 ASSERT(zio->io_vsd == NULL);
2533                 zio->io_vsd = dde;
2534
2535                 if (ddp_self == NULL)
2536                         return (ZIO_PIPELINE_CONTINUE);
2537
2538                 for (p = 0; p < DDT_PHYS_TYPES; p++, ddp++) {
2539                         if (ddp->ddp_phys_birth == 0 || ddp == ddp_self)
2540                                 continue;
2541                         ddt_bp_create(ddt->ddt_checksum, &dde->dde_key, ddp,
2542                             &blk);
2543                         zio_nowait(zio_read(zio, zio->io_spa, &blk,
2544                             abd_alloc_for_io(zio->io_size, B_TRUE),
2545                             zio->io_size, zio_ddt_child_read_done, dde,
2546                             zio->io_priority, ZIO_DDT_CHILD_FLAGS(zio) |
2547                             ZIO_FLAG_DONT_PROPAGATE, &zio->io_bookmark));
2548                 }
2549                 return (ZIO_PIPELINE_CONTINUE);
2550         }
2551
2552         zio_nowait(zio_read(zio, zio->io_spa, bp,
2553             zio->io_abd, zio->io_size, NULL, NULL, zio->io_priority,
2554             ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark));
2555
2556         return (ZIO_PIPELINE_CONTINUE);
2557 }
2558
2559 static int
2560 zio_ddt_read_done(zio_t *zio)
2561 {
2562         blkptr_t *bp = zio->io_bp;
2563
2564         if (zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE))
2565                 return (ZIO_PIPELINE_STOP);
2566
2567         ASSERT(BP_GET_DEDUP(bp));
2568         ASSERT(BP_GET_PSIZE(bp) == zio->io_size);
2569         ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2570
2571         if (zio->io_child_error[ZIO_CHILD_DDT]) {
2572                 ddt_t *ddt = ddt_select(zio->io_spa, bp);
2573                 ddt_entry_t *dde = zio->io_vsd;
2574                 if (ddt == NULL) {
2575                         ASSERT(spa_load_state(zio->io_spa) != SPA_LOAD_NONE);
2576                         return (ZIO_PIPELINE_CONTINUE);
2577                 }
2578                 if (dde == NULL) {
2579                         zio->io_stage = ZIO_STAGE_DDT_READ_START >> 1;
2580                         zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_FALSE);
2581                         return (ZIO_PIPELINE_STOP);
2582                 }
2583                 if (dde->dde_repair_abd != NULL) {
2584                         abd_copy(zio->io_abd, dde->dde_repair_abd,
2585                             zio->io_size);
2586                         zio->io_child_error[ZIO_CHILD_DDT] = 0;
2587                 }
2588                 ddt_repair_done(ddt, dde);
2589                 zio->io_vsd = NULL;
2590         }
2591
2592         ASSERT(zio->io_vsd == NULL);
2593
2594         return (ZIO_PIPELINE_CONTINUE);
2595 }
2596
2597 static boolean_t
2598 zio_ddt_collision(zio_t *zio, ddt_t *ddt, ddt_entry_t *dde)
2599 {
2600         spa_t *spa = zio->io_spa;
2601         int p;
2602         boolean_t do_raw = !!(zio->io_flags & ZIO_FLAG_RAW);
2603
2604         ASSERT(!(zio->io_bp_override && do_raw));
2605
2606         /*
2607          * Note: we compare the original data, not the transformed data,
2608          * because when zio->io_bp is an override bp, we will not have
2609          * pushed the I/O transforms.  That's an important optimization
2610          * because otherwise we'd compress/encrypt all dmu_sync() data twice.
2611          * However, we should never get a raw, override zio so in these
2612          * cases we can compare the io_data directly. This is useful because
2613          * it allows us to do dedup verification even if we don't have access
2614          * to the original data (for instance, if the encryption keys aren't
2615          * loaded).
2616          */
2617
2618         for (p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
2619                 zio_t *lio = dde->dde_lead_zio[p];
2620
2621                 if (lio != NULL && do_raw) {
2622                         return (lio->io_size != zio->io_size ||
2623                             abd_cmp(zio->io_abd, lio->io_abd) != 0);
2624                 } else if (lio != NULL) {
2625                         return (lio->io_orig_size != zio->io_orig_size ||
2626                             abd_cmp(zio->io_orig_abd, lio->io_orig_abd) != 0);
2627                 }
2628         }
2629
2630         for (p = DDT_PHYS_SINGLE; p <= DDT_PHYS_TRIPLE; p++) {
2631                 ddt_phys_t *ddp = &dde->dde_phys[p];
2632
2633                 if (ddp->ddp_phys_birth != 0 && do_raw) {
2634                         blkptr_t blk = *zio->io_bp;
2635                         uint64_t psize;
2636                         abd_t *tmpabd;
2637                         int error;
2638
2639                         ddt_bp_fill(ddp, &blk, ddp->ddp_phys_birth);
2640                         psize = BP_GET_PSIZE(&blk);
2641
2642                         if (psize != zio->io_size)
2643                                 return (B_TRUE);
2644
2645                         ddt_exit(ddt);
2646
2647                         tmpabd = abd_alloc_for_io(psize, B_TRUE);
2648
2649                         error = zio_wait(zio_read(NULL, spa, &blk, tmpabd,
2650                             psize, NULL, NULL, ZIO_PRIORITY_SYNC_READ,
2651                             ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE |
2652                             ZIO_FLAG_RAW, &zio->io_bookmark));
2653
2654                         if (error == 0) {
2655                                 if (abd_cmp(tmpabd, zio->io_abd) != 0)
2656                                         error = SET_ERROR(ENOENT);
2657                         }
2658
2659                         abd_free(tmpabd);
2660                         ddt_enter(ddt);
2661                         return (error != 0);
2662                 } else if (ddp->ddp_phys_birth != 0) {
2663                         arc_buf_t *abuf = NULL;
2664                         arc_flags_t aflags = ARC_FLAG_WAIT;
2665                         blkptr_t blk = *zio->io_bp;
2666                         int error;
2667
2668                         ddt_bp_fill(ddp, &blk, ddp->ddp_phys_birth);
2669
2670                         if (BP_GET_LSIZE(&blk) != zio->io_orig_size)
2671                                 return (B_TRUE);
2672
2673                         ddt_exit(ddt);
2674
2675                         error = arc_read(NULL, spa, &blk,
2676                             arc_getbuf_func, &abuf, ZIO_PRIORITY_SYNC_READ,
2677                             ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE,
2678                             &aflags, &zio->io_bookmark);
2679
2680                         if (error == 0) {
2681                                 if (abd_cmp_buf(zio->io_orig_abd, abuf->b_data,
2682                                     zio->io_orig_size) != 0)
2683                                         error = SET_ERROR(ENOENT);
2684                                 arc_buf_destroy(abuf, &abuf);
2685                         }
2686
2687                         ddt_enter(ddt);
2688                         return (error != 0);
2689                 }
2690         }
2691
2692         return (B_FALSE);
2693 }
2694
2695 static void
2696 zio_ddt_child_write_ready(zio_t *zio)
2697 {
2698         int p = zio->io_prop.zp_copies;
2699         ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
2700         ddt_entry_t *dde = zio->io_private;
2701         ddt_phys_t *ddp = &dde->dde_phys[p];
2702         zio_t *pio;
2703         zio_link_t *zl;
2704
2705         if (zio->io_error)
2706                 return;
2707
2708         ddt_enter(ddt);
2709
2710         ASSERT(dde->dde_lead_zio[p] == zio);
2711
2712         ddt_phys_fill(ddp, zio->io_bp);
2713
2714         zl = NULL;
2715         while ((pio = zio_walk_parents(zio, &zl)) != NULL)
2716                 ddt_bp_fill(ddp, pio->io_bp, zio->io_txg);
2717
2718         ddt_exit(ddt);
2719 }
2720
2721 static void
2722 zio_ddt_child_write_done(zio_t *zio)
2723 {
2724         int p = zio->io_prop.zp_copies;
2725         ddt_t *ddt = ddt_select(zio->io_spa, zio->io_bp);
2726         ddt_entry_t *dde = zio->io_private;
2727         ddt_phys_t *ddp = &dde->dde_phys[p];
2728
2729         ddt_enter(ddt);
2730
2731         ASSERT(ddp->ddp_refcnt == 0);
2732         ASSERT(dde->dde_lead_zio[p] == zio);
2733         dde->dde_lead_zio[p] = NULL;
2734
2735         if (zio->io_error == 0) {
2736                 zio_link_t *zl = NULL;
2737                 while (zio_walk_parents(zio, &zl) != NULL)
2738                         ddt_phys_addref(ddp);
2739         } else {
2740                 ddt_phys_clear(ddp);
2741         }
2742
2743         ddt_exit(ddt);
2744 }
2745
2746 static void
2747 zio_ddt_ditto_write_done(zio_t *zio)
2748 {
2749         int p = DDT_PHYS_DITTO;
2750         blkptr_t *bp = zio->io_bp;
2751         ddt_t *ddt = ddt_select(zio->io_spa, bp);
2752         ddt_entry_t *dde = zio->io_private;
2753         ddt_phys_t *ddp = &dde->dde_phys[p];
2754         ddt_key_t *ddk = &dde->dde_key;
2755         ASSERTV(zio_prop_t *zp = &zio->io_prop);
2756
2757         ddt_enter(ddt);
2758
2759         ASSERT(ddp->ddp_refcnt == 0);
2760         ASSERT(dde->dde_lead_zio[p] == zio);
2761         dde->dde_lead_zio[p] = NULL;
2762
2763         if (zio->io_error == 0) {
2764                 ASSERT(ZIO_CHECKSUM_EQUAL(bp->blk_cksum, ddk->ddk_cksum));
2765                 ASSERT(zp->zp_copies < SPA_DVAS_PER_BP);
2766                 ASSERT(zp->zp_copies == BP_GET_NDVAS(bp) - BP_IS_GANG(bp));
2767                 if (ddp->ddp_phys_birth != 0)
2768                         ddt_phys_free(ddt, ddk, ddp, zio->io_txg);
2769                 ddt_phys_fill(ddp, bp);
2770         }
2771
2772         ddt_exit(ddt);
2773 }
2774
2775 static int
2776 zio_ddt_write(zio_t *zio)
2777 {
2778         spa_t *spa = zio->io_spa;
2779         blkptr_t *bp = zio->io_bp;
2780         uint64_t txg = zio->io_txg;
2781         zio_prop_t *zp = &zio->io_prop;
2782         int p = zp->zp_copies;
2783         int ditto_copies;
2784         zio_t *cio = NULL;
2785         zio_t *dio = NULL;
2786         ddt_t *ddt = ddt_select(spa, bp);
2787         ddt_entry_t *dde;
2788         ddt_phys_t *ddp;
2789
2790         ASSERT(BP_GET_DEDUP(bp));
2791         ASSERT(BP_GET_CHECKSUM(bp) == zp->zp_checksum);
2792         ASSERT(BP_IS_HOLE(bp) || zio->io_bp_override);
2793         ASSERT(!(zio->io_bp_override && (zio->io_flags & ZIO_FLAG_RAW)));
2794
2795         ddt_enter(ddt);
2796         dde = ddt_lookup(ddt, bp, B_TRUE);
2797         ddp = &dde->dde_phys[p];
2798
2799         if (zp->zp_dedup_verify && zio_ddt_collision(zio, ddt, dde)) {
2800                 /*
2801                  * If we're using a weak checksum, upgrade to a strong checksum
2802                  * and try again.  If we're already using a strong checksum,
2803                  * we can't resolve it, so just convert to an ordinary write.
2804                  * (And automatically e-mail a paper to Nature?)
2805                  */
2806                 if (!(zio_checksum_table[zp->zp_checksum].ci_flags &
2807                     ZCHECKSUM_FLAG_DEDUP)) {
2808                         zp->zp_checksum = spa_dedup_checksum(spa);
2809                         zio_pop_transforms(zio);
2810                         zio->io_stage = ZIO_STAGE_OPEN;
2811                         BP_ZERO(bp);
2812                 } else {
2813                         zp->zp_dedup = B_FALSE;
2814                 }
2815                 zio->io_pipeline = ZIO_WRITE_PIPELINE;
2816                 ddt_exit(ddt);
2817                 return (ZIO_PIPELINE_CONTINUE);
2818         }
2819
2820         ditto_copies = ddt_ditto_copies_needed(ddt, dde, ddp);
2821         ASSERT(ditto_copies < SPA_DVAS_PER_BP);
2822
2823         if (ditto_copies > ddt_ditto_copies_present(dde) &&
2824             dde->dde_lead_zio[DDT_PHYS_DITTO] == NULL) {
2825                 zio_prop_t czp = *zp;
2826
2827                 czp.zp_copies = ditto_copies;
2828
2829                 /*
2830                  * If we arrived here with an override bp, we won't have run
2831                  * the transform stack, so we won't have the data we need to
2832                  * generate a child i/o.  So, toss the override bp and restart.
2833                  * This is safe, because using the override bp is just an
2834                  * optimization; and it's rare, so the cost doesn't matter.
2835                  */
2836                 if (zio->io_bp_override) {
2837                         zio_pop_transforms(zio);
2838                         zio->io_stage = ZIO_STAGE_OPEN;
2839                         zio->io_pipeline = ZIO_WRITE_PIPELINE;
2840                         zio->io_bp_override = NULL;
2841                         BP_ZERO(bp);
2842                         ddt_exit(ddt);
2843                         return (ZIO_PIPELINE_CONTINUE);
2844                 }
2845
2846                 dio = zio_write(zio, spa, txg, bp, zio->io_orig_abd,
2847                     zio->io_orig_size, zio->io_orig_size, &czp, NULL, NULL,
2848                     NULL, zio_ddt_ditto_write_done, dde, zio->io_priority,
2849                     ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
2850
2851                 zio_push_transform(dio, zio->io_abd, zio->io_size, 0, NULL);
2852                 dde->dde_lead_zio[DDT_PHYS_DITTO] = dio;
2853         }
2854
2855         if (ddp->ddp_phys_birth != 0 || dde->dde_lead_zio[p] != NULL) {
2856                 if (ddp->ddp_phys_birth != 0)
2857                         ddt_bp_fill(ddp, bp, txg);
2858                 if (dde->dde_lead_zio[p] != NULL)
2859                         zio_add_child(zio, dde->dde_lead_zio[p]);
2860                 else
2861                         ddt_phys_addref(ddp);
2862         } else if (zio->io_bp_override) {
2863                 ASSERT(bp->blk_birth == txg);
2864                 ASSERT(BP_EQUAL(bp, zio->io_bp_override));
2865                 ddt_phys_fill(ddp, bp);
2866                 ddt_phys_addref(ddp);
2867         } else {
2868                 cio = zio_write(zio, spa, txg, bp, zio->io_orig_abd,
2869                     zio->io_orig_size, zio->io_orig_size, zp,
2870                     zio_ddt_child_write_ready, NULL, NULL,
2871                     zio_ddt_child_write_done, dde, zio->io_priority,
2872                     ZIO_DDT_CHILD_FLAGS(zio), &zio->io_bookmark);
2873
2874                 zio_push_transform(cio, zio->io_abd, zio->io_size, 0, NULL);
2875                 dde->dde_lead_zio[p] = cio;
2876         }
2877
2878         ddt_exit(ddt);
2879
2880         if (cio)
2881                 zio_nowait(cio);
2882         if (dio)
2883                 zio_nowait(dio);
2884
2885         return (ZIO_PIPELINE_CONTINUE);
2886 }
2887
2888 ddt_entry_t *freedde; /* for debugging */
2889
2890 static int
2891 zio_ddt_free(zio_t *zio)
2892 {
2893         spa_t *spa = zio->io_spa;
2894         blkptr_t *bp = zio->io_bp;
2895         ddt_t *ddt = ddt_select(spa, bp);
2896         ddt_entry_t *dde;
2897         ddt_phys_t *ddp;
2898
2899         ASSERT(BP_GET_DEDUP(bp));
2900         ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
2901
2902         ddt_enter(ddt);
2903         freedde = dde = ddt_lookup(ddt, bp, B_TRUE);
2904         if (dde) {
2905                 ddp = ddt_phys_select(dde, bp);
2906                 if (ddp)
2907                         ddt_phys_decref(ddp);
2908         }
2909         ddt_exit(ddt);
2910
2911         return (ZIO_PIPELINE_CONTINUE);
2912 }
2913
2914 /*
2915  * ==========================================================================
2916  * Allocate and free blocks
2917  * ==========================================================================
2918  */
2919
2920 static zio_t *
2921 zio_io_to_allocate(spa_t *spa)
2922 {
2923         zio_t *zio;
2924
2925         ASSERT(MUTEX_HELD(&spa->spa_alloc_lock));
2926
2927         zio = avl_first(&spa->spa_alloc_tree);
2928         if (zio == NULL)
2929                 return (NULL);
2930
2931         ASSERT(IO_IS_ALLOCATING(zio));
2932
2933         /*
2934          * Try to place a reservation for this zio. If we're unable to
2935          * reserve then we throttle.
2936          */
2937         if (!metaslab_class_throttle_reserve(spa_normal_class(spa),
2938             zio->io_prop.zp_copies, zio, 0)) {
2939                 return (NULL);
2940         }
2941
2942         avl_remove(&spa->spa_alloc_tree, zio);
2943         ASSERT3U(zio->io_stage, <, ZIO_STAGE_DVA_ALLOCATE);
2944
2945         return (zio);
2946 }
2947
2948 static int
2949 zio_dva_throttle(zio_t *zio)
2950 {
2951         spa_t *spa = zio->io_spa;
2952         zio_t *nio;
2953
2954         if (zio->io_priority == ZIO_PRIORITY_SYNC_WRITE ||
2955             !spa_normal_class(zio->io_spa)->mc_alloc_throttle_enabled ||
2956             zio->io_child_type == ZIO_CHILD_GANG ||
2957             zio->io_flags & ZIO_FLAG_NODATA) {
2958                 return (ZIO_PIPELINE_CONTINUE);
2959         }
2960
2961         ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
2962
2963         ASSERT3U(zio->io_queued_timestamp, >, 0);
2964         ASSERT(zio->io_stage == ZIO_STAGE_DVA_THROTTLE);
2965
2966         mutex_enter(&spa->spa_alloc_lock);
2967
2968         ASSERT(zio->io_type == ZIO_TYPE_WRITE);
2969         avl_add(&spa->spa_alloc_tree, zio);
2970
2971         nio = zio_io_to_allocate(zio->io_spa);
2972         mutex_exit(&spa->spa_alloc_lock);
2973
2974         if (nio == zio)
2975                 return (ZIO_PIPELINE_CONTINUE);
2976
2977         if (nio != NULL) {
2978                 ASSERT(nio->io_stage == ZIO_STAGE_DVA_THROTTLE);
2979                 /*
2980                  * We are passing control to a new zio so make sure that
2981                  * it is processed by a different thread. We do this to
2982                  * avoid stack overflows that can occur when parents are
2983                  * throttled and children are making progress. We allow
2984                  * it to go to the head of the taskq since it's already
2985                  * been waiting.
2986                  */
2987                 zio_taskq_dispatch(nio, ZIO_TASKQ_ISSUE, B_TRUE);
2988         }
2989         return (ZIO_PIPELINE_STOP);
2990 }
2991
2992 void
2993 zio_allocate_dispatch(spa_t *spa)
2994 {
2995         zio_t *zio;
2996
2997         mutex_enter(&spa->spa_alloc_lock);
2998         zio = zio_io_to_allocate(spa);
2999         mutex_exit(&spa->spa_alloc_lock);
3000         if (zio == NULL)
3001                 return;
3002
3003         ASSERT3U(zio->io_stage, ==, ZIO_STAGE_DVA_THROTTLE);
3004         ASSERT0(zio->io_error);
3005         zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE, B_TRUE);
3006 }
3007
3008 static int
3009 zio_dva_allocate(zio_t *zio)
3010 {
3011         spa_t *spa = zio->io_spa;
3012         metaslab_class_t *mc = spa_normal_class(spa);
3013         blkptr_t *bp = zio->io_bp;
3014         int error;
3015         int flags = 0;
3016
3017         if (zio->io_gang_leader == NULL) {
3018                 ASSERT(zio->io_child_type > ZIO_CHILD_GANG);
3019                 zio->io_gang_leader = zio;
3020         }
3021
3022         ASSERT(BP_IS_HOLE(bp));
3023         ASSERT0(BP_GET_NDVAS(bp));
3024         ASSERT3U(zio->io_prop.zp_copies, >, 0);
3025         ASSERT3U(zio->io_prop.zp_copies, <=, spa_max_replication(spa));
3026         ASSERT3U(zio->io_size, ==, BP_GET_PSIZE(bp));
3027
3028         flags |= (zio->io_flags & ZIO_FLAG_FASTWRITE) ? METASLAB_FASTWRITE : 0;
3029         if (zio->io_flags & ZIO_FLAG_NODATA)
3030                 flags |= METASLAB_DONT_THROTTLE;
3031         if (zio->io_flags & ZIO_FLAG_GANG_CHILD)
3032                 flags |= METASLAB_GANG_CHILD;
3033         if (zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE)
3034                 flags |= METASLAB_ASYNC_ALLOC;
3035
3036         error = metaslab_alloc(spa, mc, zio->io_size, bp,
3037             zio->io_prop.zp_copies, zio->io_txg, NULL, flags,
3038             &zio->io_alloc_list, zio);
3039
3040         if (error != 0) {
3041                 spa_dbgmsg(spa, "%s: metaslab allocation failure: zio %p, "
3042                     "size %llu, error %d", spa_name(spa), zio, zio->io_size,
3043                     error);
3044                 if (error == ENOSPC && zio->io_size > SPA_MINBLOCKSIZE)
3045                         return (zio_write_gang_block(zio));
3046                 zio->io_error = error;
3047         }
3048
3049         return (ZIO_PIPELINE_CONTINUE);
3050 }
3051
3052 static int
3053 zio_dva_free(zio_t *zio)
3054 {
3055         metaslab_free(zio->io_spa, zio->io_bp, zio->io_txg, B_FALSE);
3056
3057         return (ZIO_PIPELINE_CONTINUE);
3058 }
3059
3060 static int
3061 zio_dva_claim(zio_t *zio)
3062 {
3063         int error;
3064
3065         error = metaslab_claim(zio->io_spa, zio->io_bp, zio->io_txg);
3066         if (error)
3067                 zio->io_error = error;
3068
3069         return (ZIO_PIPELINE_CONTINUE);
3070 }
3071
3072 /*
3073  * Undo an allocation.  This is used by zio_done() when an I/O fails
3074  * and we want to give back the block we just allocated.
3075  * This handles both normal blocks and gang blocks.
3076  */
3077 static void
3078 zio_dva_unallocate(zio_t *zio, zio_gang_node_t *gn, blkptr_t *bp)
3079 {
3080         int g;
3081
3082         ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp));
3083         ASSERT(zio->io_bp_override == NULL);
3084
3085         if (!BP_IS_HOLE(bp))
3086                 metaslab_free(zio->io_spa, bp, bp->blk_birth, B_TRUE);
3087
3088         if (gn != NULL) {
3089                 for (g = 0; g < SPA_GBH_NBLKPTRS; g++) {
3090                         zio_dva_unallocate(zio, gn->gn_child[g],
3091                             &gn->gn_gbh->zg_blkptr[g]);
3092                 }
3093         }
3094 }
3095
3096 /*
3097  * Try to allocate an intent log block.  Return 0 on success, errno on failure.
3098  */
3099 int
3100 zio_alloc_zil(spa_t *spa, uint64_t txg, blkptr_t *new_bp, uint64_t size,
3101     boolean_t use_slog)
3102 {
3103         int error = 1;
3104         zio_alloc_list_t io_alloc_list;
3105
3106         ASSERT(txg > spa_syncing_txg(spa));
3107
3108         metaslab_trace_init(&io_alloc_list);
3109
3110         if (use_slog) {
3111                 error = metaslab_alloc(spa, spa_log_class(spa), size,
3112                     new_bp, 1, txg, NULL, METASLAB_FASTWRITE,
3113                     &io_alloc_list, NULL);
3114         }
3115
3116         if (error) {
3117                 error = metaslab_alloc(spa, spa_normal_class(spa), size,
3118                     new_bp, 1, txg, NULL, METASLAB_FASTWRITE,
3119                     &io_alloc_list, NULL);
3120         }
3121         metaslab_trace_fini(&io_alloc_list);
3122
3123         if (error == 0) {
3124                 BP_SET_LSIZE(new_bp, size);
3125                 BP_SET_PSIZE(new_bp, size);
3126                 BP_SET_COMPRESS(new_bp, ZIO_COMPRESS_OFF);
3127                 BP_SET_CHECKSUM(new_bp,
3128                     spa_version(spa) >= SPA_VERSION_SLIM_ZIL
3129                     ? ZIO_CHECKSUM_ZILOG2 : ZIO_CHECKSUM_ZILOG);
3130                 BP_SET_TYPE(new_bp, DMU_OT_INTENT_LOG);
3131                 BP_SET_LEVEL(new_bp, 0);
3132                 BP_SET_DEDUP(new_bp, 0);
3133                 BP_SET_BYTEORDER(new_bp, ZFS_HOST_BYTEORDER);
3134         }
3135
3136         return (error);
3137 }
3138
3139 /*
3140  * Free an intent log block.
3141  */
3142 void
3143 zio_free_zil(spa_t *spa, uint64_t txg, blkptr_t *bp)
3144 {
3145         ASSERT(BP_GET_TYPE(bp) == DMU_OT_INTENT_LOG);
3146         ASSERT(!BP_IS_GANG(bp));
3147
3148         zio_free(spa, txg, bp);
3149 }
3150
3151 /*
3152  * ==========================================================================
3153  * Read and write to physical devices
3154  * ==========================================================================
3155  */
3156
3157
3158 /*
3159  * Issue an I/O to the underlying vdev. Typically the issue pipeline
3160  * stops after this stage and will resume upon I/O completion.
3161  * However, there are instances where the vdev layer may need to
3162  * continue the pipeline when an I/O was not issued. Since the I/O
3163  * that was sent to the vdev layer might be different than the one
3164  * currently active in the pipeline (see vdev_queue_io()), we explicitly
3165  * force the underlying vdev layers to call either zio_execute() or
3166  * zio_interrupt() to ensure that the pipeline continues with the correct I/O.
3167  */
3168 static int
3169 zio_vdev_io_start(zio_t *zio)
3170 {
3171         vdev_t *vd = zio->io_vd;
3172         uint64_t align;
3173         spa_t *spa = zio->io_spa;
3174
3175         zio->io_delay = 0;
3176
3177         ASSERT(zio->io_error == 0);
3178         ASSERT(zio->io_child_error[ZIO_CHILD_VDEV] == 0);
3179
3180         if (vd == NULL) {
3181                 if (!(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
3182                         spa_config_enter(spa, SCL_ZIO, zio, RW_READER);
3183
3184                 /*
3185                  * The mirror_ops handle multiple DVAs in a single BP.
3186                  */
3187                 vdev_mirror_ops.vdev_op_io_start(zio);
3188                 return (ZIO_PIPELINE_STOP);
3189         }
3190
3191         ASSERT3P(zio->io_logical, !=, zio);
3192
3193         /*
3194          * We keep track of time-sensitive I/Os so that the scan thread
3195          * can quickly react to certain workloads.  In particular, we care
3196          * about non-scrubbing, top-level reads and writes with the following
3197          * characteristics:
3198          *      - synchronous writes of user data to non-slog devices
3199          *      - any reads of user data
3200          * When these conditions are met, adjust the timestamp of spa_last_io
3201          * which allows the scan thread to adjust its workload accordingly.
3202          */
3203         if (!(zio->io_flags & ZIO_FLAG_SCAN_THREAD) && zio->io_bp != NULL &&
3204             vd == vd->vdev_top && !vd->vdev_islog &&
3205             zio->io_bookmark.zb_objset != DMU_META_OBJSET &&
3206             zio->io_txg != spa_syncing_txg(spa)) {
3207                 uint64_t old = spa->spa_last_io;
3208                 uint64_t new = ddi_get_lbolt64();
3209                 if (old != new)
3210                         (void) atomic_cas_64(&spa->spa_last_io, old, new);
3211         }
3212
3213         align = 1ULL << vd->vdev_top->vdev_ashift;
3214
3215         if (!(zio->io_flags & ZIO_FLAG_PHYSICAL) &&
3216             P2PHASE(zio->io_size, align) != 0) {
3217                 /* Transform logical writes to be a full physical block size. */
3218                 uint64_t asize = P2ROUNDUP(zio->io_size, align);
3219                 abd_t *abuf = abd_alloc_sametype(zio->io_abd, asize);
3220                 ASSERT(vd == vd->vdev_top);
3221                 if (zio->io_type == ZIO_TYPE_WRITE) {
3222                         abd_copy(abuf, zio->io_abd, zio->io_size);
3223                         abd_zero_off(abuf, zio->io_size, asize - zio->io_size);
3224                 }
3225                 zio_push_transform(zio, abuf, asize, asize, zio_subblock);
3226         }
3227
3228         /*
3229          * If this is not a physical io, make sure that it is properly aligned
3230          * before proceeding.
3231          */
3232         if (!(zio->io_flags & ZIO_FLAG_PHYSICAL)) {
3233                 ASSERT0(P2PHASE(zio->io_offset, align));
3234                 ASSERT0(P2PHASE(zio->io_size, align));
3235         } else {
3236                 /*
3237                  * For physical writes, we allow 512b aligned writes and assume
3238                  * the device will perform a read-modify-write as necessary.
3239                  */
3240                 ASSERT0(P2PHASE(zio->io_offset, SPA_MINBLOCKSIZE));
3241                 ASSERT0(P2PHASE(zio->io_size, SPA_MINBLOCKSIZE));
3242         }
3243
3244         VERIFY(zio->io_type != ZIO_TYPE_WRITE || spa_writeable(spa));
3245
3246         /*
3247          * If this is a repair I/O, and there's no self-healing involved --
3248          * that is, we're just resilvering what we expect to resilver --
3249          * then don't do the I/O unless zio's txg is actually in vd's DTL.
3250          * This prevents spurious resilvering with nested replication.
3251          * For example, given a mirror of mirrors, (A+B)+(C+D), if only
3252          * A is out of date, we'll read from C+D, then use the data to
3253          * resilver A+B -- but we don't actually want to resilver B, just A.
3254          * The top-level mirror has no way to know this, so instead we just
3255          * discard unnecessary repairs as we work our way down the vdev tree.
3256          * The same logic applies to any form of nested replication:
3257          * ditto + mirror, RAID-Z + replacing, etc.  This covers them all.
3258          */
3259         if ((zio->io_flags & ZIO_FLAG_IO_REPAIR) &&
3260             !(zio->io_flags & ZIO_FLAG_SELF_HEAL) &&
3261             zio->io_txg != 0 && /* not a delegated i/o */
3262             !vdev_dtl_contains(vd, DTL_PARTIAL, zio->io_txg, 1)) {
3263                 ASSERT(zio->io_type == ZIO_TYPE_WRITE);
3264                 zio_vdev_io_bypass(zio);
3265                 return (ZIO_PIPELINE_CONTINUE);
3266         }
3267
3268         if (vd->vdev_ops->vdev_op_leaf &&
3269             (zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE)) {
3270
3271                 if (zio->io_type == ZIO_TYPE_READ && vdev_cache_read(zio))
3272                         return (ZIO_PIPELINE_CONTINUE);
3273
3274                 if ((zio = vdev_queue_io(zio)) == NULL)
3275                         return (ZIO_PIPELINE_STOP);
3276
3277                 if (!vdev_accessible(vd, zio)) {
3278                         zio->io_error = SET_ERROR(ENXIO);
3279                         zio_interrupt(zio);
3280                         return (ZIO_PIPELINE_STOP);
3281                 }
3282         }
3283
3284         zio->io_delay = gethrtime();
3285         vd->vdev_ops->vdev_op_io_start(zio);
3286         return (ZIO_PIPELINE_STOP);
3287 }
3288
3289 static int
3290 zio_vdev_io_done(zio_t *zio)
3291 {
3292         vdev_t *vd = zio->io_vd;
3293         vdev_ops_t *ops = vd ? vd->vdev_ops : &vdev_mirror_ops;
3294         boolean_t unexpected_error = B_FALSE;
3295
3296         if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
3297                 return (ZIO_PIPELINE_STOP);
3298
3299         ASSERT(zio->io_type == ZIO_TYPE_READ || zio->io_type == ZIO_TYPE_WRITE);
3300
3301         if (zio->io_delay)
3302                 zio->io_delay = gethrtime() - zio->io_delay;
3303
3304         if (vd != NULL && vd->vdev_ops->vdev_op_leaf) {
3305
3306                 vdev_queue_io_done(zio);
3307
3308                 if (zio->io_type == ZIO_TYPE_WRITE)
3309                         vdev_cache_write(zio);
3310
3311                 if (zio_injection_enabled && zio->io_error == 0)
3312                         zio->io_error = zio_handle_device_injection(vd,
3313                             zio, EIO);
3314
3315                 if (zio_injection_enabled && zio->io_error == 0)
3316                         zio->io_error = zio_handle_label_injection(zio, EIO);
3317
3318                 if (zio->io_error) {
3319                         if (!vdev_accessible(vd, zio)) {
3320                                 zio->io_error = SET_ERROR(ENXIO);
3321                         } else {
3322                                 unexpected_error = B_TRUE;
3323                         }
3324                 }
3325         }
3326
3327         ops->vdev_op_io_done(zio);
3328
3329         if (unexpected_error)
3330                 VERIFY(vdev_probe(vd, zio) == NULL);
3331
3332         return (ZIO_PIPELINE_CONTINUE);
3333 }
3334
3335 /*
3336  * For non-raidz ZIOs, we can just copy aside the bad data read from the
3337  * disk, and use that to finish the checksum ereport later.
3338  */
3339 static void
3340 zio_vsd_default_cksum_finish(zio_cksum_report_t *zcr,
3341     const abd_t *good_buf)
3342 {
3343         /* no processing needed */
3344         zfs_ereport_finish_checksum(zcr, good_buf, zcr->zcr_cbdata, B_FALSE);
3345 }
3346
3347 /*ARGSUSED*/
3348 void
3349 zio_vsd_default_cksum_report(zio_t *zio, zio_cksum_report_t *zcr, void *ignored)
3350 {
3351         void *abd = abd_alloc_sametype(zio->io_abd, zio->io_size);
3352
3353         abd_copy(abd, zio->io_abd, zio->io_size);
3354
3355         zcr->zcr_cbinfo = zio->io_size;
3356         zcr->zcr_cbdata = abd;
3357         zcr->zcr_finish = zio_vsd_default_cksum_finish;
3358         zcr->zcr_free = zio_abd_free;
3359 }
3360
3361 static int
3362 zio_vdev_io_assess(zio_t *zio)
3363 {
3364         vdev_t *vd = zio->io_vd;
3365
3366         if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE))
3367                 return (ZIO_PIPELINE_STOP);
3368
3369         if (vd == NULL && !(zio->io_flags & ZIO_FLAG_CONFIG_WRITER))
3370                 spa_config_exit(zio->io_spa, SCL_ZIO, zio);
3371
3372         if (zio->io_vsd != NULL) {
3373                 zio->io_vsd_ops->vsd_free(zio);
3374                 zio->io_vsd = NULL;
3375         }
3376
3377         if (zio_injection_enabled && zio->io_error == 0)
3378                 zio->io_error = zio_handle_fault_injection(zio, EIO);
3379
3380         /*
3381          * If the I/O failed, determine whether we should attempt to retry it.
3382          *
3383          * On retry, we cut in line in the issue queue, since we don't want
3384          * compression/checksumming/etc. work to prevent our (cheap) IO reissue.
3385          */
3386         if (zio->io_error && vd == NULL &&
3387             !(zio->io_flags & (ZIO_FLAG_DONT_RETRY | ZIO_FLAG_IO_RETRY))) {
3388                 ASSERT(!(zio->io_flags & ZIO_FLAG_DONT_QUEUE)); /* not a leaf */
3389                 ASSERT(!(zio->io_flags & ZIO_FLAG_IO_BYPASS));  /* not a leaf */
3390                 zio->io_error = 0;
3391                 zio->io_flags |= ZIO_FLAG_IO_RETRY |
3392                     ZIO_FLAG_DONT_CACHE | ZIO_FLAG_DONT_AGGREGATE;
3393                 zio->io_stage = ZIO_STAGE_VDEV_IO_START >> 1;
3394                 zio_taskq_dispatch(zio, ZIO_TASKQ_ISSUE,
3395                     zio_requeue_io_start_cut_in_line);
3396                 return (ZIO_PIPELINE_STOP);
3397         }
3398
3399         /*
3400          * If we got an error on a leaf device, convert it to ENXIO
3401          * if the device is not accessible at all.
3402          */
3403         if (zio->io_error && vd != NULL && vd->vdev_ops->vdev_op_leaf &&
3404             !vdev_accessible(vd, zio))
3405                 zio->io_error = SET_ERROR(ENXIO);
3406
3407         /*
3408          * If we can't write to an interior vdev (mirror or RAID-Z),
3409          * set vdev_cant_write so that we stop trying to allocate from it.
3410          */
3411         if (zio->io_error == ENXIO && zio->io_type == ZIO_TYPE_WRITE &&
3412             vd != NULL && !vd->vdev_ops->vdev_op_leaf) {
3413                 vd->vdev_cant_write = B_TRUE;
3414         }
3415
3416         /*
3417          * If a cache flush returns ENOTSUP or ENOTTY, we know that no future
3418          * attempts will ever succeed. In this case we set a persistent bit so
3419          * that we don't bother with it in the future.
3420          */
3421         if ((zio->io_error == ENOTSUP || zio->io_error == ENOTTY) &&
3422             zio->io_type == ZIO_TYPE_IOCTL &&
3423             zio->io_cmd == DKIOCFLUSHWRITECACHE && vd != NULL)
3424                 vd->vdev_nowritecache = B_TRUE;
3425
3426         if (zio->io_error)
3427                 zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
3428
3429         if (vd != NULL && vd->vdev_ops->vdev_op_leaf &&
3430             zio->io_physdone != NULL) {
3431                 ASSERT(!(zio->io_flags & ZIO_FLAG_DELEGATED));
3432                 ASSERT(zio->io_child_type == ZIO_CHILD_VDEV);
3433                 zio->io_physdone(zio->io_logical);
3434         }
3435
3436         return (ZIO_PIPELINE_CONTINUE);
3437 }
3438
3439 void
3440 zio_vdev_io_reissue(zio_t *zio)
3441 {
3442         ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
3443         ASSERT(zio->io_error == 0);
3444
3445         zio->io_stage >>= 1;
3446 }
3447
3448 void
3449 zio_vdev_io_redone(zio_t *zio)
3450 {
3451         ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_DONE);
3452
3453         zio->io_stage >>= 1;
3454 }
3455
3456 void
3457 zio_vdev_io_bypass(zio_t *zio)
3458 {
3459         ASSERT(zio->io_stage == ZIO_STAGE_VDEV_IO_START);
3460         ASSERT(zio->io_error == 0);
3461
3462         zio->io_flags |= ZIO_FLAG_IO_BYPASS;
3463         zio->io_stage = ZIO_STAGE_VDEV_IO_ASSESS >> 1;
3464 }
3465
3466 /*
3467  * ==========================================================================
3468  * Generate and verify checksums
3469  * ==========================================================================
3470  */
3471 static int
3472 zio_checksum_generate(zio_t *zio)
3473 {
3474         blkptr_t *bp = zio->io_bp;
3475         enum zio_checksum checksum;
3476
3477         if (bp == NULL) {
3478                 /*
3479                  * This is zio_write_phys().
3480                  * We're either generating a label checksum, or none at all.
3481                  */
3482                 checksum = zio->io_prop.zp_checksum;
3483
3484                 if (checksum == ZIO_CHECKSUM_OFF)
3485                         return (ZIO_PIPELINE_CONTINUE);
3486
3487                 ASSERT(checksum == ZIO_CHECKSUM_LABEL);
3488         } else {
3489                 if (BP_IS_GANG(bp) && zio->io_child_type == ZIO_CHILD_GANG) {
3490                         ASSERT(!IO_IS_ALLOCATING(zio));
3491                         checksum = ZIO_CHECKSUM_GANG_HEADER;
3492                 } else {
3493                         checksum = BP_GET_CHECKSUM(bp);
3494                 }
3495         }
3496
3497         zio_checksum_compute(zio, checksum, zio->io_abd, zio->io_size);
3498
3499         return (ZIO_PIPELINE_CONTINUE);
3500 }
3501
3502 static int
3503 zio_checksum_verify(zio_t *zio)
3504 {
3505         zio_bad_cksum_t info;
3506         blkptr_t *bp = zio->io_bp;
3507         int error;
3508
3509         ASSERT(zio->io_vd != NULL);
3510
3511         if (bp == NULL) {
3512                 /*
3513                  * This is zio_read_phys().
3514                  * We're either verifying a label checksum, or nothing at all.
3515                  */
3516                 if (zio->io_prop.zp_checksum == ZIO_CHECKSUM_OFF)
3517                         return (ZIO_PIPELINE_CONTINUE);
3518
3519                 ASSERT(zio->io_prop.zp_checksum == ZIO_CHECKSUM_LABEL);
3520         }
3521
3522         if ((error = zio_checksum_error(zio, &info)) != 0) {
3523                 zio->io_error = error;
3524                 if (error == ECKSUM &&
3525                     !(zio->io_flags & ZIO_FLAG_SPECULATIVE)) {
3526                         zfs_ereport_start_checksum(zio->io_spa,
3527                             zio->io_vd, zio, zio->io_offset,
3528                             zio->io_size, NULL, &info);
3529                 }
3530         }
3531
3532         return (ZIO_PIPELINE_CONTINUE);
3533 }
3534
3535 /*
3536  * Called by RAID-Z to ensure we don't compute the checksum twice.
3537  */
3538 void
3539 zio_checksum_verified(zio_t *zio)
3540 {
3541         zio->io_pipeline &= ~ZIO_STAGE_CHECKSUM_VERIFY;
3542 }
3543
3544 /*
3545  * ==========================================================================
3546  * Error rank.  Error are ranked in the order 0, ENXIO, ECKSUM, EIO, other.
3547  * An error of 0 indicates success.  ENXIO indicates whole-device failure,
3548  * which may be transient (e.g. unplugged) or permament.  ECKSUM and EIO
3549  * indicate errors that are specific to one I/O, and most likely permanent.
3550  * Any other error is presumed to be worse because we weren't expecting it.
3551  * ==========================================================================
3552  */
3553 int
3554 zio_worst_error(int e1, int e2)
3555 {
3556         static int zio_error_rank[] = { 0, ENXIO, ECKSUM, EIO };
3557         int r1, r2;
3558
3559         for (r1 = 0; r1 < sizeof (zio_error_rank) / sizeof (int); r1++)
3560                 if (e1 == zio_error_rank[r1])
3561                         break;
3562
3563         for (r2 = 0; r2 < sizeof (zio_error_rank) / sizeof (int); r2++)
3564                 if (e2 == zio_error_rank[r2])
3565                         break;
3566
3567         return (r1 > r2 ? e1 : e2);
3568 }
3569
3570 /*
3571  * ==========================================================================
3572  * I/O completion
3573  * ==========================================================================
3574  */
3575 static int
3576 zio_ready(zio_t *zio)
3577 {
3578         blkptr_t *bp = zio->io_bp;
3579         zio_t *pio, *pio_next;
3580         zio_link_t *zl = NULL;
3581
3582         if (zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_READY) ||
3583             zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_READY))
3584                 return (ZIO_PIPELINE_STOP);
3585
3586         if (zio->io_ready) {
3587                 ASSERT(IO_IS_ALLOCATING(zio));
3588                 ASSERT(bp->blk_birth == zio->io_txg || BP_IS_HOLE(bp) ||
3589                     (zio->io_flags & ZIO_FLAG_NOPWRITE));
3590                 ASSERT(zio->io_children[ZIO_CHILD_GANG][ZIO_WAIT_READY] == 0);
3591
3592                 zio->io_ready(zio);
3593         }
3594
3595         if (bp != NULL && bp != &zio->io_bp_copy)
3596                 zio->io_bp_copy = *bp;
3597
3598         if (zio->io_error != 0) {
3599                 zio->io_pipeline = ZIO_INTERLOCK_PIPELINE;
3600
3601                 if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
3602                         ASSERT(IO_IS_ALLOCATING(zio));
3603                         ASSERT(zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
3604                         /*
3605                          * We were unable to allocate anything, unreserve and
3606                          * issue the next I/O to allocate.
3607                          */
3608                         metaslab_class_throttle_unreserve(
3609                             spa_normal_class(zio->io_spa),
3610                             zio->io_prop.zp_copies, zio);
3611                         zio_allocate_dispatch(zio->io_spa);
3612                 }
3613         }
3614
3615         mutex_enter(&zio->io_lock);
3616         zio->io_state[ZIO_WAIT_READY] = 1;
3617         pio = zio_walk_parents(zio, &zl);
3618         mutex_exit(&zio->io_lock);
3619
3620         /*
3621          * As we notify zio's parents, new parents could be added.
3622          * New parents go to the head of zio's io_parent_list, however,
3623          * so we will (correctly) not notify them.  The remainder of zio's
3624          * io_parent_list, from 'pio_next' onward, cannot change because
3625          * all parents must wait for us to be done before they can be done.
3626          */
3627         for (; pio != NULL; pio = pio_next) {
3628                 pio_next = zio_walk_parents(zio, &zl);
3629                 zio_notify_parent(pio, zio, ZIO_WAIT_READY);
3630         }
3631
3632         if (zio->io_flags & ZIO_FLAG_NODATA) {
3633                 if (BP_IS_GANG(bp)) {
3634                         zio->io_flags &= ~ZIO_FLAG_NODATA;
3635                 } else {
3636                         ASSERT((uintptr_t)zio->io_abd < SPA_MAXBLOCKSIZE);
3637                         zio->io_pipeline &= ~ZIO_VDEV_IO_STAGES;
3638                 }
3639         }
3640
3641         if (zio_injection_enabled &&
3642             zio->io_spa->spa_syncing_txg == zio->io_txg)
3643                 zio_handle_ignored_writes(zio);
3644
3645         return (ZIO_PIPELINE_CONTINUE);
3646 }
3647
3648 /*
3649  * Update the allocation throttle accounting.
3650  */
3651 static void
3652 zio_dva_throttle_done(zio_t *zio)
3653 {
3654         zio_t *pio = zio_unique_parent(zio);
3655         vdev_t *vd = zio->io_vd;
3656         int flags = METASLAB_ASYNC_ALLOC;
3657         ASSERTV(zio_t *lio = zio->io_logical);
3658
3659         ASSERT3P(zio->io_bp, !=, NULL);
3660         ASSERT3U(zio->io_type, ==, ZIO_TYPE_WRITE);
3661         ASSERT3U(zio->io_priority, ==, ZIO_PRIORITY_ASYNC_WRITE);
3662         ASSERT3U(zio->io_child_type, ==, ZIO_CHILD_VDEV);
3663         ASSERT(vd != NULL);
3664         ASSERT3P(vd, ==, vd->vdev_top);
3665         ASSERT(!(zio->io_flags & (ZIO_FLAG_IO_REPAIR | ZIO_FLAG_IO_RETRY)));
3666         ASSERT(zio->io_flags & ZIO_FLAG_IO_ALLOCATING);
3667         ASSERT(!(lio->io_flags & ZIO_FLAG_IO_REWRITE));
3668         ASSERT(!(lio->io_orig_flags & ZIO_FLAG_NODATA));
3669
3670         /*
3671          * Parents of gang children can have two flavors -- ones that
3672          * allocated the gang header (will have ZIO_FLAG_IO_REWRITE set)
3673          * and ones that allocated the constituent blocks. The allocation
3674          * throttle needs to know the allocating parent zio so we must find
3675          * it here.
3676          */
3677         if (pio->io_child_type == ZIO_CHILD_GANG) {
3678                 /*
3679                  * If our parent is a rewrite gang child then our grandparent
3680                  * would have been the one that performed the allocation.
3681                  */
3682                 if (pio->io_flags & ZIO_FLAG_IO_REWRITE)
3683                         pio = zio_unique_parent(pio);
3684                 flags |= METASLAB_GANG_CHILD;
3685         }
3686
3687         ASSERT(IO_IS_ALLOCATING(pio));
3688         ASSERT3P(zio, !=, zio->io_logical);
3689         ASSERT(zio->io_logical != NULL);
3690         ASSERT(!(zio->io_flags & ZIO_FLAG_IO_REPAIR));
3691         ASSERT0(zio->io_flags & ZIO_FLAG_NOPWRITE);
3692
3693         mutex_enter(&pio->io_lock);
3694         metaslab_group_alloc_decrement(zio->io_spa, vd->vdev_id, pio, flags);
3695         mutex_exit(&pio->io_lock);
3696
3697         metaslab_class_throttle_unreserve(spa_normal_class(zio->io_spa),
3698             1, pio);
3699
3700         /*
3701          * Call into the pipeline to see if there is more work that
3702          * needs to be done. If there is work to be done it will be
3703          * dispatched to another taskq thread.
3704          */
3705         zio_allocate_dispatch(zio->io_spa);
3706 }
3707
3708 static int
3709 zio_done(zio_t *zio)
3710 {
3711         /*
3712          * Always attempt to keep stack usage minimal here since
3713          * we can be called recurisvely up to 19 levels deep.
3714          */
3715         const uint64_t psize = zio->io_size;
3716         zio_t *pio, *pio_next;
3717         int c, w;
3718         zio_link_t *zl = NULL;
3719
3720         /*
3721          * If our children haven't all completed,
3722          * wait for them and then repeat this pipeline stage.
3723          */
3724         if (zio_wait_for_children(zio, ZIO_CHILD_VDEV, ZIO_WAIT_DONE) ||
3725             zio_wait_for_children(zio, ZIO_CHILD_GANG, ZIO_WAIT_DONE) ||
3726             zio_wait_for_children(zio, ZIO_CHILD_DDT, ZIO_WAIT_DONE) ||
3727             zio_wait_for_children(zio, ZIO_CHILD_LOGICAL, ZIO_WAIT_DONE))
3728                 return (ZIO_PIPELINE_STOP);
3729
3730         /*
3731          * If the allocation throttle is enabled, then update the accounting.
3732          * We only track child I/Os that are part of an allocating async
3733          * write. We must do this since the allocation is performed
3734          * by the logical I/O but the actual write is done by child I/Os.
3735          */
3736         if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING &&
3737             zio->io_child_type == ZIO_CHILD_VDEV) {
3738                 ASSERT(spa_normal_class(
3739                     zio->io_spa)->mc_alloc_throttle_enabled);
3740                 zio_dva_throttle_done(zio);
3741         }
3742
3743         /*
3744          * If the allocation throttle is enabled, verify that
3745          * we have decremented the refcounts for every I/O that was throttled.
3746          */
3747         if (zio->io_flags & ZIO_FLAG_IO_ALLOCATING) {
3748                 ASSERT(zio->io_type == ZIO_TYPE_WRITE);
3749                 ASSERT(zio->io_priority == ZIO_PRIORITY_ASYNC_WRITE);
3750                 ASSERT(zio->io_bp != NULL);
3751                 metaslab_group_alloc_verify(zio->io_spa, zio->io_bp, zio);
3752                 VERIFY(refcount_not_held(
3753                     &(spa_normal_class(zio->io_spa)->mc_alloc_slots), zio));
3754         }
3755
3756
3757         for (c = 0; c < ZIO_CHILD_TYPES; c++)
3758                 for (w = 0; w < ZIO_WAIT_TYPES; w++)
3759                         ASSERT(zio->io_children[c][w] == 0);
3760
3761         if (zio->io_bp != NULL && !BP_IS_EMBEDDED(zio->io_bp)) {
3762                 ASSERT(zio->io_bp->blk_pad[0] == 0);
3763                 ASSERT(zio->io_bp->blk_pad[1] == 0);
3764                 ASSERT(bcmp(zio->io_bp, &zio->io_bp_copy,
3765                     sizeof (blkptr_t)) == 0 ||
3766                     (zio->io_bp == zio_unique_parent(zio)->io_bp));
3767                 if (zio->io_type == ZIO_TYPE_WRITE && !BP_IS_HOLE(zio->io_bp) &&
3768                     zio->io_bp_override == NULL &&
3769                     !(zio->io_flags & ZIO_FLAG_IO_REPAIR)) {
3770                         ASSERT(!BP_SHOULD_BYTESWAP(zio->io_bp));
3771                         ASSERT3U(zio->io_prop.zp_copies, <=,
3772                             BP_GET_NDVAS(zio->io_bp));
3773                         ASSERT(BP_COUNT_GANG(zio->io_bp) == 0 ||
3774                             (BP_COUNT_GANG(zio->io_bp) ==
3775                             BP_GET_NDVAS(zio->io_bp)));
3776                 }
3777                 if (zio->io_flags & ZIO_FLAG_NOPWRITE)
3778                         VERIFY(BP_EQUAL(zio->io_bp, &zio->io_bp_orig));
3779         }
3780
3781         /*
3782          * If there were child vdev/gang/ddt errors, they apply to us now.
3783          */
3784         zio_inherit_child_errors(zio, ZIO_CHILD_VDEV);
3785         zio_inherit_child_errors(zio, ZIO_CHILD_GANG);
3786         zio_inherit_child_errors(zio, ZIO_CHILD_DDT);
3787
3788         /*
3789          * If the I/O on the transformed data was successful, generate any
3790          * checksum reports now while we still have the transformed data.
3791          */
3792         if (zio->io_error == 0) {
3793                 while (zio->io_cksum_report != NULL) {
3794                         zio_cksum_report_t *zcr = zio->io_cksum_report;
3795                         uint64_t align = zcr->zcr_align;
3796                         uint64_t asize = P2ROUNDUP(psize, align);
3797                         abd_t *adata = zio->io_abd;
3798
3799                         if (asize != psize) {
3800                                 adata = abd_alloc(asize, B_TRUE);
3801                                 abd_copy(adata, zio->io_abd, psize);
3802                                 abd_zero_off(adata, psize, asize - psize);
3803                         }
3804
3805                         zio->io_cksum_report = zcr->zcr_next;
3806                         zcr->zcr_next = NULL;
3807                         zcr->zcr_finish(zcr, adata);
3808                         zfs_ereport_free_checksum(zcr);
3809
3810                         if (asize != psize)
3811                                 abd_free(adata);
3812                 }
3813         }
3814
3815         zio_pop_transforms(zio);        /* note: may set zio->io_error */
3816
3817         vdev_stat_update(zio, psize);
3818
3819         /*
3820          * If this I/O is attached to a particular vdev is slow, exceeding
3821          * 30 seconds to complete, post an error described the I/O delay.
3822          * We ignore these errors if the device is currently unavailable.
3823          */
3824         if (zio->io_delay >= MSEC2NSEC(zio_delay_max)) {
3825                 if (zio->io_vd != NULL && !vdev_is_dead(zio->io_vd))
3826                         zfs_ereport_post(FM_EREPORT_ZFS_DELAY, zio->io_spa,
3827                             zio->io_vd, zio, 0, 0);
3828         }
3829
3830         if (zio->io_error) {
3831                 /*
3832                  * If this I/O is attached to a particular vdev,
3833                  * generate an error message describing the I/O failure
3834                  * at the block level.  We ignore these errors if the
3835                  * device is currently unavailable.
3836                  */
3837                 if (zio->io_error != ECKSUM && zio->io_vd != NULL &&
3838                     !vdev_is_dead(zio->io_vd))
3839                         zfs_ereport_post(FM_EREPORT_ZFS_IO, zio->io_spa,
3840                             zio->io_vd, zio, 0, 0);
3841
3842                 if ((zio->io_error == EIO || !(zio->io_flags &
3843                     (ZIO_FLAG_SPECULATIVE | ZIO_FLAG_DONT_PROPAGATE))) &&
3844                     zio == zio->io_logical) {
3845                         /*
3846                          * For logical I/O requests, tell the SPA to log the
3847                          * error and generate a logical data ereport.
3848                          */
3849                         spa_log_error(zio->io_spa, zio);
3850                         zfs_ereport_post(FM_EREPORT_ZFS_DATA, zio->io_spa,
3851                             NULL, zio, 0, 0);
3852                 }
3853         }
3854
3855         if (zio->io_error && zio == zio->io_logical) {
3856                 /*
3857                  * Determine whether zio should be reexecuted.  This will
3858                  * propagate all the way to the root via zio_notify_parent().
3859                  */
3860                 ASSERT(zio->io_vd == NULL && zio->io_bp != NULL);
3861                 ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
3862
3863                 if (IO_IS_ALLOCATING(zio) &&
3864                     !(zio->io_flags & ZIO_FLAG_CANFAIL)) {
3865                         if (zio->io_error != ENOSPC)
3866                                 zio->io_reexecute |= ZIO_REEXECUTE_NOW;
3867                         else
3868                                 zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
3869                 }
3870
3871                 if ((zio->io_type == ZIO_TYPE_READ ||
3872                     zio->io_type == ZIO_TYPE_FREE) &&
3873                     !(zio->io_flags & ZIO_FLAG_SCAN_THREAD) &&
3874                     zio->io_error == ENXIO &&
3875                     spa_load_state(zio->io_spa) == SPA_LOAD_NONE &&
3876                     spa_get_failmode(zio->io_spa) != ZIO_FAILURE_MODE_CONTINUE)
3877                         zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
3878
3879                 if (!(zio->io_flags & ZIO_FLAG_CANFAIL) && !zio->io_reexecute)
3880                         zio->io_reexecute |= ZIO_REEXECUTE_SUSPEND;
3881
3882                 /*
3883                  * Here is a possibly good place to attempt to do
3884                  * either combinatorial reconstruction or error correction
3885                  * based on checksums.  It also might be a good place
3886                  * to send out preliminary ereports before we suspend
3887                  * processing.
3888                  */
3889         }
3890
3891         /*
3892          * If there were logical child errors, they apply to us now.
3893          * We defer this until now to avoid conflating logical child
3894          * errors with errors that happened to the zio itself when
3895          * updating vdev stats and reporting FMA events above.
3896          */
3897         zio_inherit_child_errors(zio, ZIO_CHILD_LOGICAL);
3898
3899         if ((zio->io_error || zio->io_reexecute) &&
3900             IO_IS_ALLOCATING(zio) && zio->io_gang_leader == zio &&
3901             !(zio->io_flags & (ZIO_FLAG_IO_REWRITE | ZIO_FLAG_NOPWRITE)))
3902                 zio_dva_unallocate(zio, zio->io_gang_tree, zio->io_bp);
3903
3904         zio_gang_tree_free(&zio->io_gang_tree);
3905
3906         /*
3907          * Godfather I/Os should never suspend.
3908          */
3909         if ((zio->io_flags & ZIO_FLAG_GODFATHER) &&
3910             (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND))
3911                 zio->io_reexecute &= ~ZIO_REEXECUTE_SUSPEND;
3912
3913         if (zio->io_reexecute) {
3914                 /*
3915                  * This is a logical I/O that wants to reexecute.
3916                  *
3917                  * Reexecute is top-down.  When an i/o fails, if it's not
3918                  * the root, it simply notifies its parent and sticks around.
3919                  * The parent, seeing that it still has children in zio_done(),
3920                  * does the same.  This percolates all the way up to the root.
3921                  * The root i/o will reexecute or suspend the entire tree.
3922                  *
3923                  * This approach ensures that zio_reexecute() honors
3924                  * all the original i/o dependency relationships, e.g.
3925                  * parents not executing until children are ready.
3926                  */
3927                 ASSERT(zio->io_child_type == ZIO_CHILD_LOGICAL);
3928
3929                 zio->io_gang_leader = NULL;
3930
3931                 mutex_enter(&zio->io_lock);
3932                 zio->io_state[ZIO_WAIT_DONE] = 1;
3933                 mutex_exit(&zio->io_lock);
3934
3935                 /*
3936                  * "The Godfather" I/O monitors its children but is
3937                  * not a true parent to them. It will track them through
3938                  * the pipeline but severs its ties whenever they get into
3939                  * trouble (e.g. suspended). This allows "The Godfather"
3940                  * I/O to return status without blocking.
3941                  */
3942                 zl = NULL;
3943                 for (pio = zio_walk_parents(zio, &zl); pio != NULL;
3944                     pio = pio_next) {
3945                         zio_link_t *remove_zl = zl;
3946                         pio_next = zio_walk_parents(zio, &zl);
3947
3948                         if ((pio->io_flags & ZIO_FLAG_GODFATHER) &&
3949                             (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND)) {
3950                                 zio_remove_child(pio, zio, remove_zl);
3951                                 zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
3952                         }
3953                 }
3954
3955                 if ((pio = zio_unique_parent(zio)) != NULL) {
3956                         /*
3957                          * We're not a root i/o, so there's nothing to do
3958                          * but notify our parent.  Don't propagate errors
3959                          * upward since we haven't permanently failed yet.
3960                          */
3961                         ASSERT(!(zio->io_flags & ZIO_FLAG_GODFATHER));
3962                         zio->io_flags |= ZIO_FLAG_DONT_PROPAGATE;
3963                         zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
3964                 } else if (zio->io_reexecute & ZIO_REEXECUTE_SUSPEND) {
3965                         /*
3966                          * We'd fail again if we reexecuted now, so suspend
3967                          * until conditions improve (e.g. device comes online).
3968                          */
3969                         zio_suspend(zio->io_spa, zio);
3970                 } else {
3971                         /*
3972                          * Reexecution is potentially a huge amount of work.
3973                          * Hand it off to the otherwise-unused claim taskq.
3974                          */
3975                         ASSERT(taskq_empty_ent(&zio->io_tqent));
3976                         spa_taskq_dispatch_ent(zio->io_spa,
3977                             ZIO_TYPE_CLAIM, ZIO_TASKQ_ISSUE,
3978                             (task_func_t *)zio_reexecute, zio, 0,
3979                             &zio->io_tqent);
3980                 }
3981                 return (ZIO_PIPELINE_STOP);
3982         }
3983
3984         ASSERT(zio->io_child_count == 0);
3985         ASSERT(zio->io_reexecute == 0);
3986         ASSERT(zio->io_error == 0 || (zio->io_flags & ZIO_FLAG_CANFAIL));
3987
3988         /*
3989          * Report any checksum errors, since the I/O is complete.
3990          */
3991         while (zio->io_cksum_report != NULL) {
3992                 zio_cksum_report_t *zcr = zio->io_cksum_report;
3993                 zio->io_cksum_report = zcr->zcr_next;
3994                 zcr->zcr_next = NULL;
3995                 zcr->zcr_finish(zcr, NULL);
3996                 zfs_ereport_free_checksum(zcr);
3997         }
3998
3999         if (zio->io_flags & ZIO_FLAG_FASTWRITE && zio->io_bp &&
4000             !BP_IS_HOLE(zio->io_bp) && !BP_IS_EMBEDDED(zio->io_bp) &&
4001             !(zio->io_flags & ZIO_FLAG_NOPWRITE)) {
4002                 metaslab_fastwrite_unmark(zio->io_spa, zio->io_bp);
4003         }
4004
4005         /*
4006          * It is the responsibility of the done callback to ensure that this
4007          * particular zio is no longer discoverable for adoption, and as
4008          * such, cannot acquire any new parents.
4009          */
4010         if (zio->io_done)
4011                 zio->io_done(zio);
4012
4013         mutex_enter(&zio->io_lock);
4014         zio->io_state[ZIO_WAIT_DONE] = 1;
4015         mutex_exit(&zio->io_lock);
4016
4017         zl = NULL;
4018         for (pio = zio_walk_parents(zio, &zl); pio != NULL; pio = pio_next) {
4019                 zio_link_t *remove_zl = zl;
4020                 pio_next = zio_walk_parents(zio, &zl);
4021                 zio_remove_child(pio, zio, remove_zl);
4022                 zio_notify_parent(pio, zio, ZIO_WAIT_DONE);
4023         }
4024
4025         if (zio->io_waiter != NULL) {
4026                 mutex_enter(&zio->io_lock);
4027                 zio->io_executor = NULL;
4028                 cv_broadcast(&zio->io_cv);
4029                 mutex_exit(&zio->io_lock);
4030         } else {
4031                 zio_destroy(zio);
4032         }
4033
4034         return (ZIO_PIPELINE_STOP);
4035 }
4036
4037 /*
4038  * ==========================================================================
4039  * I/O pipeline definition
4040  * ==========================================================================
4041  */
4042 static zio_pipe_stage_t *zio_pipeline[] = {
4043         NULL,
4044         zio_read_bp_init,
4045         zio_write_bp_init,
4046         zio_free_bp_init,
4047         zio_issue_async,
4048         zio_write_compress,
4049         zio_checksum_generate,
4050         zio_nop_write,
4051         zio_ddt_read_start,
4052         zio_ddt_read_done,
4053         zio_ddt_write,
4054         zio_ddt_free,
4055         zio_gang_assemble,
4056         zio_gang_issue,
4057         zio_dva_throttle,
4058         zio_dva_allocate,
4059         zio_dva_free,
4060         zio_dva_claim,
4061         zio_ready,
4062         zio_vdev_io_start,
4063         zio_vdev_io_done,
4064         zio_vdev_io_assess,
4065         zio_checksum_verify,
4066         zio_done
4067 };
4068
4069
4070
4071
4072 /*
4073  * Compare two zbookmark_phys_t's to see which we would reach first in a
4074  * pre-order traversal of the object tree.
4075  *
4076  * This is simple in every case aside from the meta-dnode object. For all other
4077  * objects, we traverse them in order (object 1 before object 2, and so on).
4078  * However, all of these objects are traversed while traversing object 0, since
4079  * the data it points to is the list of objects.  Thus, we need to convert to a
4080  * canonical representation so we can compare meta-dnode bookmarks to
4081  * non-meta-dnode bookmarks.
4082  *
4083  * We do this by calculating "equivalents" for each field of the zbookmark.
4084  * zbookmarks outside of the meta-dnode use their own object and level, and
4085  * calculate the level 0 equivalent (the first L0 blkid that is contained in the
4086  * blocks this bookmark refers to) by multiplying their blkid by their span
4087  * (the number of L0 blocks contained within one block at their level).
4088  * zbookmarks inside the meta-dnode calculate their object equivalent
4089  * (which is L0equiv * dnodes per data block), use 0 for their L0equiv, and use
4090  * level + 1<<31 (any value larger than a level could ever be) for their level.
4091  * This causes them to always compare before a bookmark in their object
4092  * equivalent, compare appropriately to bookmarks in other objects, and to
4093  * compare appropriately to other bookmarks in the meta-dnode.
4094  */
4095 int
4096 zbookmark_compare(uint16_t dbss1, uint8_t ibs1, uint16_t dbss2, uint8_t ibs2,
4097     const zbookmark_phys_t *zb1, const zbookmark_phys_t *zb2)
4098 {
4099         /*
4100          * These variables represent the "equivalent" values for the zbookmark,
4101          * after converting zbookmarks inside the meta dnode to their
4102          * normal-object equivalents.
4103          */
4104         uint64_t zb1obj, zb2obj;
4105         uint64_t zb1L0, zb2L0;
4106         uint64_t zb1level, zb2level;
4107
4108         if (zb1->zb_object == zb2->zb_object &&
4109             zb1->zb_level == zb2->zb_level &&
4110             zb1->zb_blkid == zb2->zb_blkid)
4111                 return (0);
4112
4113         /*
4114          * BP_SPANB calculates the span in blocks.
4115          */
4116         zb1L0 = (zb1->zb_blkid) * BP_SPANB(ibs1, zb1->zb_level);
4117         zb2L0 = (zb2->zb_blkid) * BP_SPANB(ibs2, zb2->zb_level);
4118
4119         if (zb1->zb_object == DMU_META_DNODE_OBJECT) {
4120                 zb1obj = zb1L0 * (dbss1 << (SPA_MINBLOCKSHIFT - DNODE_SHIFT));
4121                 zb1L0 = 0;
4122                 zb1level = zb1->zb_level + COMPARE_META_LEVEL;
4123         } else {
4124                 zb1obj = zb1->zb_object;
4125                 zb1level = zb1->zb_level;
4126         }
4127
4128         if (zb2->zb_object == DMU_META_DNODE_OBJECT) {
4129                 zb2obj = zb2L0 * (dbss2 << (SPA_MINBLOCKSHIFT - DNODE_SHIFT));
4130                 zb2L0 = 0;
4131                 zb2level = zb2->zb_level + COMPARE_META_LEVEL;
4132         } else {
4133                 zb2obj = zb2->zb_object;
4134                 zb2level = zb2->zb_level;
4135         }
4136
4137         /* Now that we have a canonical representation, do the comparison. */
4138         if (zb1obj != zb2obj)
4139                 return (zb1obj < zb2obj ? -1 : 1);
4140         else if (zb1L0 != zb2L0)
4141                 return (zb1L0 < zb2L0 ? -1 : 1);
4142         else if (zb1level != zb2level)
4143                 return (zb1level > zb2level ? -1 : 1);
4144         /*
4145          * This can (theoretically) happen if the bookmarks have the same object
4146          * and level, but different blkids, if the block sizes are not the same.
4147          * There is presently no way to change the indirect block sizes
4148          */
4149         return (0);
4150 }
4151
4152 /*
4153  *  This function checks the following: given that last_block is the place that
4154  *  our traversal stopped last time, does that guarantee that we've visited
4155  *  every node under subtree_root?  Therefore, we can't just use the raw output
4156  *  of zbookmark_compare.  We have to pass in a modified version of
4157  *  subtree_root; by incrementing the block id, and then checking whether
4158  *  last_block is before or equal to that, we can tell whether or not having
4159  *  visited last_block implies that all of subtree_root's children have been
4160  *  visited.
4161  */
4162 boolean_t
4163 zbookmark_subtree_completed(const dnode_phys_t *dnp,
4164     const zbookmark_phys_t *subtree_root, const zbookmark_phys_t *last_block)
4165 {
4166         zbookmark_phys_t mod_zb = *subtree_root;
4167         mod_zb.zb_blkid++;
4168         ASSERT(last_block->zb_level == 0);
4169
4170         /* The objset_phys_t isn't before anything. */
4171         if (dnp == NULL)
4172                 return (B_FALSE);
4173
4174         /*
4175          * We pass in 1ULL << (DNODE_BLOCK_SHIFT - SPA_MINBLOCKSHIFT) for the
4176          * data block size in sectors, because that variable is only used if
4177          * the bookmark refers to a block in the meta-dnode.  Since we don't
4178          * know without examining it what object it refers to, and there's no
4179          * harm in passing in this value in other cases, we always pass it in.
4180          *
4181          * We pass in 0 for the indirect block size shift because zb2 must be
4182          * level 0.  The indirect block size is only used to calculate the span
4183          * of the bookmark, but since the bookmark must be level 0, the span is
4184          * always 1, so the math works out.
4185          *
4186          * If you make changes to how the zbookmark_compare code works, be sure
4187          * to make sure that this code still works afterwards.
4188          */
4189         return (zbookmark_compare(dnp->dn_datablkszsec, dnp->dn_indblkshift,
4190             1ULL << (DNODE_BLOCK_SHIFT - SPA_MINBLOCKSHIFT), 0, &mod_zb,
4191             last_block) <= 0);
4192 }
4193
4194 #if defined(_KERNEL) && defined(HAVE_SPL)
4195 EXPORT_SYMBOL(zio_type_name);
4196 EXPORT_SYMBOL(zio_buf_alloc);
4197 EXPORT_SYMBOL(zio_data_buf_alloc);
4198 EXPORT_SYMBOL(zio_buf_free);
4199 EXPORT_SYMBOL(zio_data_buf_free);
4200
4201 module_param(zio_delay_max, int, 0644);
4202 MODULE_PARM_DESC(zio_delay_max, "Max zio millisec delay before posting event");
4203
4204 module_param(zio_requeue_io_start_cut_in_line, int, 0644);
4205 MODULE_PARM_DESC(zio_requeue_io_start_cut_in_line, "Prioritize requeued I/O");
4206
4207 module_param(zfs_sync_pass_deferred_free, int, 0644);
4208 MODULE_PARM_DESC(zfs_sync_pass_deferred_free,
4209         "Defer frees starting in this pass");
4210
4211 module_param(zfs_sync_pass_dont_compress, int, 0644);
4212 MODULE_PARM_DESC(zfs_sync_pass_dont_compress,
4213         "Don't compress starting in this pass");
4214
4215 module_param(zfs_sync_pass_rewrite, int, 0644);
4216 MODULE_PARM_DESC(zfs_sync_pass_rewrite,
4217         "Rewrite new bps starting in this pass");
4218
4219 module_param(zio_dva_throttle_enabled, int, 0644);
4220 MODULE_PARM_DESC(zio_dva_throttle_enabled,
4221         "Throttle block allocations in the ZIO pipeline");
4222 #endif