]> granicus.if.org Git - zfs/blob - module/zfs/dmu_recv.c
257f157fd9958e545ee62a66f7432e276f00a2a3
[zfs] / module / zfs / dmu_recv.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
24  * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
25  * Copyright (c) 2014, Joyent, Inc. All rights reserved.
26  * Copyright 2014 HybridCluster. All rights reserved.
27  * Copyright 2016 RackTop Systems.
28  * Copyright (c) 2016 Actifio, Inc. All rights reserved.
29  */
30
31 #include <sys/dmu.h>
32 #include <sys/dmu_impl.h>
33 #include <sys/dmu_tx.h>
34 #include <sys/dbuf.h>
35 #include <sys/dnode.h>
36 #include <sys/zfs_context.h>
37 #include <sys/dmu_objset.h>
38 #include <sys/dmu_traverse.h>
39 #include <sys/dsl_dataset.h>
40 #include <sys/dsl_dir.h>
41 #include <sys/dsl_prop.h>
42 #include <sys/dsl_pool.h>
43 #include <sys/dsl_synctask.h>
44 #include <sys/spa_impl.h>
45 #include <sys/zfs_ioctl.h>
46 #include <sys/zap.h>
47 #include <sys/zio_checksum.h>
48 #include <sys/zfs_znode.h>
49 #include <zfs_fletcher.h>
50 #include <sys/avl.h>
51 #include <sys/ddt.h>
52 #include <sys/zfs_onexit.h>
53 #include <sys/dmu_recv.h>
54 #include <sys/dsl_destroy.h>
55 #include <sys/blkptr.h>
56 #include <sys/dsl_bookmark.h>
57 #include <sys/zfeature.h>
58 #include <sys/bqueue.h>
59 #include <sys/zvol.h>
60 #include <sys/policy.h>
61
62 int zfs_recv_queue_length = SPA_MAXBLOCKSIZE;
63
64 static char *dmu_recv_tag = "dmu_recv_tag";
65 const char *recv_clone_name = "%recv";
66
67 static void byteswap_record(dmu_replay_record_t *drr);
68
69 typedef struct dmu_recv_begin_arg {
70         const char *drba_origin;
71         dmu_recv_cookie_t *drba_cookie;
72         cred_t *drba_cred;
73         dsl_crypto_params_t *drba_dcp;
74         uint64_t drba_snapobj;
75 } dmu_recv_begin_arg_t;
76
77 static int
78 recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
79     uint64_t fromguid, uint64_t featureflags)
80 {
81         uint64_t val;
82         int error;
83         dsl_pool_t *dp = ds->ds_dir->dd_pool;
84         boolean_t encrypted = ds->ds_dir->dd_crypto_obj != 0;
85         boolean_t raw = (featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
86         boolean_t embed = (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) != 0;
87
88         /* temporary clone name must not exist */
89         error = zap_lookup(dp->dp_meta_objset,
90             dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
91             8, 1, &val);
92         if (error != ENOENT)
93                 return (error == 0 ? EBUSY : error);
94
95         /* new snapshot name must not exist */
96         error = zap_lookup(dp->dp_meta_objset,
97             dsl_dataset_phys(ds)->ds_snapnames_zapobj,
98             drba->drba_cookie->drc_tosnap, 8, 1, &val);
99         if (error != ENOENT)
100                 return (error == 0 ? EEXIST : error);
101
102         /*
103          * Check snapshot limit before receiving. We'll recheck again at the
104          * end, but might as well abort before receiving if we're already over
105          * the limit.
106          *
107          * Note that we do not check the file system limit with
108          * dsl_dir_fscount_check because the temporary %clones don't count
109          * against that limit.
110          */
111         error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
112             NULL, drba->drba_cred);
113         if (error != 0)
114                 return (error);
115
116         if (fromguid != 0) {
117                 dsl_dataset_t *snap;
118                 uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;
119
120                 /* Can't perform a raw receive on top of a non-raw receive */
121                 if (!encrypted && raw)
122                         return (SET_ERROR(EINVAL));
123
124                 /* Encryption is incompatible with embedded data */
125                 if (encrypted && embed)
126                         return (SET_ERROR(EINVAL));
127
128                 /* Find snapshot in this dir that matches fromguid. */
129                 while (obj != 0) {
130                         error = dsl_dataset_hold_obj(dp, obj, FTAG,
131                             &snap);
132                         if (error != 0)
133                                 return (SET_ERROR(ENODEV));
134                         if (snap->ds_dir != ds->ds_dir) {
135                                 dsl_dataset_rele(snap, FTAG);
136                                 return (SET_ERROR(ENODEV));
137                         }
138                         if (dsl_dataset_phys(snap)->ds_guid == fromguid)
139                                 break;
140                         obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
141                         dsl_dataset_rele(snap, FTAG);
142                 }
143                 if (obj == 0)
144                         return (SET_ERROR(ENODEV));
145
146                 if (drba->drba_cookie->drc_force) {
147                         drba->drba_snapobj = obj;
148                 } else {
149                         /*
150                          * If we are not forcing, there must be no
151                          * changes since fromsnap.
152                          */
153                         if (dsl_dataset_modified_since_snap(ds, snap)) {
154                                 dsl_dataset_rele(snap, FTAG);
155                                 return (SET_ERROR(ETXTBSY));
156                         }
157                         drba->drba_snapobj = ds->ds_prev->ds_object;
158                 }
159
160                 dsl_dataset_rele(snap, FTAG);
161         } else {
162                 /* if full, then must be forced */
163                 if (!drba->drba_cookie->drc_force)
164                         return (SET_ERROR(EEXIST));
165
166                 /*
167                  * We don't support using zfs recv -F to blow away
168                  * encrypted filesystems. This would require the
169                  * dsl dir to point to the old encryption key and
170                  * the new one at the same time during the receive.
171                  */
172                 if ((!encrypted && raw) || encrypted)
173                         return (SET_ERROR(EINVAL));
174
175                 /*
176                  * Perform the same encryption checks we would if
177                  * we were creating a new dataset from scratch.
178                  */
179                 if (!raw) {
180                         boolean_t will_encrypt;
181
182                         error = dmu_objset_create_crypt_check(
183                             ds->ds_dir->dd_parent, drba->drba_dcp,
184                             &will_encrypt);
185                         if (error != 0)
186                                 return (error);
187
188                         if (will_encrypt && embed)
189                                 return (SET_ERROR(EINVAL));
190                 }
191
192                 drba->drba_snapobj = 0;
193         }
194
195         return (0);
196
197 }
198
199 static int
200 dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
201 {
202         dmu_recv_begin_arg_t *drba = arg;
203         dsl_pool_t *dp = dmu_tx_pool(tx);
204         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
205         uint64_t fromguid = drrb->drr_fromguid;
206         int flags = drrb->drr_flags;
207         ds_hold_flags_t dsflags = 0;
208         int error;
209         uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
210         dsl_dataset_t *ds;
211         const char *tofs = drba->drba_cookie->drc_tofs;
212
213         /* already checked */
214         ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
215         ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));
216
217         if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
218             DMU_COMPOUNDSTREAM ||
219             drrb->drr_type >= DMU_OST_NUMTYPES ||
220             ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
221                 return (SET_ERROR(EINVAL));
222
223         /* Verify pool version supports SA if SA_SPILL feature set */
224         if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
225             spa_version(dp->dp_spa) < SPA_VERSION_SA)
226                 return (SET_ERROR(ENOTSUP));
227
228         if (drba->drba_cookie->drc_resumable &&
229             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
230                 return (SET_ERROR(ENOTSUP));
231
232         /*
233          * The receiving code doesn't know how to translate a WRITE_EMBEDDED
234          * record to a plain WRITE record, so the pool must have the
235          * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
236          * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
237          */
238         if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
239             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
240                 return (SET_ERROR(ENOTSUP));
241         if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
242             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
243                 return (SET_ERROR(ENOTSUP));
244
245         /*
246          * The receiving code doesn't know how to translate large blocks
247          * to smaller ones, so the pool must have the LARGE_BLOCKS
248          * feature enabled if the stream has LARGE_BLOCKS. Same with
249          * large dnodes.
250          */
251         if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
252             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
253                 return (SET_ERROR(ENOTSUP));
254         if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) &&
255             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_DNODE))
256                 return (SET_ERROR(ENOTSUP));
257
258         if (featureflags & DMU_BACKUP_FEATURE_RAW) {
259                 /* raw receives require the encryption feature */
260                 if (!spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_ENCRYPTION))
261                         return (SET_ERROR(ENOTSUP));
262
263                 /* embedded data is incompatible with encryption and raw recv */
264                 if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
265                         return (SET_ERROR(EINVAL));
266         } else {
267                 dsflags |= DS_HOLD_FLAG_DECRYPT;
268         }
269
270         error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds);
271         if (error == 0) {
272                 /* target fs already exists; recv into temp clone */
273
274                 /* Can't recv a clone into an existing fs */
275                 if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
276                         dsl_dataset_rele_flags(ds, dsflags, FTAG);
277                         return (SET_ERROR(EINVAL));
278                 }
279
280                 error = recv_begin_check_existing_impl(drba, ds, fromguid,
281                     featureflags);
282                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
283         } else if (error == ENOENT) {
284                 /* target fs does not exist; must be a full backup or clone */
285                 char buf[ZFS_MAX_DATASET_NAME_LEN];
286
287                 /*
288                  * If it's a non-clone incremental, we are missing the
289                  * target fs, so fail the recv.
290                  */
291                 if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
292                     drba->drba_origin))
293                         return (SET_ERROR(ENOENT));
294
295                 /*
296                  * If we're receiving a full send as a clone, and it doesn't
297                  * contain all the necessary free records and freeobject
298                  * records, reject it.
299                  */
300                 if (fromguid == 0 && drba->drba_origin &&
301                     !(flags & DRR_FLAG_FREERECORDS))
302                         return (SET_ERROR(EINVAL));
303
304                 /* Open the parent of tofs */
305                 ASSERT3U(strlen(tofs), <, sizeof (buf));
306                 (void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
307                 error = dsl_dataset_hold_flags(dp, buf, dsflags, FTAG, &ds);
308                 if (error != 0)
309                         return (error);
310
311                 if ((featureflags & DMU_BACKUP_FEATURE_RAW) == 0 &&
312                     drba->drba_origin == NULL) {
313                         boolean_t will_encrypt;
314
315                         /*
316                          * Check that we aren't breaking any encryption rules
317                          * and that we have all the parameters we need to
318                          * create an encrypted dataset if necessary. If we are
319                          * making an encrypted dataset the stream can't have
320                          * embedded data.
321                          */
322                         error = dmu_objset_create_crypt_check(ds->ds_dir,
323                             drba->drba_dcp, &will_encrypt);
324                         if (error != 0) {
325                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
326                                 return (error);
327                         }
328
329                         if (will_encrypt &&
330                             (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)) {
331                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
332                                 return (SET_ERROR(EINVAL));
333                         }
334                 }
335
336                 /*
337                  * Check filesystem and snapshot limits before receiving. We'll
338                  * recheck snapshot limits again at the end (we create the
339                  * filesystems and increment those counts during begin_sync).
340                  */
341                 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
342                     ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
343                 if (error != 0) {
344                         dsl_dataset_rele_flags(ds, dsflags, FTAG);
345                         return (error);
346                 }
347
348                 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
349                     ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
350                 if (error != 0) {
351                         dsl_dataset_rele_flags(ds, dsflags, FTAG);
352                         return (error);
353                 }
354
355                 if (drba->drba_origin != NULL) {
356                         dsl_dataset_t *origin;
357
358                         error = dsl_dataset_hold_flags(dp, drba->drba_origin,
359                             dsflags, FTAG, &origin);
360                         if (error != 0) {
361                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
362                                 return (error);
363                         }
364                         if (!origin->ds_is_snapshot) {
365                                 dsl_dataset_rele_flags(origin, dsflags, FTAG);
366                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
367                                 return (SET_ERROR(EINVAL));
368                         }
369                         if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
370                             fromguid != 0) {
371                                 dsl_dataset_rele_flags(origin, dsflags, FTAG);
372                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
373                                 return (SET_ERROR(ENODEV));
374                         }
375                         if (origin->ds_dir->dd_crypto_obj != 0 &&
376                             (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)) {
377                                 dsl_dataset_rele_flags(origin, dsflags, FTAG);
378                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
379                                 return (SET_ERROR(EINVAL));
380                         }
381                         dsl_dataset_rele_flags(origin,
382                             dsflags, FTAG);
383                 }
384                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
385                 error = 0;
386         }
387         return (error);
388 }
389
390 static void
391 dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
392 {
393         dmu_recv_begin_arg_t *drba = arg;
394         dsl_pool_t *dp = dmu_tx_pool(tx);
395         objset_t *mos = dp->dp_meta_objset;
396         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
397         const char *tofs = drba->drba_cookie->drc_tofs;
398         uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
399         dsl_dataset_t *ds, *newds;
400         objset_t *os;
401         uint64_t dsobj;
402         ds_hold_flags_t dsflags = 0;
403         int error;
404         uint64_t crflags = 0;
405         dsl_crypto_params_t dummy_dcp = { 0 };
406         dsl_crypto_params_t *dcp = drba->drba_dcp;
407
408         if (drrb->drr_flags & DRR_FLAG_CI_DATA)
409                 crflags |= DS_FLAG_CI_DATASET;
410
411         if ((featureflags & DMU_BACKUP_FEATURE_RAW) == 0)
412                 dsflags |= DS_HOLD_FLAG_DECRYPT;
413
414         /*
415          * Raw, non-incremental recvs always use a dummy dcp with
416          * the raw cmd set. Raw incremental recvs do not use a dcp
417          * since the encryption parameters are already set in stone.
418          */
419         if (dcp == NULL && drba->drba_snapobj == 0 &&
420             drba->drba_origin == NULL) {
421                 ASSERT3P(dcp, ==, NULL);
422                 dcp = &dummy_dcp;
423
424                 if (featureflags & DMU_BACKUP_FEATURE_RAW)
425                         dcp->cp_cmd = DCP_CMD_RAW_RECV;
426         }
427
428         error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds);
429         if (error == 0) {
430                 /* create temporary clone */
431                 dsl_dataset_t *snap = NULL;
432
433                 if (drba->drba_snapobj != 0) {
434                         VERIFY0(dsl_dataset_hold_obj(dp,
435                             drba->drba_snapobj, FTAG, &snap));
436                         ASSERT3P(dcp, ==, NULL);
437                 }
438
439                 dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
440                     snap, crflags, drba->drba_cred, dcp, tx);
441                 if (drba->drba_snapobj != 0)
442                         dsl_dataset_rele(snap, FTAG);
443                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
444         } else {
445                 dsl_dir_t *dd;
446                 const char *tail;
447                 dsl_dataset_t *origin = NULL;
448
449                 VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));
450
451                 if (drba->drba_origin != NULL) {
452                         VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
453                             FTAG, &origin));
454                         ASSERT3P(dcp, ==, NULL);
455                 }
456
457                 /* Create new dataset. */
458                 dsobj = dsl_dataset_create_sync(dd, strrchr(tofs, '/') + 1,
459                     origin, crflags, drba->drba_cred, dcp, tx);
460                 if (origin != NULL)
461                         dsl_dataset_rele(origin, FTAG);
462                 dsl_dir_rele(dd, FTAG);
463                 drba->drba_cookie->drc_newfs = B_TRUE;
464         }
465
466         VERIFY0(dsl_dataset_own_obj(dp, dsobj, dsflags, dmu_recv_tag, &newds));
467         VERIFY0(dmu_objset_from_ds(newds, &os));
468
469         if (drba->drba_cookie->drc_resumable) {
470                 dsl_dataset_zapify(newds, tx);
471                 if (drrb->drr_fromguid != 0) {
472                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
473                             8, 1, &drrb->drr_fromguid, tx));
474                 }
475                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
476                     8, 1, &drrb->drr_toguid, tx));
477                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
478                     1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
479                 uint64_t one = 1;
480                 uint64_t zero = 0;
481                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
482                     8, 1, &one, tx));
483                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
484                     8, 1, &zero, tx));
485                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
486                     8, 1, &zero, tx));
487                 if (featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) {
488                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_LARGEBLOCK,
489                             8, 1, &one, tx));
490                 }
491                 if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) {
492                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
493                             8, 1, &one, tx));
494                 }
495                 if (featureflags & DMU_BACKUP_FEATURE_COMPRESSED) {
496                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_COMPRESSOK,
497                             8, 1, &one, tx));
498                 }
499                 if (featureflags & DMU_BACKUP_FEATURE_RAW) {
500                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_RAWOK,
501                             8, 1, &one, tx));
502                 }
503         }
504
505         /*
506          * Usually the os->os_encrypted value is tied to the presence of a
507          * DSL Crypto Key object in the dd. However, that will not be received
508          * until dmu_recv_stream(), so we set the value manually for now.
509          */
510         if (featureflags & DMU_BACKUP_FEATURE_RAW) {
511                 os->os_encrypted = B_TRUE;
512                 drba->drba_cookie->drc_raw = B_TRUE;
513         }
514
515         dmu_buf_will_dirty(newds->ds_dbuf, tx);
516         dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;
517
518         /*
519          * If we actually created a non-clone, we need to create the objset
520          * in our new dataset. If this is a raw send we postpone this until
521          * dmu_recv_stream() so that we can allocate the metadnode with the
522          * properties from the DRR_BEGIN payload.
523          */
524         rrw_enter(&newds->ds_bp_rwlock, RW_READER, FTAG);
525         if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds)) &&
526             (featureflags & DMU_BACKUP_FEATURE_RAW) == 0) {
527                 (void) dmu_objset_create_impl(dp->dp_spa,
528                     newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
529         }
530         rrw_exit(&newds->ds_bp_rwlock, FTAG);
531
532         drba->drba_cookie->drc_ds = newds;
533
534         spa_history_log_internal_ds(newds, "receive", tx, "");
535 }
536
537 static int
538 dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
539 {
540         dmu_recv_begin_arg_t *drba = arg;
541         dsl_pool_t *dp = dmu_tx_pool(tx);
542         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
543         int error;
544         ds_hold_flags_t dsflags = 0;
545         uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
546         dsl_dataset_t *ds;
547         const char *tofs = drba->drba_cookie->drc_tofs;
548
549         /* already checked */
550         ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
551         ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING);
552
553         if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
554             DMU_COMPOUNDSTREAM ||
555             drrb->drr_type >= DMU_OST_NUMTYPES)
556                 return (SET_ERROR(EINVAL));
557
558         /* Verify pool version supports SA if SA_SPILL feature set */
559         if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
560             spa_version(dp->dp_spa) < SPA_VERSION_SA)
561                 return (SET_ERROR(ENOTSUP));
562
563         /*
564          * The receiving code doesn't know how to translate a WRITE_EMBEDDED
565          * record to a plain WRITE record, so the pool must have the
566          * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
567          * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
568          */
569         if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
570             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
571                 return (SET_ERROR(ENOTSUP));
572         if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
573             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
574                 return (SET_ERROR(ENOTSUP));
575
576         /*
577          * The receiving code doesn't know how to translate large blocks
578          * to smaller ones, so the pool must have the LARGE_BLOCKS
579          * feature enabled if the stream has LARGE_BLOCKS. Same with
580          * large dnodes.
581          */
582         if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
583             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
584                 return (SET_ERROR(ENOTSUP));
585         if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) &&
586             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_DNODE))
587                 return (SET_ERROR(ENOTSUP));
588
589         /* 6 extra bytes for /%recv */
590         char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];
591         (void) snprintf(recvname, sizeof (recvname), "%s/%s",
592             tofs, recv_clone_name);
593
594         if ((featureflags & DMU_BACKUP_FEATURE_RAW) == 0)
595                 dsflags |= DS_HOLD_FLAG_DECRYPT;
596
597         if (dsl_dataset_hold_flags(dp, recvname, dsflags, FTAG, &ds) != 0) {
598                 /* %recv does not exist; continue in tofs */
599                 error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds);
600                 if (error != 0)
601                         return (error);
602         }
603
604         /* check that ds is marked inconsistent */
605         if (!DS_IS_INCONSISTENT(ds)) {
606                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
607                 return (SET_ERROR(EINVAL));
608         }
609
610         /* check that there is resuming data, and that the toguid matches */
611         if (!dsl_dataset_is_zapified(ds)) {
612                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
613                 return (SET_ERROR(EINVAL));
614         }
615         uint64_t val;
616         error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
617             DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
618         if (error != 0 || drrb->drr_toguid != val) {
619                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
620                 return (SET_ERROR(EINVAL));
621         }
622
623         /*
624          * Check if the receive is still running.  If so, it will be owned.
625          * Note that nothing else can own the dataset (e.g. after the receive
626          * fails) because it will be marked inconsistent.
627          */
628         if (dsl_dataset_has_owner(ds)) {
629                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
630                 return (SET_ERROR(EBUSY));
631         }
632
633         /* There should not be any snapshots of this fs yet. */
634         if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
635                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
636                 return (SET_ERROR(EINVAL));
637         }
638
639         /*
640          * Note: resume point will be checked when we process the first WRITE
641          * record.
642          */
643
644         /* check that the origin matches */
645         val = 0;
646         (void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
647             DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
648         if (drrb->drr_fromguid != val) {
649                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
650                 return (SET_ERROR(EINVAL));
651         }
652
653         dsl_dataset_rele_flags(ds, dsflags, FTAG);
654         return (0);
655 }
656
657 static void
658 dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
659 {
660         dmu_recv_begin_arg_t *drba = arg;
661         dsl_pool_t *dp = dmu_tx_pool(tx);
662         const char *tofs = drba->drba_cookie->drc_tofs;
663         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
664         uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
665         dsl_dataset_t *ds;
666         objset_t *os;
667         ds_hold_flags_t dsflags = 0;
668         uint64_t dsobj;
669         /* 6 extra bytes for /%recv */
670         char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];
671
672         (void) snprintf(recvname, sizeof (recvname), "%s/%s",
673             tofs, recv_clone_name);
674
675         if (featureflags & DMU_BACKUP_FEATURE_RAW) {
676                 drba->drba_cookie->drc_raw = B_TRUE;
677         } else {
678                 dsflags |= DS_HOLD_FLAG_DECRYPT;
679         }
680
681         if (dsl_dataset_hold_flags(dp, recvname, dsflags, FTAG, &ds) != 0) {
682                 /* %recv does not exist; continue in tofs */
683                 VERIFY0(dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds));
684                 drba->drba_cookie->drc_newfs = B_TRUE;
685         }
686
687         /* clear the inconsistent flag so that we can own it */
688         ASSERT(DS_IS_INCONSISTENT(ds));
689         dmu_buf_will_dirty(ds->ds_dbuf, tx);
690         dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
691         dsobj = ds->ds_object;
692         dsl_dataset_rele_flags(ds, dsflags, FTAG);
693
694         VERIFY0(dsl_dataset_own_obj(dp, dsobj, dsflags, dmu_recv_tag, &ds));
695         VERIFY0(dmu_objset_from_ds(ds, &os));
696
697         dmu_buf_will_dirty(ds->ds_dbuf, tx);
698         dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;
699
700         rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
701         ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)) ||
702             drba->drba_cookie->drc_raw);
703         rrw_exit(&ds->ds_bp_rwlock, FTAG);
704
705         drba->drba_cookie->drc_ds = ds;
706
707         spa_history_log_internal_ds(ds, "resume receive", tx, "");
708 }
709
710 /*
711  * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
712  * succeeds; otherwise we will leak the holds on the datasets.
713  */
714 int
715 dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
716     boolean_t force, boolean_t resumable, nvlist_t *localprops,
717     nvlist_t *hidden_args, char *origin, dmu_recv_cookie_t *drc)
718 {
719         dmu_recv_begin_arg_t drba = { 0 };
720
721         bzero(drc, sizeof (dmu_recv_cookie_t));
722         drc->drc_drr_begin = drr_begin;
723         drc->drc_drrb = &drr_begin->drr_u.drr_begin;
724         drc->drc_tosnap = tosnap;
725         drc->drc_tofs = tofs;
726         drc->drc_force = force;
727         drc->drc_resumable = resumable;
728         drc->drc_cred = CRED();
729         drc->drc_clone = (origin != NULL);
730
731         if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
732                 drc->drc_byteswap = B_TRUE;
733                 (void) fletcher_4_incremental_byteswap(drr_begin,
734                     sizeof (dmu_replay_record_t), &drc->drc_cksum);
735                 byteswap_record(drr_begin);
736         } else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
737                 (void) fletcher_4_incremental_native(drr_begin,
738                     sizeof (dmu_replay_record_t), &drc->drc_cksum);
739         } else {
740                 return (SET_ERROR(EINVAL));
741         }
742
743         drba.drba_origin = origin;
744         drba.drba_cookie = drc;
745         drba.drba_cred = CRED();
746
747         if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
748             DMU_BACKUP_FEATURE_RESUMING) {
749                 return (dsl_sync_task(tofs,
750                     dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
751                     &drba, 5, ZFS_SPACE_CHECK_NORMAL));
752         } else  {
753                 int err;
754
755                 /*
756                  * For non-raw, non-incremental, non-resuming receives the
757                  * user can specify encryption parameters on the command line
758                  * with "zfs recv -o". For these receives we create a dcp and
759                  * pass it to the sync task. Creating the dcp will implicitly
760                  * remove the encryption params from the localprops nvlist,
761                  * which avoids errors when trying to set these normally
762                  * read-only properties. Any other kind of receive that
763                  * attempts to set these properties will fail as a result.
764                  */
765                 if ((DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
766                     DMU_BACKUP_FEATURE_RAW) == 0 &&
767                     origin == NULL && drc->drc_drrb->drr_fromguid == 0) {
768                         err = dsl_crypto_params_create_nvlist(DCP_CMD_NONE,
769                             localprops, hidden_args, &drba.drba_dcp);
770                         if (err != 0)
771                                 return (err);
772                 }
773
774                 err = dsl_sync_task(tofs,
775                     dmu_recv_begin_check, dmu_recv_begin_sync,
776                     &drba, 5, ZFS_SPACE_CHECK_NORMAL);
777                 dsl_crypto_params_free(drba.drba_dcp, !!err);
778
779                 return (err);
780         }
781 }
782
783 struct receive_record_arg {
784         dmu_replay_record_t header;
785         void *payload; /* Pointer to a buffer containing the payload */
786         /*
787          * If the record is a write, pointer to the arc_buf_t containing the
788          * payload.
789          */
790         arc_buf_t *arc_buf;
791         int payload_size;
792         uint64_t bytes_read; /* bytes read from stream when record created */
793         boolean_t eos_marker; /* Marks the end of the stream */
794         bqueue_node_t node;
795 };
796
797 struct receive_writer_arg {
798         objset_t *os;
799         boolean_t byteswap;
800         bqueue_t q;
801
802         /*
803          * These three args are used to signal to the main thread that we're
804          * done.
805          */
806         kmutex_t mutex;
807         kcondvar_t cv;
808         boolean_t done;
809
810         int err;
811         /* A map from guid to dataset to help handle dedup'd streams. */
812         avl_tree_t *guid_to_ds_map;
813         boolean_t resumable;
814         boolean_t raw;
815         uint64_t last_object;
816         uint64_t last_offset;
817         uint64_t max_object; /* highest object ID referenced in stream */
818         uint64_t bytes_read; /* bytes read when current record created */
819
820         /* Encryption parameters for the last received DRR_OBJECT_RANGE */
821         boolean_t or_crypt_params_present;
822         uint64_t or_firstobj;
823         uint64_t or_numslots;
824         uint8_t or_salt[ZIO_DATA_SALT_LEN];
825         uint8_t or_iv[ZIO_DATA_IV_LEN];
826         uint8_t or_mac[ZIO_DATA_MAC_LEN];
827         boolean_t or_byteorder;
828 };
829
830 struct objlist {
831         list_t list; /* List of struct receive_objnode. */
832         /*
833          * Last object looked up. Used to assert that objects are being looked
834          * up in ascending order.
835          */
836         uint64_t last_lookup;
837 };
838
839 struct receive_objnode {
840         list_node_t node;
841         uint64_t object;
842 };
843
844 struct receive_arg  {
845         objset_t *os;
846         vnode_t *vp; /* The vnode to read the stream from */
847         uint64_t voff; /* The current offset in the stream */
848         uint64_t bytes_read;
849         /*
850          * A record that has had its payload read in, but hasn't yet been handed
851          * off to the worker thread.
852          */
853         struct receive_record_arg *rrd;
854         /* A record that has had its header read in, but not its payload. */
855         struct receive_record_arg *next_rrd;
856         zio_cksum_t cksum;
857         zio_cksum_t prev_cksum;
858         int err;
859         boolean_t byteswap;
860         boolean_t raw;
861         uint64_t featureflags;
862         /* Sorted list of objects not to issue prefetches for. */
863         struct objlist ignore_objlist;
864 };
865
866 typedef struct guid_map_entry {
867         uint64_t        guid;
868         boolean_t       raw;
869         dsl_dataset_t   *gme_ds;
870         avl_node_t      avlnode;
871 } guid_map_entry_t;
872
873 static int
874 guid_compare(const void *arg1, const void *arg2)
875 {
876         const guid_map_entry_t *gmep1 = (const guid_map_entry_t *)arg1;
877         const guid_map_entry_t *gmep2 = (const guid_map_entry_t *)arg2;
878
879         return (AVL_CMP(gmep1->guid, gmep2->guid));
880 }
881
882 static void
883 free_guid_map_onexit(void *arg)
884 {
885         avl_tree_t *ca = arg;
886         void *cookie = NULL;
887         guid_map_entry_t *gmep;
888
889         while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
890                 ds_hold_flags_t dsflags = DS_HOLD_FLAG_DECRYPT;
891
892                 if (gmep->raw) {
893                         gmep->gme_ds->ds_objset->os_raw_receive = B_FALSE;
894                         dsflags &= ~DS_HOLD_FLAG_DECRYPT;
895                 }
896
897                 dsl_dataset_disown(gmep->gme_ds, dsflags, gmep);
898                 kmem_free(gmep, sizeof (guid_map_entry_t));
899         }
900         avl_destroy(ca);
901         kmem_free(ca, sizeof (avl_tree_t));
902 }
903
904 static int
905 receive_read(struct receive_arg *ra, int len, void *buf)
906 {
907         int done = 0;
908
909         /*
910          * The code doesn't rely on this (lengths being multiples of 8).  See
911          * comment in dump_bytes.
912          */
913         ASSERT(len % 8 == 0 ||
914             (ra->featureflags & DMU_BACKUP_FEATURE_RAW) != 0);
915
916         while (done < len) {
917                 ssize_t resid;
918
919                 ra->err = vn_rdwr(UIO_READ, ra->vp,
920                     (char *)buf + done, len - done,
921                     ra->voff, UIO_SYSSPACE, FAPPEND,
922                     RLIM64_INFINITY, CRED(), &resid);
923
924                 if (resid == len - done) {
925                         /*
926                          * Note: ECKSUM indicates that the receive
927                          * was interrupted and can potentially be resumed.
928                          */
929                         ra->err = SET_ERROR(ECKSUM);
930                 }
931                 ra->voff += len - done - resid;
932                 done = len - resid;
933                 if (ra->err != 0)
934                         return (ra->err);
935         }
936
937         ra->bytes_read += len;
938
939         ASSERT3U(done, ==, len);
940         return (0);
941 }
942
943 noinline static void
944 byteswap_record(dmu_replay_record_t *drr)
945 {
946 #define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
947 #define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
948         drr->drr_type = BSWAP_32(drr->drr_type);
949         drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
950
951         switch (drr->drr_type) {
952         case DRR_BEGIN:
953                 DO64(drr_begin.drr_magic);
954                 DO64(drr_begin.drr_versioninfo);
955                 DO64(drr_begin.drr_creation_time);
956                 DO32(drr_begin.drr_type);
957                 DO32(drr_begin.drr_flags);
958                 DO64(drr_begin.drr_toguid);
959                 DO64(drr_begin.drr_fromguid);
960                 break;
961         case DRR_OBJECT:
962                 DO64(drr_object.drr_object);
963                 DO32(drr_object.drr_type);
964                 DO32(drr_object.drr_bonustype);
965                 DO32(drr_object.drr_blksz);
966                 DO32(drr_object.drr_bonuslen);
967                 DO32(drr_object.drr_raw_bonuslen);
968                 DO64(drr_object.drr_toguid);
969                 DO64(drr_object.drr_maxblkid);
970                 break;
971         case DRR_FREEOBJECTS:
972                 DO64(drr_freeobjects.drr_firstobj);
973                 DO64(drr_freeobjects.drr_numobjs);
974                 DO64(drr_freeobjects.drr_toguid);
975                 break;
976         case DRR_WRITE:
977                 DO64(drr_write.drr_object);
978                 DO32(drr_write.drr_type);
979                 DO64(drr_write.drr_offset);
980                 DO64(drr_write.drr_logical_size);
981                 DO64(drr_write.drr_toguid);
982                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
983                 DO64(drr_write.drr_key.ddk_prop);
984                 DO64(drr_write.drr_compressed_size);
985                 break;
986         case DRR_WRITE_BYREF:
987                 DO64(drr_write_byref.drr_object);
988                 DO64(drr_write_byref.drr_offset);
989                 DO64(drr_write_byref.drr_length);
990                 DO64(drr_write_byref.drr_toguid);
991                 DO64(drr_write_byref.drr_refguid);
992                 DO64(drr_write_byref.drr_refobject);
993                 DO64(drr_write_byref.drr_refoffset);
994                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref.
995                     drr_key.ddk_cksum);
996                 DO64(drr_write_byref.drr_key.ddk_prop);
997                 break;
998         case DRR_WRITE_EMBEDDED:
999                 DO64(drr_write_embedded.drr_object);
1000                 DO64(drr_write_embedded.drr_offset);
1001                 DO64(drr_write_embedded.drr_length);
1002                 DO64(drr_write_embedded.drr_toguid);
1003                 DO32(drr_write_embedded.drr_lsize);
1004                 DO32(drr_write_embedded.drr_psize);
1005                 break;
1006         case DRR_FREE:
1007                 DO64(drr_free.drr_object);
1008                 DO64(drr_free.drr_offset);
1009                 DO64(drr_free.drr_length);
1010                 DO64(drr_free.drr_toguid);
1011                 break;
1012         case DRR_SPILL:
1013                 DO64(drr_spill.drr_object);
1014                 DO64(drr_spill.drr_length);
1015                 DO64(drr_spill.drr_toguid);
1016                 DO64(drr_spill.drr_compressed_size);
1017                 DO32(drr_spill.drr_type);
1018                 break;
1019         case DRR_OBJECT_RANGE:
1020                 DO64(drr_object_range.drr_firstobj);
1021                 DO64(drr_object_range.drr_numslots);
1022                 DO64(drr_object_range.drr_toguid);
1023                 break;
1024         case DRR_END:
1025                 DO64(drr_end.drr_toguid);
1026                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
1027                 break;
1028         default:
1029                 break;
1030         }
1031
1032         if (drr->drr_type != DRR_BEGIN) {
1033                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
1034         }
1035
1036 #undef DO64
1037 #undef DO32
1038 }
1039
1040 static inline uint8_t
1041 deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size)
1042 {
1043         if (bonus_type == DMU_OT_SA) {
1044                 return (1);
1045         } else {
1046                 return (1 +
1047                     ((DN_OLD_MAX_BONUSLEN -
1048                     MIN(DN_OLD_MAX_BONUSLEN, bonus_size)) >> SPA_BLKPTRSHIFT));
1049         }
1050 }
1051
1052 static void
1053 save_resume_state(struct receive_writer_arg *rwa,
1054     uint64_t object, uint64_t offset, dmu_tx_t *tx)
1055 {
1056         int txgoff = dmu_tx_get_txg(tx) & TXG_MASK;
1057
1058         if (!rwa->resumable)
1059                 return;
1060
1061         /*
1062          * We use ds_resume_bytes[] != 0 to indicate that we need to
1063          * update this on disk, so it must not be 0.
1064          */
1065         ASSERT(rwa->bytes_read != 0);
1066
1067         /*
1068          * We only resume from write records, which have a valid
1069          * (non-meta-dnode) object number.
1070          */
1071         ASSERT(object != 0);
1072
1073         /*
1074          * For resuming to work correctly, we must receive records in order,
1075          * sorted by object,offset.  This is checked by the callers, but
1076          * assert it here for good measure.
1077          */
1078         ASSERT3U(object, >=, rwa->os->os_dsl_dataset->ds_resume_object[txgoff]);
1079         ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] ||
1080             offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]);
1081         ASSERT3U(rwa->bytes_read, >=,
1082             rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]);
1083
1084         rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object;
1085         rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset;
1086         rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read;
1087 }
1088
1089 noinline static int
1090 receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
1091     void *data)
1092 {
1093         dmu_object_info_t doi;
1094         dmu_tx_t *tx;
1095         uint64_t object;
1096         int err;
1097         uint8_t dn_slots = drro->drr_dn_slots != 0 ?
1098             drro->drr_dn_slots : DNODE_MIN_SLOTS;
1099
1100         if (drro->drr_type == DMU_OT_NONE ||
1101             !DMU_OT_IS_VALID(drro->drr_type) ||
1102             !DMU_OT_IS_VALID(drro->drr_bonustype) ||
1103             drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
1104             drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
1105             P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
1106             drro->drr_blksz < SPA_MINBLOCKSIZE ||
1107             drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) ||
1108             drro->drr_bonuslen >
1109             DN_BONUS_SIZE(spa_maxdnodesize(dmu_objset_spa(rwa->os))) ||
1110             dn_slots >
1111             (spa_maxdnodesize(dmu_objset_spa(rwa->os)) >> DNODE_SHIFT))  {
1112                 return (SET_ERROR(EINVAL));
1113         }
1114
1115         if (rwa->raw) {
1116                 /*
1117                  * We should have received a DRR_OBJECT_RANGE record
1118                  * containing this block and stored it in rwa.
1119                  */
1120                 if (drro->drr_object < rwa->or_firstobj ||
1121                     drro->drr_object >= rwa->or_firstobj + rwa->or_numslots ||
1122                     drro->drr_raw_bonuslen < drro->drr_bonuslen ||
1123                     drro->drr_indblkshift > SPA_MAXBLOCKSHIFT ||
1124                     drro->drr_nlevels > DN_MAX_LEVELS ||
1125                     drro->drr_nblkptr > DN_MAX_NBLKPTR ||
1126                     DN_SLOTS_TO_BONUSLEN(dn_slots) <
1127                     drro->drr_raw_bonuslen)
1128                         return (SET_ERROR(EINVAL));
1129         } else {
1130                 if (drro->drr_flags != 0 || drro->drr_raw_bonuslen != 0 ||
1131                     drro->drr_indblkshift != 0 || drro->drr_nlevels != 0 ||
1132                     drro->drr_nblkptr != 0)
1133                         return (SET_ERROR(EINVAL));
1134         }
1135
1136         err = dmu_object_info(rwa->os, drro->drr_object, &doi);
1137         if (err != 0 && err != ENOENT && err != EEXIST)
1138                 return (SET_ERROR(EINVAL));
1139
1140         if (drro->drr_object > rwa->max_object)
1141                 rwa->max_object = drro->drr_object;
1142
1143         /*
1144          * If we are losing blkptrs or changing the block size this must
1145          * be a new file instance.  We must clear out the previous file
1146          * contents before we can change this type of metadata in the dnode.
1147          * Raw receives will also check that the indirect structure of the
1148          * dnode hasn't changed.
1149          */
1150         if (err == 0) {
1151                 uint32_t indblksz = drro->drr_indblkshift ?
1152                     1ULL << drro->drr_indblkshift : 0;
1153                 int nblkptr = deduce_nblkptr(drro->drr_bonustype,
1154                     drro->drr_bonuslen);
1155
1156                 object = drro->drr_object;
1157
1158                 /* nblkptr will be bounded by the bonus size and type */
1159                 if (rwa->raw && nblkptr != drro->drr_nblkptr)
1160                         return (SET_ERROR(EINVAL));
1161
1162                 if (drro->drr_blksz != doi.doi_data_block_size ||
1163                     nblkptr < doi.doi_nblkptr ||
1164                     dn_slots != doi.doi_dnodesize >> DNODE_SHIFT ||
1165                     (rwa->raw &&
1166                     (indblksz != doi.doi_metadata_block_size ||
1167                     drro->drr_nlevels < doi.doi_indirection))) {
1168                         err = dmu_free_long_range(rwa->os,
1169                             drro->drr_object, 0, DMU_OBJECT_END);
1170                         if (err != 0)
1171                                 return (SET_ERROR(EINVAL));
1172                 }
1173
1174                 /*
1175                  * The dmu does not currently support decreasing nlevels
1176                  * on an object. For non-raw sends, this does not matter
1177                  * and the new object can just use the previous one's nlevels.
1178                  * For raw sends, however, the structure of the received dnode
1179                  * (including nlevels) must match that of the send side.
1180                  * Therefore, instead of using dmu_object_reclaim(), we must
1181                  * free the object completely and call dmu_object_claim_dnsize()
1182                  * instead.
1183                  */
1184                 if ((rwa->raw && drro->drr_nlevels < doi.doi_indirection) ||
1185                     dn_slots != doi.doi_dnodesize >> DNODE_SHIFT) {
1186                         err = dmu_free_long_object(rwa->os, drro->drr_object);
1187                         if (err != 0)
1188                                 return (SET_ERROR(EINVAL));
1189
1190                         txg_wait_synced(dmu_objset_pool(rwa->os), 0);
1191                         object = DMU_NEW_OBJECT;
1192                 }
1193         } else if (err == EEXIST) {
1194                 /*
1195                  * The object requested is currently an interior slot of a
1196                  * multi-slot dnode. This will be resolved when the next txg
1197                  * is synced out, since the send stream will have told us
1198                  * to free this slot when we freed the associated dnode
1199                  * earlier in the stream.
1200                  */
1201                 txg_wait_synced(dmu_objset_pool(rwa->os), 0);
1202                 object = drro->drr_object;
1203         } else {
1204                 /* object is free and we are about to allocate a new one */
1205                 object = DMU_NEW_OBJECT;
1206         }
1207
1208         /*
1209          * If this is a multi-slot dnode there is a chance that this
1210          * object will expand into a slot that is already used by
1211          * another object from the previous snapshot. We must free
1212          * these objects before we attempt to allocate the new dnode.
1213          */
1214         if (dn_slots > 1) {
1215                 boolean_t need_sync = B_FALSE;
1216
1217                 for (uint64_t slot = drro->drr_object + 1;
1218                     slot < drro->drr_object + dn_slots;
1219                     slot++) {
1220                         dmu_object_info_t slot_doi;
1221
1222                         err = dmu_object_info(rwa->os, slot, &slot_doi);
1223                         if (err == ENOENT || err == EEXIST)
1224                                 continue;
1225                         else if (err != 0)
1226                                 return (err);
1227
1228                         err = dmu_free_long_object(rwa->os, slot);
1229
1230                         if (err != 0)
1231                                 return (err);
1232
1233                         need_sync = B_TRUE;
1234                 }
1235
1236                 if (need_sync)
1237                         txg_wait_synced(dmu_objset_pool(rwa->os), 0);
1238         }
1239
1240         tx = dmu_tx_create(rwa->os);
1241         dmu_tx_hold_bonus(tx, object);
1242         dmu_tx_hold_write(tx, object, 0, 0);
1243         err = dmu_tx_assign(tx, TXG_WAIT);
1244         if (err != 0) {
1245                 dmu_tx_abort(tx);
1246                 return (err);
1247         }
1248
1249         if (object == DMU_NEW_OBJECT) {
1250                 /* currently free, want to be allocated */
1251                 err = dmu_object_claim_dnsize(rwa->os, drro->drr_object,
1252                     drro->drr_type, drro->drr_blksz,
1253                     drro->drr_bonustype, drro->drr_bonuslen,
1254                     dn_slots << DNODE_SHIFT, tx);
1255         } else if (drro->drr_type != doi.doi_type ||
1256             drro->drr_blksz != doi.doi_data_block_size ||
1257             drro->drr_bonustype != doi.doi_bonus_type ||
1258             drro->drr_bonuslen != doi.doi_bonus_size) {
1259                 /* currently allocated, but with different properties */
1260                 err = dmu_object_reclaim_dnsize(rwa->os, drro->drr_object,
1261                     drro->drr_type, drro->drr_blksz,
1262                     drro->drr_bonustype, drro->drr_bonuslen,
1263                     dn_slots << DNODE_SHIFT, tx);
1264         }
1265         if (err != 0) {
1266                 dmu_tx_commit(tx);
1267                 return (SET_ERROR(EINVAL));
1268         }
1269
1270         if (rwa->or_crypt_params_present) {
1271                 /*
1272                  * Set the crypt params for the buffer associated with this
1273                  * range of dnodes.  This causes the blkptr_t to have the
1274                  * same crypt params (byteorder, salt, iv, mac) as on the
1275                  * sending side.
1276                  *
1277                  * Since we are committing this tx now, it is possible for
1278                  * the dnode block to end up on-disk with the incorrect MAC,
1279                  * if subsequent objects in this block are received in a
1280                  * different txg.  However, since the dataset is marked as
1281                  * inconsistent, no code paths will do a non-raw read (or
1282                  * decrypt the block / verify the MAC). The receive code and
1283                  * scrub code can safely do raw reads and verify the
1284                  * checksum.  They don't need to verify the MAC.
1285                  */
1286                 dmu_buf_t *db = NULL;
1287                 uint64_t offset = rwa->or_firstobj * DNODE_MIN_SIZE;
1288
1289                 err = dmu_buf_hold_by_dnode(DMU_META_DNODE(rwa->os),
1290                     offset, FTAG, &db, DMU_READ_PREFETCH | DMU_READ_NO_DECRYPT);
1291                 if (err != 0) {
1292                         dmu_tx_commit(tx);
1293                         return (SET_ERROR(EINVAL));
1294                 }
1295
1296                 dmu_buf_set_crypt_params(db, rwa->or_byteorder,
1297                     rwa->or_salt, rwa->or_iv, rwa->or_mac, tx);
1298
1299                 dmu_buf_rele(db, FTAG);
1300
1301                 rwa->or_crypt_params_present = B_FALSE;
1302         }
1303
1304         dmu_object_set_checksum(rwa->os, drro->drr_object,
1305             drro->drr_checksumtype, tx);
1306         dmu_object_set_compress(rwa->os, drro->drr_object,
1307             drro->drr_compress, tx);
1308
1309         /* handle more restrictive dnode structuring for raw recvs */
1310         if (rwa->raw) {
1311                 /*
1312                  * Set the indirect block shift and nlevels. This will not fail
1313                  * because we ensured all of the blocks were free earlier if
1314                  * this is a new object.
1315                  */
1316                 VERIFY0(dmu_object_set_blocksize(rwa->os, drro->drr_object,
1317                     drro->drr_blksz, drro->drr_indblkshift, tx));
1318                 VERIFY0(dmu_object_set_nlevels(rwa->os, drro->drr_object,
1319                     drro->drr_nlevels, tx));
1320                 VERIFY0(dmu_object_set_maxblkid(rwa->os, drro->drr_object,
1321                     drro->drr_maxblkid, tx));
1322         }
1323
1324         if (data != NULL) {
1325                 dmu_buf_t *db;
1326                 dnode_t *dn;
1327                 uint32_t flags = DMU_READ_NO_PREFETCH;
1328
1329                 if (rwa->raw)
1330                         flags |= DMU_READ_NO_DECRYPT;
1331
1332                 VERIFY0(dnode_hold(rwa->os, drro->drr_object, FTAG, &dn));
1333                 VERIFY0(dmu_bonus_hold_by_dnode(dn, FTAG, &db, flags));
1334
1335                 dmu_buf_will_dirty(db, tx);
1336
1337                 ASSERT3U(db->db_size, >=, drro->drr_bonuslen);
1338                 bcopy(data, db->db_data, DRR_OBJECT_PAYLOAD_SIZE(drro));
1339
1340                 /*
1341                  * Raw bonus buffers have their byteorder determined by the
1342                  * DRR_OBJECT_RANGE record.
1343                  */
1344                 if (rwa->byteswap && !rwa->raw) {
1345                         dmu_object_byteswap_t byteswap =
1346                             DMU_OT_BYTESWAP(drro->drr_bonustype);
1347                         dmu_ot_byteswap[byteswap].ob_func(db->db_data,
1348                             DRR_OBJECT_PAYLOAD_SIZE(drro));
1349                 }
1350                 dmu_buf_rele(db, FTAG);
1351                 dnode_rele(dn, FTAG);
1352         }
1353         dmu_tx_commit(tx);
1354
1355         return (0);
1356 }
1357
1358 /* ARGSUSED */
1359 noinline static int
1360 receive_freeobjects(struct receive_writer_arg *rwa,
1361     struct drr_freeobjects *drrfo)
1362 {
1363         uint64_t obj;
1364         int next_err = 0;
1365
1366         if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
1367                 return (SET_ERROR(EINVAL));
1368
1369         for (obj = drrfo->drr_firstobj == 0 ? 1 : drrfo->drr_firstobj;
1370             obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0;
1371             next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) {
1372                 dmu_object_info_t doi;
1373                 int err;
1374
1375                 err = dmu_object_info(rwa->os, obj, &doi);
1376                 if (err == ENOENT)
1377                         continue;
1378                 else if (err != 0)
1379                         return (err);
1380
1381                 err = dmu_free_long_object(rwa->os, obj);
1382
1383                 if (err != 0)
1384                         return (err);
1385
1386                 if (obj > rwa->max_object)
1387                         rwa->max_object = obj;
1388         }
1389         if (next_err != ESRCH)
1390                 return (next_err);
1391         return (0);
1392 }
1393
1394 noinline static int
1395 receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw,
1396     arc_buf_t *abuf)
1397 {
1398         int err;
1399         dmu_tx_t *tx;
1400         dnode_t *dn;
1401
1402         if (drrw->drr_offset + drrw->drr_logical_size < drrw->drr_offset ||
1403             !DMU_OT_IS_VALID(drrw->drr_type))
1404                 return (SET_ERROR(EINVAL));
1405
1406         /*
1407          * For resuming to work, records must be in increasing order
1408          * by (object, offset).
1409          */
1410         if (drrw->drr_object < rwa->last_object ||
1411             (drrw->drr_object == rwa->last_object &&
1412             drrw->drr_offset < rwa->last_offset)) {
1413                 return (SET_ERROR(EINVAL));
1414         }
1415         rwa->last_object = drrw->drr_object;
1416         rwa->last_offset = drrw->drr_offset;
1417
1418         if (rwa->last_object > rwa->max_object)
1419                 rwa->max_object = rwa->last_object;
1420
1421         if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
1422                 return (SET_ERROR(EINVAL));
1423
1424         tx = dmu_tx_create(rwa->os);
1425         dmu_tx_hold_write(tx, drrw->drr_object,
1426             drrw->drr_offset, drrw->drr_logical_size);
1427         err = dmu_tx_assign(tx, TXG_WAIT);
1428         if (err != 0) {
1429                 dmu_tx_abort(tx);
1430                 return (err);
1431         }
1432
1433         if (rwa->byteswap && !arc_is_encrypted(abuf) &&
1434             arc_get_compression(abuf) == ZIO_COMPRESS_OFF) {
1435                 dmu_object_byteswap_t byteswap =
1436                     DMU_OT_BYTESWAP(drrw->drr_type);
1437                 dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
1438                     DRR_WRITE_PAYLOAD_SIZE(drrw));
1439         }
1440
1441         VERIFY0(dnode_hold(rwa->os, drrw->drr_object, FTAG, &dn));
1442         err = dmu_assign_arcbuf_by_dnode(dn, drrw->drr_offset, abuf, tx);
1443         if (err != 0) {
1444                 dnode_rele(dn, FTAG);
1445                 dmu_tx_commit(tx);
1446                 return (err);
1447         }
1448         dnode_rele(dn, FTAG);
1449
1450         /*
1451          * Note: If the receive fails, we want the resume stream to start
1452          * with the same record that we last successfully received (as opposed
1453          * to the next record), so that we can verify that we are
1454          * resuming from the correct location.
1455          */
1456         save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);
1457         dmu_tx_commit(tx);
1458
1459         return (0);
1460 }
1461
1462 /*
1463  * Handle a DRR_WRITE_BYREF record.  This record is used in dedup'ed
1464  * streams to refer to a copy of the data that is already on the
1465  * system because it came in earlier in the stream.  This function
1466  * finds the earlier copy of the data, and uses that copy instead of
1467  * data from the stream to fulfill this write.
1468  */
1469 static int
1470 receive_write_byref(struct receive_writer_arg *rwa,
1471     struct drr_write_byref *drrwbr)
1472 {
1473         dmu_tx_t *tx;
1474         int err;
1475         guid_map_entry_t gmesrch;
1476         guid_map_entry_t *gmep;
1477         avl_index_t where;
1478         objset_t *ref_os = NULL;
1479         int flags = DMU_READ_PREFETCH;
1480         dmu_buf_t *dbp;
1481
1482         if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset)
1483                 return (SET_ERROR(EINVAL));
1484
1485         /*
1486          * If the GUID of the referenced dataset is different from the
1487          * GUID of the target dataset, find the referenced dataset.
1488          */
1489         if (drrwbr->drr_toguid != drrwbr->drr_refguid) {
1490                 gmesrch.guid = drrwbr->drr_refguid;
1491                 if ((gmep = avl_find(rwa->guid_to_ds_map, &gmesrch,
1492                     &where)) == NULL) {
1493                         return (SET_ERROR(EINVAL));
1494                 }
1495                 if (dmu_objset_from_ds(gmep->gme_ds, &ref_os))
1496                         return (SET_ERROR(EINVAL));
1497         } else {
1498                 ref_os = rwa->os;
1499         }
1500
1501         if (drrwbr->drr_object > rwa->max_object)
1502                 rwa->max_object = drrwbr->drr_object;
1503
1504         if (rwa->raw)
1505                 flags |= DMU_READ_NO_DECRYPT;
1506
1507         /* may return either a regular db or an encrypted one */
1508         err = dmu_buf_hold(ref_os, drrwbr->drr_refobject,
1509             drrwbr->drr_refoffset, FTAG, &dbp, flags);
1510         if (err != 0)
1511                 return (err);
1512
1513         tx = dmu_tx_create(rwa->os);
1514
1515         dmu_tx_hold_write(tx, drrwbr->drr_object,
1516             drrwbr->drr_offset, drrwbr->drr_length);
1517         err = dmu_tx_assign(tx, TXG_WAIT);
1518         if (err != 0) {
1519                 dmu_tx_abort(tx);
1520                 return (err);
1521         }
1522
1523         if (rwa->raw) {
1524                 dmu_copy_from_buf(rwa->os, drrwbr->drr_object,
1525                     drrwbr->drr_offset, dbp, tx);
1526         } else {
1527                 dmu_write(rwa->os, drrwbr->drr_object,
1528                     drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx);
1529         }
1530         dmu_buf_rele(dbp, FTAG);
1531
1532         /* See comment in restore_write. */
1533         save_resume_state(rwa, drrwbr->drr_object, drrwbr->drr_offset, tx);
1534         dmu_tx_commit(tx);
1535         return (0);
1536 }
1537
1538 static int
1539 receive_write_embedded(struct receive_writer_arg *rwa,
1540     struct drr_write_embedded *drrwe, void *data)
1541 {
1542         dmu_tx_t *tx;
1543         int err;
1544
1545         if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset)
1546                 return (SET_ERROR(EINVAL));
1547
1548         if (drrwe->drr_psize > BPE_PAYLOAD_SIZE)
1549                 return (SET_ERROR(EINVAL));
1550
1551         if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES)
1552                 return (SET_ERROR(EINVAL));
1553         if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
1554                 return (SET_ERROR(EINVAL));
1555         if (rwa->raw)
1556                 return (SET_ERROR(EINVAL));
1557
1558         if (drrwe->drr_object > rwa->max_object)
1559                 rwa->max_object = drrwe->drr_object;
1560
1561         tx = dmu_tx_create(rwa->os);
1562
1563         dmu_tx_hold_write(tx, drrwe->drr_object,
1564             drrwe->drr_offset, drrwe->drr_length);
1565         err = dmu_tx_assign(tx, TXG_WAIT);
1566         if (err != 0) {
1567                 dmu_tx_abort(tx);
1568                 return (err);
1569         }
1570
1571         dmu_write_embedded(rwa->os, drrwe->drr_object,
1572             drrwe->drr_offset, data, drrwe->drr_etype,
1573             drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize,
1574             rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx);
1575
1576         /* See comment in restore_write. */
1577         save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx);
1578         dmu_tx_commit(tx);
1579         return (0);
1580 }
1581
1582 static int
1583 receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
1584     arc_buf_t *abuf)
1585 {
1586         dmu_tx_t *tx;
1587         dmu_buf_t *db, *db_spill;
1588         int err;
1589         uint32_t flags = 0;
1590
1591         if (drrs->drr_length < SPA_MINBLOCKSIZE ||
1592             drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os)))
1593                 return (SET_ERROR(EINVAL));
1594
1595         if (rwa->raw) {
1596                 if (!DMU_OT_IS_VALID(drrs->drr_type) ||
1597                     drrs->drr_compressiontype >= ZIO_COMPRESS_FUNCTIONS ||
1598                     drrs->drr_compressed_size == 0)
1599                         return (SET_ERROR(EINVAL));
1600
1601                 flags |= DMU_READ_NO_DECRYPT;
1602         }
1603
1604         if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0)
1605                 return (SET_ERROR(EINVAL));
1606
1607         if (drrs->drr_object > rwa->max_object)
1608                 rwa->max_object = drrs->drr_object;
1609
1610         VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db));
1611         if ((err = dmu_spill_hold_by_bonus(db, DMU_READ_NO_DECRYPT, FTAG,
1612             &db_spill)) != 0) {
1613                 dmu_buf_rele(db, FTAG);
1614                 return (err);
1615         }
1616
1617         tx = dmu_tx_create(rwa->os);
1618
1619         dmu_tx_hold_spill(tx, db->db_object);
1620
1621         err = dmu_tx_assign(tx, TXG_WAIT);
1622         if (err != 0) {
1623                 dmu_buf_rele(db, FTAG);
1624                 dmu_buf_rele(db_spill, FTAG);
1625                 dmu_tx_abort(tx);
1626                 return (err);
1627         }
1628
1629         if (db_spill->db_size < drrs->drr_length)
1630                 VERIFY(0 == dbuf_spill_set_blksz(db_spill,
1631                     drrs->drr_length, tx));
1632
1633         if (rwa->byteswap && !arc_is_encrypted(abuf) &&
1634             arc_get_compression(abuf) == ZIO_COMPRESS_OFF) {
1635                 dmu_object_byteswap_t byteswap =
1636                     DMU_OT_BYTESWAP(drrs->drr_type);
1637                 dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
1638                     DRR_SPILL_PAYLOAD_SIZE(drrs));
1639         }
1640
1641         dbuf_assign_arcbuf((dmu_buf_impl_t *)db_spill, abuf, tx);
1642
1643         dmu_buf_rele(db, FTAG);
1644         dmu_buf_rele(db_spill, FTAG);
1645
1646         dmu_tx_commit(tx);
1647         return (0);
1648 }
1649
1650 /* ARGSUSED */
1651 noinline static int
1652 receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
1653 {
1654         int err;
1655
1656         if (drrf->drr_length != DMU_OBJECT_END &&
1657             drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
1658                 return (SET_ERROR(EINVAL));
1659
1660         if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0)
1661                 return (SET_ERROR(EINVAL));
1662
1663         if (drrf->drr_object > rwa->max_object)
1664                 rwa->max_object = drrf->drr_object;
1665
1666         err = dmu_free_long_range(rwa->os, drrf->drr_object,
1667             drrf->drr_offset, drrf->drr_length);
1668
1669         return (err);
1670 }
1671
1672 static int
1673 receive_object_range(struct receive_writer_arg *rwa,
1674     struct drr_object_range *drror)
1675 {
1676         /*
1677          * By default, we assume this block is in our native format
1678          * (ZFS_HOST_BYTEORDER). We then take into account whether
1679          * the send stream is byteswapped (rwa->byteswap). Finally,
1680          * we need to byteswap again if this particular block was
1681          * in non-native format on the send side.
1682          */
1683         boolean_t byteorder = ZFS_HOST_BYTEORDER ^ rwa->byteswap ^
1684             !!DRR_IS_RAW_BYTESWAPPED(drror->drr_flags);
1685
1686         /*
1687          * Since dnode block sizes are constant, we should not need to worry
1688          * about making sure that the dnode block size is the same on the
1689          * sending and receiving sides for the time being. For non-raw sends,
1690          * this does not matter (and in fact we do not send a DRR_OBJECT_RANGE
1691          * record at all). Raw sends require this record type because the
1692          * encryption parameters are used to protect an entire block of bonus
1693          * buffers. If the size of dnode blocks ever becomes variable,
1694          * handling will need to be added to ensure that dnode block sizes
1695          * match on the sending and receiving side.
1696          */
1697         if (drror->drr_numslots != DNODES_PER_BLOCK ||
1698             P2PHASE(drror->drr_firstobj, DNODES_PER_BLOCK) != 0 ||
1699             !rwa->raw)
1700                 return (SET_ERROR(EINVAL));
1701
1702         if (drror->drr_firstobj > rwa->max_object)
1703                 rwa->max_object = drror->drr_firstobj;
1704
1705         /*
1706          * The DRR_OBJECT_RANGE handling must be deferred to receive_object()
1707          * so that the block of dnodes is not written out when it's empty,
1708          * and converted to a HOLE BP.
1709          */
1710         rwa->or_crypt_params_present = B_TRUE;
1711         rwa->or_firstobj = drror->drr_firstobj;
1712         rwa->or_numslots = drror->drr_numslots;
1713         bcopy(drror->drr_salt, rwa->or_salt, ZIO_DATA_SALT_LEN);
1714         bcopy(drror->drr_iv, rwa->or_iv, ZIO_DATA_IV_LEN);
1715         bcopy(drror->drr_mac, rwa->or_mac, ZIO_DATA_MAC_LEN);
1716         rwa->or_byteorder = byteorder;
1717
1718         return (0);
1719 }
1720
1721 /* used to destroy the drc_ds on error */
1722 static void
1723 dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
1724 {
1725         dsl_dataset_t *ds = drc->drc_ds;
1726         ds_hold_flags_t dsflags = (drc->drc_raw) ? 0 : DS_HOLD_FLAG_DECRYPT;
1727
1728         /*
1729          * Wait for the txg sync before cleaning up the receive. For
1730          * resumable receives, this ensures that our resume state has
1731          * been written out to disk. For raw receives, this ensures
1732          * that the user accounting code will not attempt to do anything
1733          * after we stopped receiving the dataset.
1734          */
1735         txg_wait_synced(ds->ds_dir->dd_pool, 0);
1736         ds->ds_objset->os_raw_receive = B_FALSE;
1737
1738         rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
1739         if (drc->drc_resumable && !BP_IS_HOLE(dsl_dataset_get_blkptr(ds))) {
1740                 rrw_exit(&ds->ds_bp_rwlock, FTAG);
1741                 dsl_dataset_disown(ds, dsflags, dmu_recv_tag);
1742         } else {
1743                 char name[ZFS_MAX_DATASET_NAME_LEN];
1744                 rrw_exit(&ds->ds_bp_rwlock, FTAG);
1745                 dsl_dataset_name(ds, name);
1746                 dsl_dataset_disown(ds, dsflags, dmu_recv_tag);
1747                 (void) dsl_destroy_head(name);
1748         }
1749 }
1750
1751 static void
1752 receive_cksum(struct receive_arg *ra, int len, void *buf)
1753 {
1754         if (ra->byteswap) {
1755                 (void) fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
1756         } else {
1757                 (void) fletcher_4_incremental_native(buf, len, &ra->cksum);
1758         }
1759 }
1760
1761 /*
1762  * Read the payload into a buffer of size len, and update the current record's
1763  * payload field.
1764  * Allocate ra->next_rrd and read the next record's header into
1765  * ra->next_rrd->header.
1766  * Verify checksum of payload and next record.
1767  */
1768 static int
1769 receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf)
1770 {
1771         int err;
1772         zio_cksum_t cksum_orig;
1773         zio_cksum_t *cksump;
1774
1775         if (len != 0) {
1776                 ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
1777                 err = receive_read(ra, len, buf);
1778                 if (err != 0)
1779                         return (err);
1780                 receive_cksum(ra, len, buf);
1781
1782                 /* note: rrd is NULL when reading the begin record's payload */
1783                 if (ra->rrd != NULL) {
1784                         ra->rrd->payload = buf;
1785                         ra->rrd->payload_size = len;
1786                         ra->rrd->bytes_read = ra->bytes_read;
1787                 }
1788         }
1789
1790         ra->prev_cksum = ra->cksum;
1791
1792         ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
1793         err = receive_read(ra, sizeof (ra->next_rrd->header),
1794             &ra->next_rrd->header);
1795         ra->next_rrd->bytes_read = ra->bytes_read;
1796
1797         if (err != 0) {
1798                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
1799                 ra->next_rrd = NULL;
1800                 return (err);
1801         }
1802         if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
1803                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
1804                 ra->next_rrd = NULL;
1805                 return (SET_ERROR(EINVAL));
1806         }
1807
1808         /*
1809          * Note: checksum is of everything up to but not including the
1810          * checksum itself.
1811          */
1812         ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
1813             ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
1814         receive_cksum(ra,
1815             offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
1816             &ra->next_rrd->header);
1817
1818         cksum_orig = ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
1819         cksump = &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
1820
1821         if (ra->byteswap)
1822                 byteswap_record(&ra->next_rrd->header);
1823
1824         if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
1825             !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
1826                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
1827                 ra->next_rrd = NULL;
1828                 return (SET_ERROR(ECKSUM));
1829         }
1830
1831         receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);
1832
1833         return (0);
1834 }
1835
1836 static void
1837 objlist_create(struct objlist *list)
1838 {
1839         list_create(&list->list, sizeof (struct receive_objnode),
1840             offsetof(struct receive_objnode, node));
1841         list->last_lookup = 0;
1842 }
1843
1844 static void
1845 objlist_destroy(struct objlist *list)
1846 {
1847         for (struct receive_objnode *n = list_remove_head(&list->list);
1848             n != NULL; n = list_remove_head(&list->list)) {
1849                 kmem_free(n, sizeof (*n));
1850         }
1851         list_destroy(&list->list);
1852 }
1853
1854 /*
1855  * This function looks through the objlist to see if the specified object number
1856  * is contained in the objlist.  In the process, it will remove all object
1857  * numbers in the list that are smaller than the specified object number.  Thus,
1858  * any lookup of an object number smaller than a previously looked up object
1859  * number will always return false; therefore, all lookups should be done in
1860  * ascending order.
1861  */
1862 static boolean_t
1863 objlist_exists(struct objlist *list, uint64_t object)
1864 {
1865         struct receive_objnode *node = list_head(&list->list);
1866         ASSERT3U(object, >=, list->last_lookup);
1867         list->last_lookup = object;
1868         while (node != NULL && node->object < object) {
1869                 VERIFY3P(node, ==, list_remove_head(&list->list));
1870                 kmem_free(node, sizeof (*node));
1871                 node = list_head(&list->list);
1872         }
1873         return (node != NULL && node->object == object);
1874 }
1875
1876 /*
1877  * The objlist is a list of object numbers stored in ascending order.  However,
1878  * the insertion of new object numbers does not seek out the correct location to
1879  * store a new object number; instead, it appends it to the list for simplicity.
1880  * Thus, any users must take care to only insert new object numbers in ascending
1881  * order.
1882  */
1883 static void
1884 objlist_insert(struct objlist *list, uint64_t object)
1885 {
1886         struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP);
1887         node->object = object;
1888 #ifdef ZFS_DEBUG
1889         {
1890         struct receive_objnode *last_object = list_tail(&list->list);
1891         uint64_t last_objnum = (last_object != NULL ? last_object->object : 0);
1892         ASSERT3U(node->object, >, last_objnum);
1893         }
1894 #endif
1895         list_insert_tail(&list->list, node);
1896 }
1897
1898 /*
1899  * Issue the prefetch reads for any necessary indirect blocks.
1900  *
1901  * We use the object ignore list to tell us whether or not to issue prefetches
1902  * for a given object.  We do this for both correctness (in case the blocksize
1903  * of an object has changed) and performance (if the object doesn't exist, don't
1904  * needlessly try to issue prefetches).  We also trim the list as we go through
1905  * the stream to prevent it from growing to an unbounded size.
1906  *
1907  * The object numbers within will always be in sorted order, and any write
1908  * records we see will also be in sorted order, but they're not sorted with
1909  * respect to each other (i.e. we can get several object records before
1910  * receiving each object's write records).  As a result, once we've reached a
1911  * given object number, we can safely remove any reference to lower object
1912  * numbers in the ignore list. In practice, we receive up to 32 object records
1913  * before receiving write records, so the list can have up to 32 nodes in it.
1914  */
1915 /* ARGSUSED */
1916 static void
1917 receive_read_prefetch(struct receive_arg *ra,
1918     uint64_t object, uint64_t offset, uint64_t length)
1919 {
1920         if (!objlist_exists(&ra->ignore_objlist, object)) {
1921                 dmu_prefetch(ra->os, object, 1, offset, length,
1922                     ZIO_PRIORITY_SYNC_READ);
1923         }
1924 }
1925
1926 /*
1927  * Read records off the stream, issuing any necessary prefetches.
1928  */
1929 static int
1930 receive_read_record(struct receive_arg *ra)
1931 {
1932         int err;
1933
1934         switch (ra->rrd->header.drr_type) {
1935         case DRR_OBJECT:
1936         {
1937                 struct drr_object *drro = &ra->rrd->header.drr_u.drr_object;
1938                 uint32_t size = DRR_OBJECT_PAYLOAD_SIZE(drro);
1939                 void *buf = kmem_zalloc(size, KM_SLEEP);
1940                 dmu_object_info_t doi;
1941
1942                 err = receive_read_payload_and_next_header(ra, size, buf);
1943                 if (err != 0) {
1944                         kmem_free(buf, size);
1945                         return (err);
1946                 }
1947                 err = dmu_object_info(ra->os, drro->drr_object, &doi);
1948                 /*
1949                  * See receive_read_prefetch for an explanation why we're
1950                  * storing this object in the ignore_obj_list.
1951                  */
1952                 if (err == ENOENT || err == EEXIST ||
1953                     (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) {
1954                         objlist_insert(&ra->ignore_objlist, drro->drr_object);
1955                         err = 0;
1956                 }
1957                 return (err);
1958         }
1959         case DRR_FREEOBJECTS:
1960         {
1961                 err = receive_read_payload_and_next_header(ra, 0, NULL);
1962                 return (err);
1963         }
1964         case DRR_WRITE:
1965         {
1966                 struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write;
1967                 arc_buf_t *abuf;
1968                 boolean_t is_meta = DMU_OT_IS_METADATA(drrw->drr_type);
1969
1970                 if (ra->raw) {
1971                         boolean_t byteorder = ZFS_HOST_BYTEORDER ^
1972                             !!DRR_IS_RAW_BYTESWAPPED(drrw->drr_flags) ^
1973                             ra->byteswap;
1974
1975                         abuf = arc_loan_raw_buf(dmu_objset_spa(ra->os),
1976                             drrw->drr_object, byteorder, drrw->drr_salt,
1977                             drrw->drr_iv, drrw->drr_mac, drrw->drr_type,
1978                             drrw->drr_compressed_size, drrw->drr_logical_size,
1979                             drrw->drr_compressiontype);
1980                 } else if (DRR_WRITE_COMPRESSED(drrw)) {
1981                         ASSERT3U(drrw->drr_compressed_size, >, 0);
1982                         ASSERT3U(drrw->drr_logical_size, >=,
1983                             drrw->drr_compressed_size);
1984                         ASSERT(!is_meta);
1985                         abuf = arc_loan_compressed_buf(
1986                             dmu_objset_spa(ra->os),
1987                             drrw->drr_compressed_size, drrw->drr_logical_size,
1988                             drrw->drr_compressiontype);
1989                 } else {
1990                         abuf = arc_loan_buf(dmu_objset_spa(ra->os),
1991                             is_meta, drrw->drr_logical_size);
1992                 }
1993
1994                 err = receive_read_payload_and_next_header(ra,
1995                     DRR_WRITE_PAYLOAD_SIZE(drrw), abuf->b_data);
1996                 if (err != 0) {
1997                         dmu_return_arcbuf(abuf);
1998                         return (err);
1999                 }
2000                 ra->rrd->arc_buf = abuf;
2001                 receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset,
2002                     drrw->drr_logical_size);
2003                 return (err);
2004         }
2005         case DRR_WRITE_BYREF:
2006         {
2007                 struct drr_write_byref *drrwb =
2008                     &ra->rrd->header.drr_u.drr_write_byref;
2009                 err = receive_read_payload_and_next_header(ra, 0, NULL);
2010                 receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset,
2011                     drrwb->drr_length);
2012                 return (err);
2013         }
2014         case DRR_WRITE_EMBEDDED:
2015         {
2016                 struct drr_write_embedded *drrwe =
2017                     &ra->rrd->header.drr_u.drr_write_embedded;
2018                 uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8);
2019                 void *buf = kmem_zalloc(size, KM_SLEEP);
2020
2021                 err = receive_read_payload_and_next_header(ra, size, buf);
2022                 if (err != 0) {
2023                         kmem_free(buf, size);
2024                         return (err);
2025                 }
2026
2027                 receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
2028                     drrwe->drr_length);
2029                 return (err);
2030         }
2031         case DRR_FREE:
2032         {
2033                 /*
2034                  * It might be beneficial to prefetch indirect blocks here, but
2035                  * we don't really have the data to decide for sure.
2036                  */
2037                 err = receive_read_payload_and_next_header(ra, 0, NULL);
2038                 return (err);
2039         }
2040         case DRR_END:
2041         {
2042                 struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
2043                 if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum))
2044                         return (SET_ERROR(ECKSUM));
2045                 return (0);
2046         }
2047         case DRR_SPILL:
2048         {
2049                 struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
2050                 arc_buf_t *abuf;
2051                 int len = DRR_SPILL_PAYLOAD_SIZE(drrs);
2052
2053                 /* DRR_SPILL records are either raw or uncompressed */
2054                 if (ra->raw) {
2055                         boolean_t byteorder = ZFS_HOST_BYTEORDER ^
2056                             !!DRR_IS_RAW_BYTESWAPPED(drrs->drr_flags) ^
2057                             ra->byteswap;
2058
2059                         abuf = arc_loan_raw_buf(dmu_objset_spa(ra->os),
2060                             dmu_objset_id(ra->os), byteorder, drrs->drr_salt,
2061                             drrs->drr_iv, drrs->drr_mac, drrs->drr_type,
2062                             drrs->drr_compressed_size, drrs->drr_length,
2063                             drrs->drr_compressiontype);
2064                 } else {
2065                         abuf = arc_loan_buf(dmu_objset_spa(ra->os),
2066                             DMU_OT_IS_METADATA(drrs->drr_type),
2067                             drrs->drr_length);
2068                 }
2069
2070                 err = receive_read_payload_and_next_header(ra, len,
2071                     abuf->b_data);
2072                 if (err != 0) {
2073                         dmu_return_arcbuf(abuf);
2074                         return (err);
2075                 }
2076                 ra->rrd->arc_buf = abuf;
2077                 return (err);
2078         }
2079         case DRR_OBJECT_RANGE:
2080         {
2081                 err = receive_read_payload_and_next_header(ra, 0, NULL);
2082                 return (err);
2083         }
2084         default:
2085                 return (SET_ERROR(EINVAL));
2086         }
2087 }
2088
2089 static void
2090 dprintf_drr(struct receive_record_arg *rrd, int err)
2091 {
2092 #ifdef ZFS_DEBUG
2093         switch (rrd->header.drr_type) {
2094         case DRR_OBJECT:
2095         {
2096                 struct drr_object *drro = &rrd->header.drr_u.drr_object;
2097                 dprintf("drr_type = OBJECT obj = %llu type = %u "
2098                     "bonustype = %u blksz = %u bonuslen = %u cksumtype = %u "
2099                     "compress = %u dn_slots = %u err = %d\n",
2100                     drro->drr_object, drro->drr_type,  drro->drr_bonustype,
2101                     drro->drr_blksz, drro->drr_bonuslen,
2102                     drro->drr_checksumtype, drro->drr_compress,
2103                     drro->drr_dn_slots, err);
2104                 break;
2105         }
2106         case DRR_FREEOBJECTS:
2107         {
2108                 struct drr_freeobjects *drrfo =
2109                     &rrd->header.drr_u.drr_freeobjects;
2110                 dprintf("drr_type = FREEOBJECTS firstobj = %llu "
2111                     "numobjs = %llu err = %d\n",
2112                     drrfo->drr_firstobj, drrfo->drr_numobjs, err);
2113                 break;
2114         }
2115         case DRR_WRITE:
2116         {
2117                 struct drr_write *drrw = &rrd->header.drr_u.drr_write;
2118                 dprintf("drr_type = WRITE obj = %llu type = %u offset = %llu "
2119                     "lsize = %llu cksumtype = %u cksumflags = %u "
2120                     "compress = %u psize = %llu err = %d\n",
2121                     drrw->drr_object, drrw->drr_type, drrw->drr_offset,
2122                     drrw->drr_logical_size, drrw->drr_checksumtype,
2123                     drrw->drr_flags, drrw->drr_compressiontype,
2124                     drrw->drr_compressed_size, err);
2125                 break;
2126         }
2127         case DRR_WRITE_BYREF:
2128         {
2129                 struct drr_write_byref *drrwbr =
2130                     &rrd->header.drr_u.drr_write_byref;
2131                 dprintf("drr_type = WRITE_BYREF obj = %llu offset = %llu "
2132                     "length = %llu toguid = %llx refguid = %llx "
2133                     "refobject = %llu refoffset = %llu cksumtype = %u "
2134                     "cksumflags = %u err = %d\n",
2135                     drrwbr->drr_object, drrwbr->drr_offset,
2136                     drrwbr->drr_length, drrwbr->drr_toguid,
2137                     drrwbr->drr_refguid, drrwbr->drr_refobject,
2138                     drrwbr->drr_refoffset, drrwbr->drr_checksumtype,
2139                     drrwbr->drr_flags, err);
2140                 break;
2141         }
2142         case DRR_WRITE_EMBEDDED:
2143         {
2144                 struct drr_write_embedded *drrwe =
2145                     &rrd->header.drr_u.drr_write_embedded;
2146                 dprintf("drr_type = WRITE_EMBEDDED obj = %llu offset = %llu "
2147                     "length = %llu compress = %u etype = %u lsize = %u "
2148                     "psize = %u err = %d\n",
2149                     drrwe->drr_object, drrwe->drr_offset, drrwe->drr_length,
2150                     drrwe->drr_compression, drrwe->drr_etype,
2151                     drrwe->drr_lsize, drrwe->drr_psize, err);
2152                 break;
2153         }
2154         case DRR_FREE:
2155         {
2156                 struct drr_free *drrf = &rrd->header.drr_u.drr_free;
2157                 dprintf("drr_type = FREE obj = %llu offset = %llu "
2158                     "length = %lld err = %d\n",
2159                     drrf->drr_object, drrf->drr_offset, drrf->drr_length,
2160                     err);
2161                 break;
2162         }
2163         case DRR_SPILL:
2164         {
2165                 struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
2166                 dprintf("drr_type = SPILL obj = %llu length = %llu "
2167                     "err = %d\n", drrs->drr_object, drrs->drr_length, err);
2168                 break;
2169         }
2170         default:
2171                 return;
2172         }
2173 #endif
2174 }
2175
2176 /*
2177  * Commit the records to the pool.
2178  */
2179 static int
2180 receive_process_record(struct receive_writer_arg *rwa,
2181     struct receive_record_arg *rrd)
2182 {
2183         int err;
2184
2185         /* Processing in order, therefore bytes_read should be increasing. */
2186         ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
2187         rwa->bytes_read = rrd->bytes_read;
2188
2189         switch (rrd->header.drr_type) {
2190         case DRR_OBJECT:
2191         {
2192                 struct drr_object *drro = &rrd->header.drr_u.drr_object;
2193                 err = receive_object(rwa, drro, rrd->payload);
2194                 kmem_free(rrd->payload, rrd->payload_size);
2195                 rrd->payload = NULL;
2196                 break;
2197         }
2198         case DRR_FREEOBJECTS:
2199         {
2200                 struct drr_freeobjects *drrfo =
2201                     &rrd->header.drr_u.drr_freeobjects;
2202                 err = receive_freeobjects(rwa, drrfo);
2203                 break;
2204         }
2205         case DRR_WRITE:
2206         {
2207                 struct drr_write *drrw = &rrd->header.drr_u.drr_write;
2208                 err = receive_write(rwa, drrw, rrd->arc_buf);
2209                 /* if receive_write() is successful, it consumes the arc_buf */
2210                 if (err != 0)
2211                         dmu_return_arcbuf(rrd->arc_buf);
2212                 rrd->arc_buf = NULL;
2213                 rrd->payload = NULL;
2214                 break;
2215         }
2216         case DRR_WRITE_BYREF:
2217         {
2218                 struct drr_write_byref *drrwbr =
2219                     &rrd->header.drr_u.drr_write_byref;
2220                 err = receive_write_byref(rwa, drrwbr);
2221                 break;
2222         }
2223         case DRR_WRITE_EMBEDDED:
2224         {
2225                 struct drr_write_embedded *drrwe =
2226                     &rrd->header.drr_u.drr_write_embedded;
2227                 err = receive_write_embedded(rwa, drrwe, rrd->payload);
2228                 kmem_free(rrd->payload, rrd->payload_size);
2229                 rrd->payload = NULL;
2230                 break;
2231         }
2232         case DRR_FREE:
2233         {
2234                 struct drr_free *drrf = &rrd->header.drr_u.drr_free;
2235                 err = receive_free(rwa, drrf);
2236                 break;
2237         }
2238         case DRR_SPILL:
2239         {
2240                 struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
2241                 err = receive_spill(rwa, drrs, rrd->arc_buf);
2242                 /* if receive_spill() is successful, it consumes the arc_buf */
2243                 if (err != 0)
2244                         dmu_return_arcbuf(rrd->arc_buf);
2245                 rrd->arc_buf = NULL;
2246                 rrd->payload = NULL;
2247                 break;
2248         }
2249         case DRR_OBJECT_RANGE:
2250         {
2251                 struct drr_object_range *drror =
2252                     &rrd->header.drr_u.drr_object_range;
2253                 return (receive_object_range(rwa, drror));
2254         }
2255         default:
2256                 return (SET_ERROR(EINVAL));
2257         }
2258
2259         if (err != 0)
2260                 dprintf_drr(rrd, err);
2261
2262         return (err);
2263 }
2264
2265 /*
2266  * dmu_recv_stream's worker thread; pull records off the queue, and then call
2267  * receive_process_record  When we're done, signal the main thread and exit.
2268  */
2269 static void
2270 receive_writer_thread(void *arg)
2271 {
2272         struct receive_writer_arg *rwa = arg;
2273         struct receive_record_arg *rrd;
2274         fstrans_cookie_t cookie = spl_fstrans_mark();
2275
2276         for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker;
2277             rrd = bqueue_dequeue(&rwa->q)) {
2278                 /*
2279                  * If there's an error, the main thread will stop putting things
2280                  * on the queue, but we need to clear everything in it before we
2281                  * can exit.
2282                  */
2283                 if (rwa->err == 0) {
2284                         rwa->err = receive_process_record(rwa, rrd);
2285                 } else if (rrd->arc_buf != NULL) {
2286                         dmu_return_arcbuf(rrd->arc_buf);
2287                         rrd->arc_buf = NULL;
2288                         rrd->payload = NULL;
2289                 } else if (rrd->payload != NULL) {
2290                         kmem_free(rrd->payload, rrd->payload_size);
2291                         rrd->payload = NULL;
2292                 }
2293                 kmem_free(rrd, sizeof (*rrd));
2294         }
2295         kmem_free(rrd, sizeof (*rrd));
2296         mutex_enter(&rwa->mutex);
2297         rwa->done = B_TRUE;
2298         cv_signal(&rwa->cv);
2299         mutex_exit(&rwa->mutex);
2300         spl_fstrans_unmark(cookie);
2301         thread_exit();
2302 }
2303
2304 static int
2305 resume_check(struct receive_arg *ra, nvlist_t *begin_nvl)
2306 {
2307         uint64_t val;
2308         objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset;
2309         uint64_t dsobj = dmu_objset_id(ra->os);
2310         uint64_t resume_obj, resume_off;
2311
2312         if (nvlist_lookup_uint64(begin_nvl,
2313             "resume_object", &resume_obj) != 0 ||
2314             nvlist_lookup_uint64(begin_nvl,
2315             "resume_offset", &resume_off) != 0) {
2316                 return (SET_ERROR(EINVAL));
2317         }
2318         VERIFY0(zap_lookup(mos, dsobj,
2319             DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val));
2320         if (resume_obj != val)
2321                 return (SET_ERROR(EINVAL));
2322         VERIFY0(zap_lookup(mos, dsobj,
2323             DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val));
2324         if (resume_off != val)
2325                 return (SET_ERROR(EINVAL));
2326
2327         return (0);
2328 }
2329
2330 /*
2331  * Read in the stream's records, one by one, and apply them to the pool.  There
2332  * are two threads involved; the thread that calls this function will spin up a
2333  * worker thread, read the records off the stream one by one, and issue
2334  * prefetches for any necessary indirect blocks.  It will then push the records
2335  * onto an internal blocking queue.  The worker thread will pull the records off
2336  * the queue, and actually write the data into the DMU.  This way, the worker
2337  * thread doesn't have to wait for reads to complete, since everything it needs
2338  * (the indirect blocks) will be prefetched.
2339  *
2340  * NB: callers *must* call dmu_recv_end() if this succeeds.
2341  */
2342 int
2343 dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp,
2344     int cleanup_fd, uint64_t *action_handlep)
2345 {
2346         int err = 0;
2347         struct receive_arg *ra;
2348         struct receive_writer_arg *rwa;
2349         int featureflags;
2350         uint32_t payloadlen;
2351         void *payload;
2352         nvlist_t *begin_nvl = NULL;
2353
2354         ra = kmem_zalloc(sizeof (*ra), KM_SLEEP);
2355         rwa = kmem_zalloc(sizeof (*rwa), KM_SLEEP);
2356
2357         ra->byteswap = drc->drc_byteswap;
2358         ra->raw = drc->drc_raw;
2359         ra->cksum = drc->drc_cksum;
2360         ra->vp = vp;
2361         ra->voff = *voffp;
2362
2363         if (dsl_dataset_is_zapified(drc->drc_ds)) {
2364                 (void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset,
2365                     drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES,
2366                     sizeof (ra->bytes_read), 1, &ra->bytes_read);
2367         }
2368
2369         objlist_create(&ra->ignore_objlist);
2370
2371         /* these were verified in dmu_recv_begin */
2372         ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==,
2373             DMU_SUBSTREAM);
2374         ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES);
2375
2376         /*
2377          * Open the objset we are modifying.
2378          */
2379         VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra->os));
2380
2381         ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);
2382
2383         featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
2384         ra->featureflags = featureflags;
2385
2386         ASSERT0(ra->os->os_encrypted &&
2387             (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA));
2388
2389         /* if this stream is dedup'ed, set up the avl tree for guid mapping */
2390         if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
2391                 minor_t minor;
2392
2393                 if (cleanup_fd == -1) {
2394                         err = SET_ERROR(EBADF);
2395                         goto out;
2396                 }
2397                 err = zfs_onexit_fd_hold(cleanup_fd, &minor);
2398                 if (err != 0) {
2399                         cleanup_fd = -1;
2400                         goto out;
2401                 }
2402
2403                 if (*action_handlep == 0) {
2404                         rwa->guid_to_ds_map =
2405                             kmem_alloc(sizeof (avl_tree_t), KM_SLEEP);
2406                         avl_create(rwa->guid_to_ds_map, guid_compare,
2407                             sizeof (guid_map_entry_t),
2408                             offsetof(guid_map_entry_t, avlnode));
2409                         err = zfs_onexit_add_cb(minor,
2410                             free_guid_map_onexit, rwa->guid_to_ds_map,
2411                             action_handlep);
2412                         if (err != 0)
2413                                 goto out;
2414                 } else {
2415                         err = zfs_onexit_cb_data(minor, *action_handlep,
2416                             (void **)&rwa->guid_to_ds_map);
2417                         if (err != 0)
2418                                 goto out;
2419                 }
2420
2421                 drc->drc_guid_to_ds_map = rwa->guid_to_ds_map;
2422         }
2423
2424         payloadlen = drc->drc_drr_begin->drr_payloadlen;
2425         payload = NULL;
2426         if (payloadlen != 0)
2427                 payload = kmem_alloc(payloadlen, KM_SLEEP);
2428
2429         err = receive_read_payload_and_next_header(ra, payloadlen, payload);
2430         if (err != 0) {
2431                 if (payloadlen != 0)
2432                         kmem_free(payload, payloadlen);
2433                 goto out;
2434         }
2435         if (payloadlen != 0) {
2436                 err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP);
2437                 kmem_free(payload, payloadlen);
2438                 if (err != 0)
2439                         goto out;
2440         }
2441
2442         /* handle DSL encryption key payload */
2443         if (featureflags & DMU_BACKUP_FEATURE_RAW) {
2444                 nvlist_t *keynvl = NULL;
2445
2446                 ASSERT(ra->os->os_encrypted);
2447                 ASSERT(drc->drc_raw);
2448
2449                 err = nvlist_lookup_nvlist(begin_nvl, "crypt_keydata", &keynvl);
2450                 if (err != 0)
2451                         goto out;
2452
2453                 /*
2454                  * If this is a new dataset we set the key immediately.
2455                  * Otherwise we don't want to change the key until we
2456                  * are sure the rest of the receive succeeded so we stash
2457                  * the keynvl away until then.
2458                  */
2459                 err = dsl_crypto_recv_raw(spa_name(ra->os->os_spa),
2460                     drc->drc_ds->ds_object, drc->drc_drrb->drr_type,
2461                     keynvl, drc->drc_newfs);
2462                 if (err != 0)
2463                         goto out;
2464
2465                 if (!drc->drc_newfs)
2466                         drc->drc_keynvl = fnvlist_dup(keynvl);
2467         }
2468
2469         if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
2470                 err = resume_check(ra, begin_nvl);
2471                 if (err != 0)
2472                         goto out;
2473         }
2474
2475         (void) bqueue_init(&rwa->q,
2476             MAX(zfs_recv_queue_length, 2 * zfs_max_recordsize),
2477             offsetof(struct receive_record_arg, node));
2478         cv_init(&rwa->cv, NULL, CV_DEFAULT, NULL);
2479         mutex_init(&rwa->mutex, NULL, MUTEX_DEFAULT, NULL);
2480         rwa->os = ra->os;
2481         rwa->byteswap = drc->drc_byteswap;
2482         rwa->resumable = drc->drc_resumable;
2483         rwa->raw = drc->drc_raw;
2484         rwa->os->os_raw_receive = drc->drc_raw;
2485
2486         (void) thread_create(NULL, 0, receive_writer_thread, rwa, 0, curproc,
2487             TS_RUN, minclsyspri);
2488         /*
2489          * We're reading rwa->err without locks, which is safe since we are the
2490          * only reader, and the worker thread is the only writer.  It's ok if we
2491          * miss a write for an iteration or two of the loop, since the writer
2492          * thread will keep freeing records we send it until we send it an eos
2493          * marker.
2494          *
2495          * We can leave this loop in 3 ways:  First, if rwa->err is
2496          * non-zero.  In that case, the writer thread will free the rrd we just
2497          * pushed.  Second, if  we're interrupted; in that case, either it's the
2498          * first loop and ra->rrd was never allocated, or it's later and ra->rrd
2499          * has been handed off to the writer thread who will free it.  Finally,
2500          * if receive_read_record fails or we're at the end of the stream, then
2501          * we free ra->rrd and exit.
2502          */
2503         while (rwa->err == 0) {
2504                 if (issig(JUSTLOOKING) && issig(FORREAL)) {
2505                         err = SET_ERROR(EINTR);
2506                         break;
2507                 }
2508
2509                 ASSERT3P(ra->rrd, ==, NULL);
2510                 ra->rrd = ra->next_rrd;
2511                 ra->next_rrd = NULL;
2512                 /* Allocates and loads header into ra->next_rrd */
2513                 err = receive_read_record(ra);
2514
2515                 if (ra->rrd->header.drr_type == DRR_END || err != 0) {
2516                         kmem_free(ra->rrd, sizeof (*ra->rrd));
2517                         ra->rrd = NULL;
2518                         break;
2519                 }
2520
2521                 bqueue_enqueue(&rwa->q, ra->rrd,
2522                     sizeof (struct receive_record_arg) + ra->rrd->payload_size);
2523                 ra->rrd = NULL;
2524         }
2525         if (ra->next_rrd == NULL)
2526                 ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
2527         ra->next_rrd->eos_marker = B_TRUE;
2528         bqueue_enqueue(&rwa->q, ra->next_rrd, 1);
2529
2530         mutex_enter(&rwa->mutex);
2531         while (!rwa->done) {
2532                 cv_wait(&rwa->cv, &rwa->mutex);
2533         }
2534         mutex_exit(&rwa->mutex);
2535
2536         /*
2537          * If we are receiving a full stream as a clone, all object IDs which
2538          * are greater than the maximum ID referenced in the stream are
2539          * by definition unused and must be freed.
2540          */
2541         if (drc->drc_clone && drc->drc_drrb->drr_fromguid == 0) {
2542                 uint64_t obj = rwa->max_object + 1;
2543                 int free_err = 0;
2544                 int next_err = 0;
2545
2546                 while (next_err == 0) {
2547                         free_err = dmu_free_long_object(rwa->os, obj);
2548                         if (free_err != 0 && free_err != ENOENT)
2549                                 break;
2550
2551                         next_err = dmu_object_next(rwa->os, &obj, FALSE, 0);
2552                 }
2553
2554                 if (err == 0) {
2555                         if (free_err != 0 && free_err != ENOENT)
2556                                 err = free_err;
2557                         else if (next_err != ESRCH)
2558                                 err = next_err;
2559                 }
2560         }
2561
2562         cv_destroy(&rwa->cv);
2563         mutex_destroy(&rwa->mutex);
2564         bqueue_destroy(&rwa->q);
2565         if (err == 0)
2566                 err = rwa->err;
2567
2568 out:
2569         nvlist_free(begin_nvl);
2570         if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1))
2571                 zfs_onexit_fd_rele(cleanup_fd);
2572
2573         if (err != 0) {
2574                 /*
2575                  * Clean up references. If receive is not resumable,
2576                  * destroy what we created, so we don't leave it in
2577                  * the inconsistent state.
2578                  */
2579                 dmu_recv_cleanup_ds(drc);
2580                 nvlist_free(drc->drc_keynvl);
2581         }
2582
2583         *voffp = ra->voff;
2584         objlist_destroy(&ra->ignore_objlist);
2585         kmem_free(ra, sizeof (*ra));
2586         kmem_free(rwa, sizeof (*rwa));
2587         return (err);
2588 }
2589
2590 static int
2591 dmu_recv_end_check(void *arg, dmu_tx_t *tx)
2592 {
2593         dmu_recv_cookie_t *drc = arg;
2594         dsl_pool_t *dp = dmu_tx_pool(tx);
2595         int error;
2596
2597         ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);
2598
2599         if (!drc->drc_newfs) {
2600                 dsl_dataset_t *origin_head;
2601
2602                 error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head);
2603                 if (error != 0)
2604                         return (error);
2605                 if (drc->drc_force) {
2606                         /*
2607                          * We will destroy any snapshots in tofs (i.e. before
2608                          * origin_head) that are after the origin (which is
2609                          * the snap before drc_ds, because drc_ds can not
2610                          * have any snaps of its own).
2611                          */
2612                         uint64_t obj;
2613
2614                         obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
2615                         while (obj !=
2616                             dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
2617                                 dsl_dataset_t *snap;
2618                                 error = dsl_dataset_hold_obj(dp, obj, FTAG,
2619                                     &snap);
2620                                 if (error != 0)
2621                                         break;
2622                                 if (snap->ds_dir != origin_head->ds_dir)
2623                                         error = SET_ERROR(EINVAL);
2624                                 if (error == 0)  {
2625                                         error = dsl_destroy_snapshot_check_impl(
2626                                             snap, B_FALSE);
2627                                 }
2628                                 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
2629                                 dsl_dataset_rele(snap, FTAG);
2630                                 if (error != 0)
2631                                         break;
2632                         }
2633                         if (error != 0) {
2634                                 dsl_dataset_rele(origin_head, FTAG);
2635                                 return (error);
2636                         }
2637                 }
2638                 if (drc->drc_keynvl != NULL) {
2639                         error = dsl_crypto_recv_raw_key_check(drc->drc_ds,
2640                             drc->drc_keynvl, tx);
2641                         if (error != 0) {
2642                                 dsl_dataset_rele(origin_head, FTAG);
2643                                 return (error);
2644                         }
2645                 }
2646
2647                 error = dsl_dataset_clone_swap_check_impl(drc->drc_ds,
2648                     origin_head, drc->drc_force, drc->drc_owner, tx);
2649                 if (error != 0) {
2650                         dsl_dataset_rele(origin_head, FTAG);
2651                         return (error);
2652                 }
2653                 error = dsl_dataset_snapshot_check_impl(origin_head,
2654                     drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
2655                 dsl_dataset_rele(origin_head, FTAG);
2656                 if (error != 0)
2657                         return (error);
2658
2659                 error = dsl_destroy_head_check_impl(drc->drc_ds, 1);
2660         } else {
2661                 error = dsl_dataset_snapshot_check_impl(drc->drc_ds,
2662                     drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
2663         }
2664         return (error);
2665 }
2666
2667 static void
2668 dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
2669 {
2670         dmu_recv_cookie_t *drc = arg;
2671         dsl_pool_t *dp = dmu_tx_pool(tx);
2672         boolean_t encrypted = drc->drc_ds->ds_dir->dd_crypto_obj != 0;
2673
2674         spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
2675             tx, "snap=%s", drc->drc_tosnap);
2676         drc->drc_ds->ds_objset->os_raw_receive = B_FALSE;
2677
2678         if (!drc->drc_newfs) {
2679                 dsl_dataset_t *origin_head;
2680
2681                 VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
2682                     &origin_head));
2683
2684                 if (drc->drc_force) {
2685                         /*
2686                          * Destroy any snapshots of drc_tofs (origin_head)
2687                          * after the origin (the snap before drc_ds).
2688                          */
2689                         uint64_t obj;
2690
2691                         obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
2692                         while (obj !=
2693                             dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
2694                                 dsl_dataset_t *snap;
2695                                 VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG,
2696                                     &snap));
2697                                 ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir);
2698                                 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
2699                                 dsl_destroy_snapshot_sync_impl(snap,
2700                                     B_FALSE, tx);
2701                                 dsl_dataset_rele(snap, FTAG);
2702                         }
2703                 }
2704                 if (drc->drc_keynvl != NULL) {
2705                         dsl_crypto_recv_raw_key_sync(drc->drc_ds,
2706                             drc->drc_keynvl, tx);
2707                         nvlist_free(drc->drc_keynvl);
2708                         drc->drc_keynvl = NULL;
2709                 }
2710
2711                 VERIFY3P(drc->drc_ds->ds_prev, ==, origin_head->ds_prev);
2712
2713                 dsl_dataset_clone_swap_sync_impl(drc->drc_ds,
2714                     origin_head, tx);
2715                 dsl_dataset_snapshot_sync_impl(origin_head,
2716                     drc->drc_tosnap, tx);
2717
2718                 /* set snapshot's creation time and guid */
2719                 dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx);
2720                 dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time =
2721                     drc->drc_drrb->drr_creation_time;
2722                 dsl_dataset_phys(origin_head->ds_prev)->ds_guid =
2723                     drc->drc_drrb->drr_toguid;
2724                 dsl_dataset_phys(origin_head->ds_prev)->ds_flags &=
2725                     ~DS_FLAG_INCONSISTENT;
2726
2727                 dmu_buf_will_dirty(origin_head->ds_dbuf, tx);
2728                 dsl_dataset_phys(origin_head)->ds_flags &=
2729                     ~DS_FLAG_INCONSISTENT;
2730
2731                 drc->drc_newsnapobj =
2732                     dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
2733
2734                 dsl_dataset_rele(origin_head, FTAG);
2735                 dsl_destroy_head_sync_impl(drc->drc_ds, tx);
2736
2737                 if (drc->drc_owner != NULL)
2738                         VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner);
2739         } else {
2740                 dsl_dataset_t *ds = drc->drc_ds;
2741
2742                 dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx);
2743
2744                 /* set snapshot's creation time and guid */
2745                 dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
2746                 dsl_dataset_phys(ds->ds_prev)->ds_creation_time =
2747                     drc->drc_drrb->drr_creation_time;
2748                 dsl_dataset_phys(ds->ds_prev)->ds_guid =
2749                     drc->drc_drrb->drr_toguid;
2750                 dsl_dataset_phys(ds->ds_prev)->ds_flags &=
2751                     ~DS_FLAG_INCONSISTENT;
2752
2753                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
2754                 dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
2755                 if (dsl_dataset_has_resume_receive_state(ds)) {
2756                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2757                             DS_FIELD_RESUME_FROMGUID, tx);
2758                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2759                             DS_FIELD_RESUME_OBJECT, tx);
2760                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2761                             DS_FIELD_RESUME_OFFSET, tx);
2762                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2763                             DS_FIELD_RESUME_BYTES, tx);
2764                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2765                             DS_FIELD_RESUME_TOGUID, tx);
2766                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2767                             DS_FIELD_RESUME_TONAME, tx);
2768                 }
2769                 drc->drc_newsnapobj =
2770                     dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj;
2771         }
2772         zvol_create_minors(dp->dp_spa, drc->drc_tofs, B_TRUE);
2773
2774         /*
2775          * Release the hold from dmu_recv_begin.  This must be done before
2776          * we return to open context, so that when we free the dataset's dnode
2777          * we can evict its bonus buffer. Since the dataset may be destroyed
2778          * at this point (and therefore won't have a valid pointer to the spa)
2779          * we release the key mapping manually here while we do have a valid
2780          * pointer, if it exists.
2781          */
2782         if (!drc->drc_raw && encrypted) {
2783                 (void) spa_keystore_remove_mapping(dmu_tx_pool(tx)->dp_spa,
2784                     drc->drc_ds->ds_object, drc->drc_ds);
2785         }
2786         dsl_dataset_disown(drc->drc_ds, 0, dmu_recv_tag);
2787         drc->drc_ds = NULL;
2788 }
2789
2790 static int
2791 add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj,
2792     boolean_t raw)
2793 {
2794         dsl_pool_t *dp;
2795         dsl_dataset_t *snapds;
2796         guid_map_entry_t *gmep;
2797         objset_t *os;
2798         ds_hold_flags_t dsflags = (raw) ? 0 : DS_HOLD_FLAG_DECRYPT;
2799         int err;
2800
2801         ASSERT(guid_map != NULL);
2802
2803         err = dsl_pool_hold(name, FTAG, &dp);
2804         if (err != 0)
2805                 return (err);
2806         gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP);
2807         err = dsl_dataset_own_obj(dp, snapobj, dsflags, gmep, &snapds);
2808         if (err == 0) {
2809                 /*
2810                  * If this is a deduplicated raw send stream, we need
2811                  * to make sure that we can still read raw blocks from
2812                  * earlier datasets in the stream, so we set the
2813                  * os_raw_receive flag now.
2814                  */
2815                 if (raw) {
2816                         err = dmu_objset_from_ds(snapds, &os);
2817                         if (err != 0) {
2818                                 dsl_dataset_disown(snapds, dsflags, FTAG);
2819                                 dsl_pool_rele(dp, FTAG);
2820                                 kmem_free(gmep, sizeof (*gmep));
2821                                 return (err);
2822                         }
2823                         os->os_raw_receive = B_TRUE;
2824                 }
2825
2826                 gmep->raw = raw;
2827                 gmep->guid = dsl_dataset_phys(snapds)->ds_guid;
2828                 gmep->gme_ds = snapds;
2829                 avl_add(guid_map, gmep);
2830         } else {
2831                 kmem_free(gmep, sizeof (*gmep));
2832         }
2833
2834         dsl_pool_rele(dp, FTAG);
2835         return (err);
2836 }
2837
2838 static int dmu_recv_end_modified_blocks = 3;
2839
2840 static int
2841 dmu_recv_existing_end(dmu_recv_cookie_t *drc)
2842 {
2843 #ifdef _KERNEL
2844         /*
2845          * We will be destroying the ds; make sure its origin is unmounted if
2846          * necessary.
2847          */
2848         char name[ZFS_MAX_DATASET_NAME_LEN];
2849         dsl_dataset_name(drc->drc_ds, name);
2850         zfs_destroy_unmount_origin(name);
2851 #endif
2852
2853         return (dsl_sync_task(drc->drc_tofs,
2854             dmu_recv_end_check, dmu_recv_end_sync, drc,
2855             dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL));
2856 }
2857
2858 static int
2859 dmu_recv_new_end(dmu_recv_cookie_t *drc)
2860 {
2861         return (dsl_sync_task(drc->drc_tofs,
2862             dmu_recv_end_check, dmu_recv_end_sync, drc,
2863             dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL));
2864 }
2865
2866 int
2867 dmu_recv_end(dmu_recv_cookie_t *drc, void *owner)
2868 {
2869         int error;
2870
2871         drc->drc_owner = owner;
2872
2873         if (drc->drc_newfs)
2874                 error = dmu_recv_new_end(drc);
2875         else
2876                 error = dmu_recv_existing_end(drc);
2877
2878         if (error != 0) {
2879                 dmu_recv_cleanup_ds(drc);
2880                 nvlist_free(drc->drc_keynvl);
2881         } else if (drc->drc_guid_to_ds_map != NULL) {
2882                 (void) add_ds_to_guidmap(drc->drc_tofs, drc->drc_guid_to_ds_map,
2883                     drc->drc_newsnapobj, drc->drc_raw);
2884         }
2885         return (error);
2886 }
2887
2888 /*
2889  * Return TRUE if this objset is currently being received into.
2890  */
2891 boolean_t
2892 dmu_objset_is_receiving(objset_t *os)
2893 {
2894         return (os->os_dsl_dataset != NULL &&
2895             os->os_dsl_dataset->ds_owner == dmu_recv_tag);
2896 }
2897
2898 #if defined(_KERNEL)
2899 module_param(zfs_recv_queue_length, int, 0644);
2900 MODULE_PARM_DESC(zfs_recv_queue_length, "Maximum receive queue length");
2901 #endif