]> granicus.if.org Git - zfs/blob - module/zfs/dmu_send.c
OpenZFS 9682 - page fault in dsl_async_clone_destroy() while opening pool
[zfs] / module / zfs / dmu_send.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
24  * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
25  * Copyright (c) 2014, Joyent, Inc. All rights reserved.
26  * Copyright 2014 HybridCluster. All rights reserved.
27  * Copyright 2016 RackTop Systems.
28  * Copyright (c) 2016 Actifio, Inc. All rights reserved.
29  */
30
31 #include <sys/dmu.h>
32 #include <sys/dmu_impl.h>
33 #include <sys/dmu_tx.h>
34 #include <sys/dbuf.h>
35 #include <sys/dnode.h>
36 #include <sys/zfs_context.h>
37 #include <sys/dmu_objset.h>
38 #include <sys/dmu_traverse.h>
39 #include <sys/dsl_dataset.h>
40 #include <sys/dsl_dir.h>
41 #include <sys/dsl_prop.h>
42 #include <sys/dsl_pool.h>
43 #include <sys/dsl_synctask.h>
44 #include <sys/spa_impl.h>
45 #include <sys/zfs_ioctl.h>
46 #include <sys/zap.h>
47 #include <sys/zio_checksum.h>
48 #include <sys/zfs_znode.h>
49 #include <zfs_fletcher.h>
50 #include <sys/avl.h>
51 #include <sys/ddt.h>
52 #include <sys/zfs_onexit.h>
53 #include <sys/dmu_send.h>
54 #include <sys/dsl_destroy.h>
55 #include <sys/blkptr.h>
56 #include <sys/dsl_bookmark.h>
57 #include <sys/zfeature.h>
58 #include <sys/bqueue.h>
59 #include <sys/zvol.h>
60 #include <sys/policy.h>
61
62 /* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
63 int zfs_send_corrupt_data = B_FALSE;
64 int zfs_send_queue_length = SPA_MAXBLOCKSIZE;
65 /* Set this tunable to FALSE to disable setting of DRR_FLAG_FREERECORDS */
66 int zfs_send_set_freerecords_bit = B_TRUE;
67
68 /*
69  * Use this to override the recordsize calculation for fast zfs send estimates.
70  */
71 unsigned long zfs_override_estimate_recordsize = 0;
72
73 #define BP_SPAN(datablkszsec, indblkshift, level) \
74         (((uint64_t)datablkszsec) << (SPA_MINBLOCKSHIFT + \
75         (level) * (indblkshift - SPA_BLKPTRSHIFT)))
76
77 struct send_thread_arg {
78         bqueue_t        q;
79         dsl_dataset_t   *ds;            /* Dataset to traverse */
80         uint64_t        fromtxg;        /* Traverse from this txg */
81         int             flags;          /* flags to pass to traverse_dataset */
82         int             error_code;
83         boolean_t       cancel;
84         zbookmark_phys_t resume;
85 };
86
87 struct send_block_record {
88         boolean_t               eos_marker; /* Marks the end of the stream */
89         blkptr_t                bp;
90         zbookmark_phys_t        zb;
91         uint8_t                 indblkshift;
92         uint16_t                datablkszsec;
93         bqueue_node_t           ln;
94 };
95
96 typedef struct dump_bytes_io {
97         dmu_sendarg_t   *dbi_dsp;
98         void            *dbi_buf;
99         int             dbi_len;
100 } dump_bytes_io_t;
101
102 static void
103 dump_bytes_cb(void *arg)
104 {
105         dump_bytes_io_t *dbi = (dump_bytes_io_t *)arg;
106         dmu_sendarg_t *dsp = dbi->dbi_dsp;
107         dsl_dataset_t *ds = dmu_objset_ds(dsp->dsa_os);
108         ssize_t resid; /* have to get resid to get detailed errno */
109
110         /*
111          * The code does not rely on len being a multiple of 8.  We keep
112          * this assertion because of the corresponding assertion in
113          * receive_read().  Keeping this assertion ensures that we do not
114          * inadvertently break backwards compatibility (causing the assertion
115          * in receive_read() to trigger on old software). Newer feature flags
116          * (such as raw send) may break this assertion since they were
117          * introduced after the requirement was made obsolete.
118          */
119
120         ASSERT(dbi->dbi_len % 8 == 0 ||
121             (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) != 0);
122
123         dsp->dsa_err = vn_rdwr(UIO_WRITE, dsp->dsa_vp,
124             (caddr_t)dbi->dbi_buf, dbi->dbi_len,
125             0, UIO_SYSSPACE, FAPPEND, RLIM64_INFINITY, CRED(), &resid);
126
127         mutex_enter(&ds->ds_sendstream_lock);
128         *dsp->dsa_off += dbi->dbi_len;
129         mutex_exit(&ds->ds_sendstream_lock);
130 }
131
132 static int
133 dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
134 {
135         dump_bytes_io_t dbi;
136
137         dbi.dbi_dsp = dsp;
138         dbi.dbi_buf = buf;
139         dbi.dbi_len = len;
140
141 #if defined(HAVE_LARGE_STACKS)
142         dump_bytes_cb(&dbi);
143 #else
144         /*
145          * The vn_rdwr() call is performed in a taskq to ensure that there is
146          * always enough stack space to write safely to the target filesystem.
147          * The ZIO_TYPE_FREE threads are used because there can be a lot of
148          * them and they are used in vdev_file.c for a similar purpose.
149          */
150         spa_taskq_dispatch_sync(dmu_objset_spa(dsp->dsa_os), ZIO_TYPE_FREE,
151             ZIO_TASKQ_ISSUE, dump_bytes_cb, &dbi, TQ_SLEEP);
152 #endif /* HAVE_LARGE_STACKS */
153
154         return (dsp->dsa_err);
155 }
156
157 /*
158  * For all record types except BEGIN, fill in the checksum (overlaid in
159  * drr_u.drr_checksum.drr_checksum).  The checksum verifies everything
160  * up to the start of the checksum itself.
161  */
162 static int
163 dump_record(dmu_sendarg_t *dsp, void *payload, int payload_len)
164 {
165         ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
166             ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
167         (void) fletcher_4_incremental_native(dsp->dsa_drr,
168             offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
169             &dsp->dsa_zc);
170         if (dsp->dsa_drr->drr_type == DRR_BEGIN) {
171                 dsp->dsa_sent_begin = B_TRUE;
172         } else {
173                 ASSERT(ZIO_CHECKSUM_IS_ZERO(&dsp->dsa_drr->drr_u.
174                     drr_checksum.drr_checksum));
175                 dsp->dsa_drr->drr_u.drr_checksum.drr_checksum = dsp->dsa_zc;
176         }
177         if (dsp->dsa_drr->drr_type == DRR_END) {
178                 dsp->dsa_sent_end = B_TRUE;
179         }
180         (void) fletcher_4_incremental_native(&dsp->dsa_drr->
181             drr_u.drr_checksum.drr_checksum,
182             sizeof (zio_cksum_t), &dsp->dsa_zc);
183         if (dump_bytes(dsp, dsp->dsa_drr, sizeof (dmu_replay_record_t)) != 0)
184                 return (SET_ERROR(EINTR));
185         if (payload_len != 0) {
186                 (void) fletcher_4_incremental_native(payload, payload_len,
187                     &dsp->dsa_zc);
188                 if (dump_bytes(dsp, payload, payload_len) != 0)
189                         return (SET_ERROR(EINTR));
190         }
191         return (0);
192 }
193
194 /*
195  * Fill in the drr_free struct, or perform aggregation if the previous record is
196  * also a free record, and the two are adjacent.
197  *
198  * Note that we send free records even for a full send, because we want to be
199  * able to receive a full send as a clone, which requires a list of all the free
200  * and freeobject records that were generated on the source.
201  */
202 static int
203 dump_free(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
204     uint64_t length)
205 {
206         struct drr_free *drrf = &(dsp->dsa_drr->drr_u.drr_free);
207
208         /*
209          * When we receive a free record, dbuf_free_range() assumes
210          * that the receiving system doesn't have any dbufs in the range
211          * being freed.  This is always true because there is a one-record
212          * constraint: we only send one WRITE record for any given
213          * object,offset.  We know that the one-record constraint is
214          * true because we always send data in increasing order by
215          * object,offset.
216          *
217          * If the increasing-order constraint ever changes, we should find
218          * another way to assert that the one-record constraint is still
219          * satisfied.
220          */
221         ASSERT(object > dsp->dsa_last_data_object ||
222             (object == dsp->dsa_last_data_object &&
223             offset > dsp->dsa_last_data_offset));
224
225         /*
226          * If there is a pending op, but it's not PENDING_FREE, push it out,
227          * since free block aggregation can only be done for blocks of the
228          * same type (i.e., DRR_FREE records can only be aggregated with
229          * other DRR_FREE records.  DRR_FREEOBJECTS records can only be
230          * aggregated with other DRR_FREEOBJECTS records.
231          */
232         if (dsp->dsa_pending_op != PENDING_NONE &&
233             dsp->dsa_pending_op != PENDING_FREE) {
234                 if (dump_record(dsp, NULL, 0) != 0)
235                         return (SET_ERROR(EINTR));
236                 dsp->dsa_pending_op = PENDING_NONE;
237         }
238
239         if (dsp->dsa_pending_op == PENDING_FREE) {
240                 /*
241                  * There should never be a PENDING_FREE if length is
242                  * DMU_OBJECT_END (because dump_dnode is the only place where
243                  * this function is called with a DMU_OBJECT_END, and only after
244                  * flushing any pending record).
245                  */
246                 ASSERT(length != DMU_OBJECT_END);
247                 /*
248                  * Check to see whether this free block can be aggregated
249                  * with pending one.
250                  */
251                 if (drrf->drr_object == object && drrf->drr_offset +
252                     drrf->drr_length == offset) {
253                         if (offset + length < offset)
254                                 drrf->drr_length = DMU_OBJECT_END;
255                         else
256                                 drrf->drr_length += length;
257                         return (0);
258                 } else {
259                         /* not a continuation.  Push out pending record */
260                         if (dump_record(dsp, NULL, 0) != 0)
261                                 return (SET_ERROR(EINTR));
262                         dsp->dsa_pending_op = PENDING_NONE;
263                 }
264         }
265         /* create a FREE record and make it pending */
266         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
267         dsp->dsa_drr->drr_type = DRR_FREE;
268         drrf->drr_object = object;
269         drrf->drr_offset = offset;
270         if (offset + length < offset)
271                 drrf->drr_length = DMU_OBJECT_END;
272         else
273                 drrf->drr_length = length;
274         drrf->drr_toguid = dsp->dsa_toguid;
275         if (length == DMU_OBJECT_END) {
276                 if (dump_record(dsp, NULL, 0) != 0)
277                         return (SET_ERROR(EINTR));
278         } else {
279                 dsp->dsa_pending_op = PENDING_FREE;
280         }
281
282         return (0);
283 }
284
285 static int
286 dump_write(dmu_sendarg_t *dsp, dmu_object_type_t type, uint64_t object,
287     uint64_t offset, int lsize, int psize, const blkptr_t *bp, void *data)
288 {
289         uint64_t payload_size;
290         boolean_t raw = (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW);
291         struct drr_write *drrw = &(dsp->dsa_drr->drr_u.drr_write);
292
293         /*
294          * We send data in increasing object, offset order.
295          * See comment in dump_free() for details.
296          */
297         ASSERT(object > dsp->dsa_last_data_object ||
298             (object == dsp->dsa_last_data_object &&
299             offset > dsp->dsa_last_data_offset));
300         dsp->dsa_last_data_object = object;
301         dsp->dsa_last_data_offset = offset + lsize - 1;
302
303         /*
304          * If there is any kind of pending aggregation (currently either
305          * a grouping of free objects or free blocks), push it out to
306          * the stream, since aggregation can't be done across operations
307          * of different types.
308          */
309         if (dsp->dsa_pending_op != PENDING_NONE) {
310                 if (dump_record(dsp, NULL, 0) != 0)
311                         return (SET_ERROR(EINTR));
312                 dsp->dsa_pending_op = PENDING_NONE;
313         }
314         /* write a WRITE record */
315         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
316         dsp->dsa_drr->drr_type = DRR_WRITE;
317         drrw->drr_object = object;
318         drrw->drr_type = type;
319         drrw->drr_offset = offset;
320         drrw->drr_toguid = dsp->dsa_toguid;
321         drrw->drr_logical_size = lsize;
322
323         /* only set the compression fields if the buf is compressed or raw */
324         if (raw || lsize != psize) {
325                 ASSERT(!BP_IS_EMBEDDED(bp));
326                 ASSERT3S(psize, >, 0);
327
328                 if (raw) {
329                         ASSERT(BP_IS_PROTECTED(bp));
330
331                         /*
332                          * This is a raw protected block so we need to pass
333                          * along everything the receiving side will need to
334                          * interpret this block, including the byteswap, salt,
335                          * IV, and MAC.
336                          */
337                         if (BP_SHOULD_BYTESWAP(bp))
338                                 drrw->drr_flags |= DRR_RAW_BYTESWAP;
339                         zio_crypt_decode_params_bp(bp, drrw->drr_salt,
340                             drrw->drr_iv);
341                         zio_crypt_decode_mac_bp(bp, drrw->drr_mac);
342                 } else {
343                         /* this is a compressed block */
344                         ASSERT(dsp->dsa_featureflags &
345                             DMU_BACKUP_FEATURE_COMPRESSED);
346                         ASSERT(!BP_SHOULD_BYTESWAP(bp));
347                         ASSERT(!DMU_OT_IS_METADATA(BP_GET_TYPE(bp)));
348                         ASSERT3U(BP_GET_COMPRESS(bp), !=, ZIO_COMPRESS_OFF);
349                         ASSERT3S(lsize, >=, psize);
350                 }
351
352                 /* set fields common to compressed and raw sends */
353                 drrw->drr_compressiontype = BP_GET_COMPRESS(bp);
354                 drrw->drr_compressed_size = psize;
355                 payload_size = drrw->drr_compressed_size;
356         } else {
357                 payload_size = drrw->drr_logical_size;
358         }
359
360         if (bp == NULL || BP_IS_EMBEDDED(bp) || (BP_IS_PROTECTED(bp) && !raw)) {
361                 /*
362                  * There's no pre-computed checksum for partial-block writes,
363                  * embedded BP's, or encrypted BP's that are being sent as
364                  * plaintext, so (like fletcher4-checkummed blocks) userland
365                  * will have to compute a dedup-capable checksum itself.
366                  */
367                 drrw->drr_checksumtype = ZIO_CHECKSUM_OFF;
368         } else {
369                 drrw->drr_checksumtype = BP_GET_CHECKSUM(bp);
370                 if (zio_checksum_table[drrw->drr_checksumtype].ci_flags &
371                     ZCHECKSUM_FLAG_DEDUP)
372                         drrw->drr_flags |= DRR_CHECKSUM_DEDUP;
373                 DDK_SET_LSIZE(&drrw->drr_key, BP_GET_LSIZE(bp));
374                 DDK_SET_PSIZE(&drrw->drr_key, BP_GET_PSIZE(bp));
375                 DDK_SET_COMPRESS(&drrw->drr_key, BP_GET_COMPRESS(bp));
376                 DDK_SET_CRYPT(&drrw->drr_key, BP_IS_PROTECTED(bp));
377                 drrw->drr_key.ddk_cksum = bp->blk_cksum;
378         }
379
380         if (dump_record(dsp, data, payload_size) != 0)
381                 return (SET_ERROR(EINTR));
382         return (0);
383 }
384
385 static int
386 dump_write_embedded(dmu_sendarg_t *dsp, uint64_t object, uint64_t offset,
387     int blksz, const blkptr_t *bp)
388 {
389         char buf[BPE_PAYLOAD_SIZE];
390         struct drr_write_embedded *drrw =
391             &(dsp->dsa_drr->drr_u.drr_write_embedded);
392
393         if (dsp->dsa_pending_op != PENDING_NONE) {
394                 if (dump_record(dsp, NULL, 0) != 0)
395                         return (SET_ERROR(EINTR));
396                 dsp->dsa_pending_op = PENDING_NONE;
397         }
398
399         ASSERT(BP_IS_EMBEDDED(bp));
400
401         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
402         dsp->dsa_drr->drr_type = DRR_WRITE_EMBEDDED;
403         drrw->drr_object = object;
404         drrw->drr_offset = offset;
405         drrw->drr_length = blksz;
406         drrw->drr_toguid = dsp->dsa_toguid;
407         drrw->drr_compression = BP_GET_COMPRESS(bp);
408         drrw->drr_etype = BPE_GET_ETYPE(bp);
409         drrw->drr_lsize = BPE_GET_LSIZE(bp);
410         drrw->drr_psize = BPE_GET_PSIZE(bp);
411
412         decode_embedded_bp_compressed(bp, buf);
413
414         if (dump_record(dsp, buf, P2ROUNDUP(drrw->drr_psize, 8)) != 0)
415                 return (SET_ERROR(EINTR));
416         return (0);
417 }
418
419 static int
420 dump_spill(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t object, void *data)
421 {
422         struct drr_spill *drrs = &(dsp->dsa_drr->drr_u.drr_spill);
423         uint64_t blksz = BP_GET_LSIZE(bp);
424         uint64_t payload_size = blksz;
425
426         if (dsp->dsa_pending_op != PENDING_NONE) {
427                 if (dump_record(dsp, NULL, 0) != 0)
428                         return (SET_ERROR(EINTR));
429                 dsp->dsa_pending_op = PENDING_NONE;
430         }
431
432         /* write a SPILL record */
433         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
434         dsp->dsa_drr->drr_type = DRR_SPILL;
435         drrs->drr_object = object;
436         drrs->drr_length = blksz;
437         drrs->drr_toguid = dsp->dsa_toguid;
438
439         /* handle raw send fields */
440         if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
441                 ASSERT(BP_IS_PROTECTED(bp));
442
443                 if (BP_SHOULD_BYTESWAP(bp))
444                         drrs->drr_flags |= DRR_RAW_BYTESWAP;
445                 drrs->drr_compressiontype = BP_GET_COMPRESS(bp);
446                 drrs->drr_compressed_size = BP_GET_PSIZE(bp);
447                 zio_crypt_decode_params_bp(bp, drrs->drr_salt, drrs->drr_iv);
448                 zio_crypt_decode_mac_bp(bp, drrs->drr_mac);
449                 payload_size = drrs->drr_compressed_size;
450         }
451
452         if (dump_record(dsp, data, payload_size) != 0)
453                 return (SET_ERROR(EINTR));
454         return (0);
455 }
456
457 static int
458 dump_freeobjects(dmu_sendarg_t *dsp, uint64_t firstobj, uint64_t numobjs)
459 {
460         struct drr_freeobjects *drrfo = &(dsp->dsa_drr->drr_u.drr_freeobjects);
461         uint64_t maxobj = DNODES_PER_BLOCK *
462             (DMU_META_DNODE(dsp->dsa_os)->dn_maxblkid + 1);
463
464         /*
465          * ZoL < 0.7 does not handle large FREEOBJECTS records correctly,
466          * leading to zfs recv never completing. to avoid this issue, don't
467          * send FREEOBJECTS records for object IDs which cannot exist on the
468          * receiving side.
469          */
470         if (maxobj > 0) {
471                 if (maxobj < firstobj)
472                         return (0);
473
474                 if (maxobj < firstobj + numobjs)
475                         numobjs = maxobj - firstobj;
476         }
477
478         /*
479          * If there is a pending op, but it's not PENDING_FREEOBJECTS,
480          * push it out, since free block aggregation can only be done for
481          * blocks of the same type (i.e., DRR_FREE records can only be
482          * aggregated with other DRR_FREE records.  DRR_FREEOBJECTS records
483          * can only be aggregated with other DRR_FREEOBJECTS records.
484          */
485         if (dsp->dsa_pending_op != PENDING_NONE &&
486             dsp->dsa_pending_op != PENDING_FREEOBJECTS) {
487                 if (dump_record(dsp, NULL, 0) != 0)
488                         return (SET_ERROR(EINTR));
489                 dsp->dsa_pending_op = PENDING_NONE;
490         }
491         if (dsp->dsa_pending_op == PENDING_FREEOBJECTS) {
492                 /*
493                  * See whether this free object array can be aggregated
494                  * with pending one
495                  */
496                 if (drrfo->drr_firstobj + drrfo->drr_numobjs == firstobj) {
497                         drrfo->drr_numobjs += numobjs;
498                         return (0);
499                 } else {
500                         /* can't be aggregated.  Push out pending record */
501                         if (dump_record(dsp, NULL, 0) != 0)
502                                 return (SET_ERROR(EINTR));
503                         dsp->dsa_pending_op = PENDING_NONE;
504                 }
505         }
506
507         /* write a FREEOBJECTS record */
508         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
509         dsp->dsa_drr->drr_type = DRR_FREEOBJECTS;
510         drrfo->drr_firstobj = firstobj;
511         drrfo->drr_numobjs = numobjs;
512         drrfo->drr_toguid = dsp->dsa_toguid;
513
514         dsp->dsa_pending_op = PENDING_FREEOBJECTS;
515
516         return (0);
517 }
518
519 static int
520 dump_dnode(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t object,
521     dnode_phys_t *dnp)
522 {
523         struct drr_object *drro = &(dsp->dsa_drr->drr_u.drr_object);
524         int bonuslen;
525
526         if (object < dsp->dsa_resume_object) {
527                 /*
528                  * Note: when resuming, we will visit all the dnodes in
529                  * the block of dnodes that we are resuming from.  In
530                  * this case it's unnecessary to send the dnodes prior to
531                  * the one we are resuming from.  We should be at most one
532                  * block's worth of dnodes behind the resume point.
533                  */
534                 ASSERT3U(dsp->dsa_resume_object - object, <,
535                     1 << (DNODE_BLOCK_SHIFT - DNODE_SHIFT));
536                 return (0);
537         }
538
539         if (dnp == NULL || dnp->dn_type == DMU_OT_NONE)
540                 return (dump_freeobjects(dsp, object, 1));
541
542         if (dsp->dsa_pending_op != PENDING_NONE) {
543                 if (dump_record(dsp, NULL, 0) != 0)
544                         return (SET_ERROR(EINTR));
545                 dsp->dsa_pending_op = PENDING_NONE;
546         }
547
548         /* write an OBJECT record */
549         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
550         dsp->dsa_drr->drr_type = DRR_OBJECT;
551         drro->drr_object = object;
552         drro->drr_type = dnp->dn_type;
553         drro->drr_bonustype = dnp->dn_bonustype;
554         drro->drr_blksz = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
555         drro->drr_bonuslen = dnp->dn_bonuslen;
556         drro->drr_dn_slots = dnp->dn_extra_slots + 1;
557         drro->drr_checksumtype = dnp->dn_checksum;
558         drro->drr_compress = dnp->dn_compress;
559         drro->drr_toguid = dsp->dsa_toguid;
560
561         if (!(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
562             drro->drr_blksz > SPA_OLD_MAXBLOCKSIZE)
563                 drro->drr_blksz = SPA_OLD_MAXBLOCKSIZE;
564
565         bonuslen = P2ROUNDUP(dnp->dn_bonuslen, 8);
566
567         if ((dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW)) {
568                 ASSERT(BP_IS_ENCRYPTED(bp));
569
570                 if (BP_SHOULD_BYTESWAP(bp))
571                         drro->drr_flags |= DRR_RAW_BYTESWAP;
572
573                 /* needed for reconstructing dnp on recv side */
574                 drro->drr_maxblkid = dnp->dn_maxblkid;
575                 drro->drr_indblkshift = dnp->dn_indblkshift;
576                 drro->drr_nlevels = dnp->dn_nlevels;
577                 drro->drr_nblkptr = dnp->dn_nblkptr;
578
579                 /*
580                  * Since we encrypt the entire bonus area, the (raw) part
581                  * beyond the bonuslen is actually nonzero, so we need
582                  * to send it.
583                  */
584                 if (bonuslen != 0) {
585                         drro->drr_raw_bonuslen = DN_MAX_BONUS_LEN(dnp);
586                         bonuslen = drro->drr_raw_bonuslen;
587                 }
588         }
589
590         if (dump_record(dsp, DN_BONUS(dnp), bonuslen) != 0)
591                 return (SET_ERROR(EINTR));
592
593         /* Free anything past the end of the file. */
594         if (dump_free(dsp, object, (dnp->dn_maxblkid + 1) *
595             (dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT), DMU_OBJECT_END) != 0)
596                 return (SET_ERROR(EINTR));
597         if (dsp->dsa_err != 0)
598                 return (SET_ERROR(EINTR));
599         return (0);
600 }
601
602 static int
603 dump_object_range(dmu_sendarg_t *dsp, const blkptr_t *bp, uint64_t firstobj,
604     uint64_t numslots)
605 {
606         struct drr_object_range *drror =
607             &(dsp->dsa_drr->drr_u.drr_object_range);
608
609         /* we only use this record type for raw sends */
610         ASSERT(BP_IS_PROTECTED(bp));
611         ASSERT(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_RAW);
612         ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);
613         ASSERT3U(BP_GET_TYPE(bp), ==, DMU_OT_DNODE);
614         ASSERT0(BP_GET_LEVEL(bp));
615
616         if (dsp->dsa_pending_op != PENDING_NONE) {
617                 if (dump_record(dsp, NULL, 0) != 0)
618                         return (SET_ERROR(EINTR));
619                 dsp->dsa_pending_op = PENDING_NONE;
620         }
621
622         bzero(dsp->dsa_drr, sizeof (dmu_replay_record_t));
623         dsp->dsa_drr->drr_type = DRR_OBJECT_RANGE;
624         drror->drr_firstobj = firstobj;
625         drror->drr_numslots = numslots;
626         drror->drr_toguid = dsp->dsa_toguid;
627         if (BP_SHOULD_BYTESWAP(bp))
628                 drror->drr_flags |= DRR_RAW_BYTESWAP;
629         zio_crypt_decode_params_bp(bp, drror->drr_salt, drror->drr_iv);
630         zio_crypt_decode_mac_bp(bp, drror->drr_mac);
631
632         if (dump_record(dsp, NULL, 0) != 0)
633                 return (SET_ERROR(EINTR));
634         return (0);
635 }
636
637 static boolean_t
638 backup_do_embed(dmu_sendarg_t *dsp, const blkptr_t *bp)
639 {
640         if (!BP_IS_EMBEDDED(bp))
641                 return (B_FALSE);
642
643         /*
644          * Compression function must be legacy, or explicitly enabled.
645          */
646         if ((BP_GET_COMPRESS(bp) >= ZIO_COMPRESS_LEGACY_FUNCTIONS &&
647             !(dsp->dsa_featureflags & DMU_BACKUP_FEATURE_LZ4)))
648                 return (B_FALSE);
649
650         /*
651          * Embed type must be explicitly enabled.
652          */
653         switch (BPE_GET_ETYPE(bp)) {
654         case BP_EMBEDDED_TYPE_DATA:
655                 if (dsp->dsa_featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
656                         return (B_TRUE);
657                 break;
658         default:
659                 return (B_FALSE);
660         }
661         return (B_FALSE);
662 }
663
664 /*
665  * This is the callback function to traverse_dataset that acts as the worker
666  * thread for dmu_send_impl.
667  */
668 /*ARGSUSED*/
669 static int
670 send_cb(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
671     const zbookmark_phys_t *zb, const struct dnode_phys *dnp, void *arg)
672 {
673         struct send_thread_arg *sta = arg;
674         struct send_block_record *record;
675         uint64_t record_size;
676         int err = 0;
677
678         ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
679             zb->zb_object >= sta->resume.zb_object);
680         ASSERT3P(sta->ds, !=, NULL);
681
682         if (sta->cancel)
683                 return (SET_ERROR(EINTR));
684
685         if (bp == NULL) {
686                 ASSERT3U(zb->zb_level, ==, ZB_DNODE_LEVEL);
687                 return (0);
688         } else if (zb->zb_level < 0) {
689                 return (0);
690         }
691
692         record = kmem_zalloc(sizeof (struct send_block_record), KM_SLEEP);
693         record->eos_marker = B_FALSE;
694         record->bp = *bp;
695         record->zb = *zb;
696         record->indblkshift = dnp->dn_indblkshift;
697         record->datablkszsec = dnp->dn_datablkszsec;
698         record_size = dnp->dn_datablkszsec << SPA_MINBLOCKSHIFT;
699         bqueue_enqueue(&sta->q, record, record_size);
700
701         return (err);
702 }
703
704 /*
705  * This function kicks off the traverse_dataset.  It also handles setting the
706  * error code of the thread in case something goes wrong, and pushes the End of
707  * Stream record when the traverse_dataset call has finished.  If there is no
708  * dataset to traverse, the thread immediately pushes End of Stream marker.
709  */
710 static void
711 send_traverse_thread(void *arg)
712 {
713         struct send_thread_arg *st_arg = arg;
714         int err;
715         struct send_block_record *data;
716         fstrans_cookie_t cookie = spl_fstrans_mark();
717
718         if (st_arg->ds != NULL) {
719                 err = traverse_dataset_resume(st_arg->ds,
720                     st_arg->fromtxg, &st_arg->resume,
721                     st_arg->flags, send_cb, st_arg);
722
723                 if (err != EINTR)
724                         st_arg->error_code = err;
725         }
726         data = kmem_zalloc(sizeof (*data), KM_SLEEP);
727         data->eos_marker = B_TRUE;
728         bqueue_enqueue(&st_arg->q, data, 1);
729         spl_fstrans_unmark(cookie);
730         thread_exit();
731 }
732
733 /*
734  * This function actually handles figuring out what kind of record needs to be
735  * dumped, reading the data (which has hopefully been prefetched), and calling
736  * the appropriate helper function.
737  */
738 static int
739 do_dump(dmu_sendarg_t *dsa, struct send_block_record *data)
740 {
741         dsl_dataset_t *ds = dmu_objset_ds(dsa->dsa_os);
742         const blkptr_t *bp = &data->bp;
743         const zbookmark_phys_t *zb = &data->zb;
744         uint8_t indblkshift = data->indblkshift;
745         uint16_t dblkszsec = data->datablkszsec;
746         spa_t *spa = ds->ds_dir->dd_pool->dp_spa;
747         dmu_object_type_t type = bp ? BP_GET_TYPE(bp) : DMU_OT_NONE;
748         int err = 0;
749
750         ASSERT3U(zb->zb_level, >=, 0);
751
752         ASSERT(zb->zb_object == DMU_META_DNODE_OBJECT ||
753             zb->zb_object >= dsa->dsa_resume_object);
754
755         /*
756          * All bps of an encrypted os should have the encryption bit set.
757          * If this is not true it indicates tampering and we report an error.
758          */
759         if (dsa->dsa_os->os_encrypted &&
760             !BP_IS_HOLE(bp) && !BP_USES_CRYPT(bp)) {
761                 spa_log_error(spa, zb);
762                 zfs_panic_recover("unencrypted block in encrypted "
763                     "object set %llu", ds->ds_object);
764                 return (SET_ERROR(EIO));
765         }
766
767         if (zb->zb_object != DMU_META_DNODE_OBJECT &&
768             DMU_OBJECT_IS_SPECIAL(zb->zb_object)) {
769                 return (0);
770         } else if (BP_IS_HOLE(bp) &&
771             zb->zb_object == DMU_META_DNODE_OBJECT) {
772                 uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
773                 uint64_t dnobj = (zb->zb_blkid * span) >> DNODE_SHIFT;
774                 err = dump_freeobjects(dsa, dnobj, span >> DNODE_SHIFT);
775         } else if (BP_IS_HOLE(bp)) {
776                 uint64_t span = BP_SPAN(dblkszsec, indblkshift, zb->zb_level);
777                 uint64_t offset = zb->zb_blkid * span;
778                 /* Don't dump free records for offsets > DMU_OBJECT_END */
779                 if (zb->zb_blkid == 0 || span <= DMU_OBJECT_END / zb->zb_blkid)
780                         err = dump_free(dsa, zb->zb_object, offset, span);
781         } else if (zb->zb_level > 0 || type == DMU_OT_OBJSET) {
782                 return (0);
783         } else if (type == DMU_OT_DNODE) {
784                 int epb = BP_GET_LSIZE(bp) >> DNODE_SHIFT;
785                 arc_flags_t aflags = ARC_FLAG_WAIT;
786                 arc_buf_t *abuf;
787                 enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
788
789                 if (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
790                         ASSERT(BP_IS_ENCRYPTED(bp));
791                         ASSERT3U(BP_GET_COMPRESS(bp), ==, ZIO_COMPRESS_OFF);
792                         zioflags |= ZIO_FLAG_RAW;
793                 }
794
795                 ASSERT0(zb->zb_level);
796
797                 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
798                     ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0)
799                         return (SET_ERROR(EIO));
800
801                 dnode_phys_t *blk = abuf->b_data;
802                 uint64_t dnobj = zb->zb_blkid * epb;
803
804                 /*
805                  * Raw sends require sending encryption parameters for the
806                  * block of dnodes. Regular sends do not need to send this
807                  * info.
808                  */
809                 if (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
810                         ASSERT(arc_is_encrypted(abuf));
811                         err = dump_object_range(dsa, bp, dnobj, epb);
812                 }
813
814                 if (err == 0) {
815                         for (int i = 0; i < epb;
816                             i += blk[i].dn_extra_slots + 1) {
817                                 err = dump_dnode(dsa, bp, dnobj + i, blk + i);
818                                 if (err != 0)
819                                         break;
820                         }
821                 }
822                 arc_buf_destroy(abuf, &abuf);
823         } else if (type == DMU_OT_SA) {
824                 arc_flags_t aflags = ARC_FLAG_WAIT;
825                 arc_buf_t *abuf;
826                 enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
827
828                 if (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) {
829                         ASSERT(BP_IS_PROTECTED(bp));
830                         zioflags |= ZIO_FLAG_RAW;
831                 }
832
833                 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
834                     ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0)
835                         return (SET_ERROR(EIO));
836
837                 err = dump_spill(dsa, bp, zb->zb_object, abuf->b_data);
838                 arc_buf_destroy(abuf, &abuf);
839         } else if (backup_do_embed(dsa, bp)) {
840                 /* it's an embedded level-0 block of a regular object */
841                 int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
842                 ASSERT0(zb->zb_level);
843                 err = dump_write_embedded(dsa, zb->zb_object,
844                     zb->zb_blkid * blksz, blksz, bp);
845         } else {
846                 /* it's a level-0 block of a regular object */
847                 arc_flags_t aflags = ARC_FLAG_WAIT;
848                 arc_buf_t *abuf;
849                 int blksz = dblkszsec << SPA_MINBLOCKSHIFT;
850                 uint64_t offset;
851
852                 /*
853                  * If we have large blocks stored on disk but the send flags
854                  * don't allow us to send large blocks, we split the data from
855                  * the arc buf into chunks.
856                  */
857                 boolean_t split_large_blocks = blksz > SPA_OLD_MAXBLOCKSIZE &&
858                     !(dsa->dsa_featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS);
859
860                 /*
861                  * Raw sends require that we always get raw data as it exists
862                  * on disk, so we assert that we are not splitting blocks here.
863                  */
864                 boolean_t request_raw =
865                     (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
866
867                 /*
868                  * We should only request compressed data from the ARC if all
869                  * the following are true:
870                  *  - stream compression was requested
871                  *  - we aren't splitting large blocks into smaller chunks
872                  *  - the data won't need to be byteswapped before sending
873                  *  - this isn't an embedded block
874                  *  - this isn't metadata (if receiving on a different endian
875                  *    system it can be byteswapped more easily)
876                  */
877                 boolean_t request_compressed =
878                     (dsa->dsa_featureflags & DMU_BACKUP_FEATURE_COMPRESSED) &&
879                     !split_large_blocks && !BP_SHOULD_BYTESWAP(bp) &&
880                     !BP_IS_EMBEDDED(bp) && !DMU_OT_IS_METADATA(BP_GET_TYPE(bp));
881
882                 IMPLY(request_raw, !split_large_blocks);
883                 IMPLY(request_raw, BP_IS_PROTECTED(bp));
884                 ASSERT0(zb->zb_level);
885                 ASSERT(zb->zb_object > dsa->dsa_resume_object ||
886                     (zb->zb_object == dsa->dsa_resume_object &&
887                     zb->zb_blkid * blksz >= dsa->dsa_resume_offset));
888
889                 ASSERT3U(blksz, ==, BP_GET_LSIZE(bp));
890
891                 enum zio_flag zioflags = ZIO_FLAG_CANFAIL;
892                 if (request_raw)
893                         zioflags |= ZIO_FLAG_RAW;
894                 else if (request_compressed)
895                         zioflags |= ZIO_FLAG_RAW_COMPRESS;
896
897                 if (arc_read(NULL, spa, bp, arc_getbuf_func, &abuf,
898                     ZIO_PRIORITY_ASYNC_READ, zioflags, &aflags, zb) != 0) {
899                         if (zfs_send_corrupt_data) {
900                                 /* Send a block filled with 0x"zfs badd bloc" */
901                                 abuf = arc_alloc_buf(spa, &abuf, ARC_BUFC_DATA,
902                                     blksz);
903                                 uint64_t *ptr;
904                                 for (ptr = abuf->b_data;
905                                     (char *)ptr < (char *)abuf->b_data + blksz;
906                                     ptr++)
907                                         *ptr = 0x2f5baddb10cULL;
908                         } else {
909                                 return (SET_ERROR(EIO));
910                         }
911                 }
912
913                 offset = zb->zb_blkid * blksz;
914
915                 if (split_large_blocks) {
916                         ASSERT0(arc_is_encrypted(abuf));
917                         ASSERT3U(arc_get_compression(abuf), ==,
918                             ZIO_COMPRESS_OFF);
919                         char *buf = abuf->b_data;
920                         while (blksz > 0 && err == 0) {
921                                 int n = MIN(blksz, SPA_OLD_MAXBLOCKSIZE);
922                                 err = dump_write(dsa, type, zb->zb_object,
923                                     offset, n, n, NULL, buf);
924                                 offset += n;
925                                 buf += n;
926                                 blksz -= n;
927                         }
928                 } else {
929                         err = dump_write(dsa, type, zb->zb_object, offset,
930                             blksz, arc_buf_size(abuf), bp, abuf->b_data);
931                 }
932                 arc_buf_destroy(abuf, &abuf);
933         }
934
935         ASSERT(err == 0 || err == EINTR);
936         return (err);
937 }
938
939 /*
940  * Pop the new data off the queue, and free the old data.
941  */
942 static struct send_block_record *
943 get_next_record(bqueue_t *bq, struct send_block_record *data)
944 {
945         struct send_block_record *tmp = bqueue_dequeue(bq);
946         kmem_free(data, sizeof (*data));
947         return (tmp);
948 }
949
950 /*
951  * Actually do the bulk of the work in a zfs send.
952  *
953  * Note: Releases dp using the specified tag.
954  */
955 static int
956 dmu_send_impl(void *tag, dsl_pool_t *dp, dsl_dataset_t *to_ds,
957     zfs_bookmark_phys_t *ancestor_zb, boolean_t is_clone,
958     boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
959     boolean_t rawok, int outfd, uint64_t resumeobj, uint64_t resumeoff,
960     vnode_t *vp, offset_t *off)
961 {
962         objset_t *os;
963         dmu_replay_record_t *drr;
964         dmu_sendarg_t *dsp;
965         int err;
966         uint64_t fromtxg = 0;
967         uint64_t featureflags = 0;
968         struct send_thread_arg to_arg;
969         void *payload = NULL;
970         size_t payload_len = 0;
971         struct send_block_record *to_data;
972
973         err = dmu_objset_from_ds(to_ds, &os);
974         if (err != 0) {
975                 dsl_pool_rele(dp, tag);
976                 return (err);
977         }
978
979         /*
980          * If this is a non-raw send of an encrypted ds, we can ensure that
981          * the objset_phys_t is authenticated. This is safe because this is
982          * either a snapshot or we have owned the dataset, ensuring that
983          * it can't be modified.
984          */
985         if (!rawok && os->os_encrypted &&
986             arc_is_unauthenticated(os->os_phys_buf)) {
987                 zbookmark_phys_t zb;
988
989                 SET_BOOKMARK(&zb, to_ds->ds_object, ZB_ROOT_OBJECT,
990                     ZB_ROOT_LEVEL, ZB_ROOT_BLKID);
991                 err = arc_untransform(os->os_phys_buf, os->os_spa,
992                     &zb, B_FALSE);
993                 if (err != 0) {
994                         dsl_pool_rele(dp, tag);
995                         return (err);
996                 }
997
998                 ASSERT0(arc_is_unauthenticated(os->os_phys_buf));
999         }
1000
1001         drr = kmem_zalloc(sizeof (dmu_replay_record_t), KM_SLEEP);
1002         drr->drr_type = DRR_BEGIN;
1003         drr->drr_u.drr_begin.drr_magic = DMU_BACKUP_MAGIC;
1004         DMU_SET_STREAM_HDRTYPE(drr->drr_u.drr_begin.drr_versioninfo,
1005             DMU_SUBSTREAM);
1006
1007         bzero(&to_arg, sizeof (to_arg));
1008
1009 #ifdef _KERNEL
1010         if (dmu_objset_type(os) == DMU_OST_ZFS) {
1011                 uint64_t version;
1012                 if (zfs_get_zplprop(os, ZFS_PROP_VERSION, &version) != 0) {
1013                         kmem_free(drr, sizeof (dmu_replay_record_t));
1014                         dsl_pool_rele(dp, tag);
1015                         return (SET_ERROR(EINVAL));
1016                 }
1017                 if (version >= ZPL_VERSION_SA) {
1018                         featureflags |= DMU_BACKUP_FEATURE_SA_SPILL;
1019                 }
1020         }
1021 #endif
1022
1023         /* raw sends imply large_block_ok */
1024         if ((large_block_ok || rawok) &&
1025             dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_BLOCKS))
1026                 featureflags |= DMU_BACKUP_FEATURE_LARGE_BLOCKS;
1027         if (dsl_dataset_feature_is_active(to_ds, SPA_FEATURE_LARGE_DNODE))
1028                 featureflags |= DMU_BACKUP_FEATURE_LARGE_DNODE;
1029
1030         /* encrypted datasets will not have embedded blocks */
1031         if ((embedok || rawok) && !os->os_encrypted &&
1032             spa_feature_is_active(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA)) {
1033                 featureflags |= DMU_BACKUP_FEATURE_EMBED_DATA;
1034         }
1035
1036         /* raw send implies compressok */
1037         if (compressok || rawok)
1038                 featureflags |= DMU_BACKUP_FEATURE_COMPRESSED;
1039         if (rawok && os->os_encrypted)
1040                 featureflags |= DMU_BACKUP_FEATURE_RAW;
1041
1042         if ((featureflags &
1043             (DMU_BACKUP_FEATURE_EMBED_DATA | DMU_BACKUP_FEATURE_COMPRESSED |
1044             DMU_BACKUP_FEATURE_RAW)) != 0 &&
1045             spa_feature_is_active(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS)) {
1046                 featureflags |= DMU_BACKUP_FEATURE_LZ4;
1047         }
1048
1049         if (resumeobj != 0 || resumeoff != 0) {
1050                 featureflags |= DMU_BACKUP_FEATURE_RESUMING;
1051         }
1052
1053         DMU_SET_FEATUREFLAGS(drr->drr_u.drr_begin.drr_versioninfo,
1054             featureflags);
1055
1056         drr->drr_u.drr_begin.drr_creation_time =
1057             dsl_dataset_phys(to_ds)->ds_creation_time;
1058         drr->drr_u.drr_begin.drr_type = dmu_objset_type(os);
1059         if (is_clone)
1060                 drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CLONE;
1061         drr->drr_u.drr_begin.drr_toguid = dsl_dataset_phys(to_ds)->ds_guid;
1062         if (dsl_dataset_phys(to_ds)->ds_flags & DS_FLAG_CI_DATASET)
1063                 drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_CI_DATA;
1064         if (zfs_send_set_freerecords_bit)
1065                 drr->drr_u.drr_begin.drr_flags |= DRR_FLAG_FREERECORDS;
1066
1067         if (ancestor_zb != NULL) {
1068                 drr->drr_u.drr_begin.drr_fromguid =
1069                     ancestor_zb->zbm_guid;
1070                 fromtxg = ancestor_zb->zbm_creation_txg;
1071         }
1072         dsl_dataset_name(to_ds, drr->drr_u.drr_begin.drr_toname);
1073         if (!to_ds->ds_is_snapshot) {
1074                 (void) strlcat(drr->drr_u.drr_begin.drr_toname, "@--head--",
1075                     sizeof (drr->drr_u.drr_begin.drr_toname));
1076         }
1077
1078         dsp = kmem_zalloc(sizeof (dmu_sendarg_t), KM_SLEEP);
1079
1080         dsp->dsa_drr = drr;
1081         dsp->dsa_vp = vp;
1082         dsp->dsa_outfd = outfd;
1083         dsp->dsa_proc = curproc;
1084         dsp->dsa_os = os;
1085         dsp->dsa_off = off;
1086         dsp->dsa_toguid = dsl_dataset_phys(to_ds)->ds_guid;
1087         dsp->dsa_pending_op = PENDING_NONE;
1088         dsp->dsa_featureflags = featureflags;
1089         dsp->dsa_resume_object = resumeobj;
1090         dsp->dsa_resume_offset = resumeoff;
1091
1092         mutex_enter(&to_ds->ds_sendstream_lock);
1093         list_insert_head(&to_ds->ds_sendstreams, dsp);
1094         mutex_exit(&to_ds->ds_sendstream_lock);
1095
1096         dsl_dataset_long_hold(to_ds, FTAG);
1097         dsl_pool_rele(dp, tag);
1098
1099         /* handle features that require a DRR_BEGIN payload */
1100         if (featureflags &
1101             (DMU_BACKUP_FEATURE_RESUMING | DMU_BACKUP_FEATURE_RAW)) {
1102                 nvlist_t *keynvl = NULL;
1103                 nvlist_t *nvl = fnvlist_alloc();
1104
1105                 if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
1106                         dmu_object_info_t to_doi;
1107                         err = dmu_object_info(os, resumeobj, &to_doi);
1108                         if (err != 0) {
1109                                 fnvlist_free(nvl);
1110                                 goto out;
1111                         }
1112
1113                         SET_BOOKMARK(&to_arg.resume, to_ds->ds_object,
1114                             resumeobj, 0,
1115                             resumeoff / to_doi.doi_data_block_size);
1116
1117                         fnvlist_add_uint64(nvl, "resume_object", resumeobj);
1118                         fnvlist_add_uint64(nvl, "resume_offset", resumeoff);
1119                 }
1120
1121                 if (featureflags & DMU_BACKUP_FEATURE_RAW) {
1122                         ASSERT(os->os_encrypted);
1123
1124                         err = dsl_crypto_populate_key_nvlist(to_ds, &keynvl);
1125                         if (err != 0) {
1126                                 fnvlist_free(nvl);
1127                                 goto out;
1128                         }
1129
1130                         fnvlist_add_nvlist(nvl, "crypt_keydata", keynvl);
1131                 }
1132
1133                 payload = fnvlist_pack(nvl, &payload_len);
1134                 drr->drr_payloadlen = payload_len;
1135                 fnvlist_free(keynvl);
1136                 fnvlist_free(nvl);
1137         }
1138
1139         err = dump_record(dsp, payload, payload_len);
1140         fnvlist_pack_free(payload, payload_len);
1141         if (err != 0) {
1142                 err = dsp->dsa_err;
1143                 goto out;
1144         }
1145
1146         err = bqueue_init(&to_arg.q,
1147             MAX(zfs_send_queue_length, 2 * zfs_max_recordsize),
1148             offsetof(struct send_block_record, ln));
1149         to_arg.error_code = 0;
1150         to_arg.cancel = B_FALSE;
1151         to_arg.ds = to_ds;
1152         to_arg.fromtxg = fromtxg;
1153         to_arg.flags = TRAVERSE_PRE | TRAVERSE_PREFETCH;
1154         if (rawok)
1155                 to_arg.flags |= TRAVERSE_NO_DECRYPT;
1156         (void) thread_create(NULL, 0, send_traverse_thread, &to_arg, 0, curproc,
1157             TS_RUN, minclsyspri);
1158
1159         to_data = bqueue_dequeue(&to_arg.q);
1160
1161         while (!to_data->eos_marker && err == 0) {
1162                 err = do_dump(dsp, to_data);
1163                 to_data = get_next_record(&to_arg.q, to_data);
1164                 if (issig(JUSTLOOKING) && issig(FORREAL))
1165                         err = EINTR;
1166         }
1167
1168         if (err != 0) {
1169                 to_arg.cancel = B_TRUE;
1170                 while (!to_data->eos_marker) {
1171                         to_data = get_next_record(&to_arg.q, to_data);
1172                 }
1173         }
1174         kmem_free(to_data, sizeof (*to_data));
1175
1176         bqueue_destroy(&to_arg.q);
1177
1178         if (err == 0 && to_arg.error_code != 0)
1179                 err = to_arg.error_code;
1180
1181         if (err != 0)
1182                 goto out;
1183
1184         if (dsp->dsa_pending_op != PENDING_NONE)
1185                 if (dump_record(dsp, NULL, 0) != 0)
1186                         err = SET_ERROR(EINTR);
1187
1188         if (err != 0) {
1189                 if (err == EINTR && dsp->dsa_err != 0)
1190                         err = dsp->dsa_err;
1191                 goto out;
1192         }
1193
1194         bzero(drr, sizeof (dmu_replay_record_t));
1195         drr->drr_type = DRR_END;
1196         drr->drr_u.drr_end.drr_checksum = dsp->dsa_zc;
1197         drr->drr_u.drr_end.drr_toguid = dsp->dsa_toguid;
1198
1199         if (dump_record(dsp, NULL, 0) != 0)
1200                 err = dsp->dsa_err;
1201 out:
1202         mutex_enter(&to_ds->ds_sendstream_lock);
1203         list_remove(&to_ds->ds_sendstreams, dsp);
1204         mutex_exit(&to_ds->ds_sendstream_lock);
1205
1206         VERIFY(err != 0 || (dsp->dsa_sent_begin && dsp->dsa_sent_end));
1207
1208         kmem_free(drr, sizeof (dmu_replay_record_t));
1209         kmem_free(dsp, sizeof (dmu_sendarg_t));
1210
1211         dsl_dataset_long_rele(to_ds, FTAG);
1212
1213         return (err);
1214 }
1215
1216 int
1217 dmu_send_obj(const char *pool, uint64_t tosnap, uint64_t fromsnap,
1218     boolean_t embedok, boolean_t large_block_ok, boolean_t compressok,
1219     boolean_t rawok, int outfd, vnode_t *vp, offset_t *off)
1220 {
1221         dsl_pool_t *dp;
1222         dsl_dataset_t *ds;
1223         dsl_dataset_t *fromds = NULL;
1224         ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
1225         int err;
1226
1227         err = dsl_pool_hold(pool, FTAG, &dp);
1228         if (err != 0)
1229                 return (err);
1230
1231         err = dsl_dataset_hold_obj_flags(dp, tosnap, dsflags, FTAG, &ds);
1232         if (err != 0) {
1233                 dsl_pool_rele(dp, FTAG);
1234                 return (err);
1235         }
1236
1237         if (fromsnap != 0) {
1238                 zfs_bookmark_phys_t zb;
1239                 boolean_t is_clone;
1240
1241                 err = dsl_dataset_hold_obj(dp, fromsnap, FTAG, &fromds);
1242                 if (err != 0) {
1243                         dsl_dataset_rele_flags(ds, dsflags, FTAG);
1244                         dsl_pool_rele(dp, FTAG);
1245                         return (err);
1246                 }
1247                 if (!dsl_dataset_is_before(ds, fromds, 0))
1248                         err = SET_ERROR(EXDEV);
1249                 zb.zbm_creation_time =
1250                     dsl_dataset_phys(fromds)->ds_creation_time;
1251                 zb.zbm_creation_txg = dsl_dataset_phys(fromds)->ds_creation_txg;
1252                 zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
1253                 is_clone = (fromds->ds_dir != ds->ds_dir);
1254                 dsl_dataset_rele(fromds, FTAG);
1255                 err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
1256                     embedok, large_block_ok, compressok, rawok, outfd,
1257                     0, 0, vp, off);
1258         } else {
1259                 err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
1260                     embedok, large_block_ok, compressok, rawok, outfd,
1261                     0, 0, vp, off);
1262         }
1263         dsl_dataset_rele_flags(ds, dsflags, FTAG);
1264         return (err);
1265 }
1266
1267 int
1268 dmu_send(const char *tosnap, const char *fromsnap, boolean_t embedok,
1269     boolean_t large_block_ok, boolean_t compressok, boolean_t rawok,
1270     int outfd, uint64_t resumeobj, uint64_t resumeoff, vnode_t *vp,
1271     offset_t *off)
1272 {
1273         dsl_pool_t *dp;
1274         dsl_dataset_t *ds;
1275         int err;
1276         ds_hold_flags_t dsflags = (rawok) ? 0 : DS_HOLD_FLAG_DECRYPT;
1277         boolean_t owned = B_FALSE;
1278
1279         if (fromsnap != NULL && strpbrk(fromsnap, "@#") == NULL)
1280                 return (SET_ERROR(EINVAL));
1281
1282         err = dsl_pool_hold(tosnap, FTAG, &dp);
1283         if (err != 0)
1284                 return (err);
1285
1286         if (strchr(tosnap, '@') == NULL && spa_writeable(dp->dp_spa)) {
1287                 /*
1288                  * We are sending a filesystem or volume.  Ensure
1289                  * that it doesn't change by owning the dataset.
1290                  */
1291                 err = dsl_dataset_own(dp, tosnap, dsflags, FTAG, &ds);
1292                 owned = B_TRUE;
1293         } else {
1294                 err = dsl_dataset_hold_flags(dp, tosnap, dsflags, FTAG, &ds);
1295         }
1296         if (err != 0) {
1297                 dsl_pool_rele(dp, FTAG);
1298                 return (err);
1299         }
1300
1301         if (fromsnap != NULL) {
1302                 zfs_bookmark_phys_t zb;
1303                 boolean_t is_clone = B_FALSE;
1304                 int fsnamelen = strchr(tosnap, '@') - tosnap;
1305
1306                 /*
1307                  * If the fromsnap is in a different filesystem, then
1308                  * mark the send stream as a clone.
1309                  */
1310                 if (strncmp(tosnap, fromsnap, fsnamelen) != 0 ||
1311                     (fromsnap[fsnamelen] != '@' &&
1312                     fromsnap[fsnamelen] != '#')) {
1313                         is_clone = B_TRUE;
1314                 }
1315
1316                 if (strchr(fromsnap, '@')) {
1317                         dsl_dataset_t *fromds;
1318                         err = dsl_dataset_hold(dp, fromsnap, FTAG, &fromds);
1319                         if (err == 0) {
1320                                 if (!dsl_dataset_is_before(ds, fromds, 0))
1321                                         err = SET_ERROR(EXDEV);
1322                                 zb.zbm_creation_time =
1323                                     dsl_dataset_phys(fromds)->ds_creation_time;
1324                                 zb.zbm_creation_txg =
1325                                     dsl_dataset_phys(fromds)->ds_creation_txg;
1326                                 zb.zbm_guid = dsl_dataset_phys(fromds)->ds_guid;
1327                                 is_clone = (ds->ds_dir != fromds->ds_dir);
1328                                 dsl_dataset_rele(fromds, FTAG);
1329                         }
1330                 } else {
1331                         err = dsl_bookmark_lookup(dp, fromsnap, ds, &zb);
1332                 }
1333                 if (err != 0) {
1334                         if (owned)
1335                                 dsl_dataset_disown(ds, dsflags, FTAG);
1336                         else
1337                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
1338
1339                         dsl_pool_rele(dp, FTAG);
1340                         return (err);
1341                 }
1342                 err = dmu_send_impl(FTAG, dp, ds, &zb, is_clone,
1343                     embedok, large_block_ok, compressok, rawok,
1344                     outfd, resumeobj, resumeoff, vp, off);
1345         } else {
1346                 err = dmu_send_impl(FTAG, dp, ds, NULL, B_FALSE,
1347                     embedok, large_block_ok, compressok, rawok,
1348                     outfd, resumeobj, resumeoff, vp, off);
1349         }
1350         if (owned)
1351                 dsl_dataset_disown(ds, dsflags, FTAG);
1352         else
1353                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
1354
1355         return (err);
1356 }
1357
1358 static int
1359 dmu_adjust_send_estimate_for_indirects(dsl_dataset_t *ds, uint64_t uncompressed,
1360     uint64_t compressed, boolean_t stream_compressed, uint64_t *sizep)
1361 {
1362         int err = 0;
1363         uint64_t size;
1364         /*
1365          * Assume that space (both on-disk and in-stream) is dominated by
1366          * data.  We will adjust for indirect blocks and the copies property,
1367          * but ignore per-object space used (eg, dnodes and DRR_OBJECT records).
1368          */
1369
1370         uint64_t recordsize;
1371         uint64_t record_count;
1372         objset_t *os;
1373         VERIFY0(dmu_objset_from_ds(ds, &os));
1374
1375         /* Assume all (uncompressed) blocks are recordsize. */
1376         if (zfs_override_estimate_recordsize != 0) {
1377                 recordsize = zfs_override_estimate_recordsize;
1378         } else if (os->os_phys->os_type == DMU_OST_ZVOL) {
1379                 err = dsl_prop_get_int_ds(ds,
1380                     zfs_prop_to_name(ZFS_PROP_VOLBLOCKSIZE), &recordsize);
1381         } else {
1382                 err = dsl_prop_get_int_ds(ds,
1383                     zfs_prop_to_name(ZFS_PROP_RECORDSIZE), &recordsize);
1384         }
1385         if (err != 0)
1386                 return (err);
1387         record_count = uncompressed / recordsize;
1388
1389         /*
1390          * If we're estimating a send size for a compressed stream, use the
1391          * compressed data size to estimate the stream size. Otherwise, use the
1392          * uncompressed data size.
1393          */
1394         size = stream_compressed ? compressed : uncompressed;
1395
1396         /*
1397          * Subtract out approximate space used by indirect blocks.
1398          * Assume most space is used by data blocks (non-indirect, non-dnode).
1399          * Assume no ditto blocks or internal fragmentation.
1400          *
1401          * Therefore, space used by indirect blocks is sizeof(blkptr_t) per
1402          * block.
1403          */
1404         size -= record_count * sizeof (blkptr_t);
1405
1406         /* Add in the space for the record associated with each block. */
1407         size += record_count * sizeof (dmu_replay_record_t);
1408
1409         *sizep = size;
1410
1411         return (0);
1412 }
1413
1414 int
1415 dmu_send_estimate(dsl_dataset_t *ds, dsl_dataset_t *fromds,
1416     boolean_t stream_compressed, uint64_t *sizep)
1417 {
1418         int err;
1419         uint64_t uncomp, comp;
1420
1421         ASSERT(dsl_pool_config_held(ds->ds_dir->dd_pool));
1422
1423         /* tosnap must be a snapshot */
1424         if (!ds->ds_is_snapshot)
1425                 return (SET_ERROR(EINVAL));
1426
1427         /* fromsnap, if provided, must be a snapshot */
1428         if (fromds != NULL && !fromds->ds_is_snapshot)
1429                 return (SET_ERROR(EINVAL));
1430
1431         /*
1432          * fromsnap must be an earlier snapshot from the same fs as tosnap,
1433          * or the origin's fs.
1434          */
1435         if (fromds != NULL && !dsl_dataset_is_before(ds, fromds, 0))
1436                 return (SET_ERROR(EXDEV));
1437
1438         /* Get compressed and uncompressed size estimates of changed data. */
1439         if (fromds == NULL) {
1440                 uncomp = dsl_dataset_phys(ds)->ds_uncompressed_bytes;
1441                 comp = dsl_dataset_phys(ds)->ds_compressed_bytes;
1442         } else {
1443                 uint64_t used;
1444                 err = dsl_dataset_space_written(fromds, ds,
1445                     &used, &comp, &uncomp);
1446                 if (err != 0)
1447                         return (err);
1448         }
1449
1450         err = dmu_adjust_send_estimate_for_indirects(ds, uncomp, comp,
1451             stream_compressed, sizep);
1452         /*
1453          * Add the size of the BEGIN and END records to the estimate.
1454          */
1455         *sizep += 2 * sizeof (dmu_replay_record_t);
1456         return (err);
1457 }
1458
1459 struct calculate_send_arg {
1460         uint64_t uncompressed;
1461         uint64_t compressed;
1462 };
1463
1464 /*
1465  * Simple callback used to traverse the blocks of a snapshot and sum their
1466  * uncompressed and compressed sizes.
1467  */
1468 /* ARGSUSED */
1469 static int
1470 dmu_calculate_send_traversal(spa_t *spa, zilog_t *zilog, const blkptr_t *bp,
1471     const zbookmark_phys_t *zb, const dnode_phys_t *dnp, void *arg)
1472 {
1473         struct calculate_send_arg *space = arg;
1474         if (bp != NULL && !BP_IS_HOLE(bp)) {
1475                 space->uncompressed += BP_GET_UCSIZE(bp);
1476                 space->compressed += BP_GET_PSIZE(bp);
1477         }
1478         return (0);
1479 }
1480
1481 /*
1482  * Given a desination snapshot and a TXG, calculate the approximate size of a
1483  * send stream sent from that TXG. from_txg may be zero, indicating that the
1484  * whole snapshot will be sent.
1485  */
1486 int
1487 dmu_send_estimate_from_txg(dsl_dataset_t *ds, uint64_t from_txg,
1488     boolean_t stream_compressed, uint64_t *sizep)
1489 {
1490         int err;
1491         struct calculate_send_arg size = { 0 };
1492
1493         ASSERT(dsl_pool_config_held(ds->ds_dir->dd_pool));
1494
1495         /* tosnap must be a snapshot */
1496         if (!dsl_dataset_is_snapshot(ds))
1497                 return (SET_ERROR(EINVAL));
1498
1499         /* verify that from_txg is before the provided snapshot was taken */
1500         if (from_txg >= dsl_dataset_phys(ds)->ds_creation_txg) {
1501                 return (SET_ERROR(EXDEV));
1502         }
1503         /*
1504          * traverse the blocks of the snapshot with birth times after
1505          * from_txg, summing their uncompressed size
1506          */
1507         err = traverse_dataset(ds, from_txg,
1508             TRAVERSE_POST | TRAVERSE_NO_DECRYPT,
1509             dmu_calculate_send_traversal, &size);
1510
1511         if (err)
1512                 return (err);
1513
1514         err = dmu_adjust_send_estimate_for_indirects(ds, size.uncompressed,
1515             size.compressed, stream_compressed, sizep);
1516         return (err);
1517 }
1518
1519
1520 #if defined(_KERNEL)
1521 /* BEGIN CSTYLED */
1522 module_param(zfs_override_estimate_recordsize, ulong, 0644);
1523 MODULE_PARM_DESC(zfs_override_estimate_recordsize,
1524         "Record size calculation override for zfs send estimates");
1525 /* END CSTYLED */
1526
1527 module_param(zfs_send_corrupt_data, int, 0644);
1528 MODULE_PARM_DESC(zfs_send_corrupt_data, "Allow sending corrupt data");
1529
1530 module_param(zfs_send_queue_length, int, 0644);
1531 MODULE_PARM_DESC(zfs_send_queue_length, "Maximum send queue length");
1532 #endif