module/zfs/dmu_recv.c
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
23  * Copyright 2011 Nexenta Systems, Inc. All rights reserved.
24  * Copyright (c) 2011, 2015 by Delphix. All rights reserved.
25  * Copyright (c) 2014, Joyent, Inc. All rights reserved.
26  * Copyright 2014 HybridCluster. All rights reserved.
27  * Copyright 2016 RackTop Systems.
28  * Copyright (c) 2016 Actifio, Inc. All rights reserved.
29  */
30
31 #include <sys/dmu.h>
32 #include <sys/dmu_impl.h>
33 #include <sys/dmu_tx.h>
34 #include <sys/dbuf.h>
35 #include <sys/dnode.h>
36 #include <sys/zfs_context.h>
37 #include <sys/dmu_objset.h>
38 #include <sys/dmu_traverse.h>
39 #include <sys/dsl_dataset.h>
40 #include <sys/dsl_dir.h>
41 #include <sys/dsl_prop.h>
42 #include <sys/dsl_pool.h>
43 #include <sys/dsl_synctask.h>
44 #include <sys/spa_impl.h>
45 #include <sys/zfs_ioctl.h>
46 #include <sys/zap.h>
47 #include <sys/zio_checksum.h>
48 #include <sys/zfs_znode.h>
49 #include <zfs_fletcher.h>
50 #include <sys/avl.h>
51 #include <sys/ddt.h>
52 #include <sys/zfs_onexit.h>
53 #include <sys/dmu_recv.h>
54 #include <sys/dsl_destroy.h>
55 #include <sys/blkptr.h>
56 #include <sys/dsl_bookmark.h>
57 #include <sys/zfeature.h>
58 #include <sys/bqueue.h>
59 #include <sys/zvol.h>
60 #include <sys/policy.h>
61
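/*
 * Tunable: upper bound, in bytes, on the amount of send-stream data queued
 * between the thread that reads the stream and the receive_writer thread;
 * it defaults to one maximum-sized block.  Presumably this is what bounds
 * the record queue (bqueue) that dmu_recv_stream() sets up later in this
 * file.
 */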
62 int zfs_recv_queue_length = SPA_MAXBLOCKSIZE;
63
64 static char *dmu_recv_tag = "dmu_recv_tag";
65 const char *recv_clone_name = "%recv";
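/*
 * dmu_recv_tag is the tag used to own the target dataset for the duration
 * of a receive.  recv_clone_name is the name of the temporary clone that an
 * incremental receive into an existing filesystem is written to, so the
 * working dataset is "<tofs>/%recv" (see the recvname construction below)
 * until the receive completes.
 */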
66
67 static void byteswap_record(dmu_replay_record_t *drr);
68
69 typedef struct dmu_recv_begin_arg {
70         const char *drba_origin;
71         dmu_recv_cookie_t *drba_cookie;
72         cred_t *drba_cred;
73         dsl_crypto_params_t *drba_dcp;
74         uint64_t drba_snapobj;
75 } dmu_recv_begin_arg_t;
76
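/*
 * Check that a receive into an existing filesystem is legal: the temporary
 * %recv clone and the target snapshot name must not already exist (EBUSY /
 * EEXIST), the snapshot limit must not be exceeded, and for incrementals the
 * fromsnap GUID must name a snapshot in this dsl_dir (ENODEV otherwise, or
 * ETXTBSY if the filesystem changed since fromsnap and -F was not given).
 */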
77 static int
78 recv_begin_check_existing_impl(dmu_recv_begin_arg_t *drba, dsl_dataset_t *ds,
79     uint64_t fromguid, uint64_t featureflags)
80 {
81         uint64_t val;
82         int error;
83         dsl_pool_t *dp = ds->ds_dir->dd_pool;
84         boolean_t encrypted = ds->ds_dir->dd_crypto_obj != 0;
85         boolean_t raw = (featureflags & DMU_BACKUP_FEATURE_RAW) != 0;
86         boolean_t embed = (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) != 0;
87
88         /* temporary clone name must not exist */
89         error = zap_lookup(dp->dp_meta_objset,
90             dsl_dir_phys(ds->ds_dir)->dd_child_dir_zapobj, recv_clone_name,
91             8, 1, &val);
92         if (error != ENOENT)
93                 return (error == 0 ? EBUSY : error);
94
95         /* new snapshot name must not exist */
96         error = zap_lookup(dp->dp_meta_objset,
97             dsl_dataset_phys(ds)->ds_snapnames_zapobj,
98             drba->drba_cookie->drc_tosnap, 8, 1, &val);
99         if (error != ENOENT)
100                 return (error == 0 ? EEXIST : error);
101
102         /*
103          * Check snapshot limit before receiving. We'll recheck again at the
104          * end, but might as well abort before receiving if we're already over
105          * the limit.
106          *
107          * Note that we do not check the file system limit with
108          * dsl_dir_fscount_check because the temporary %clones don't count
109          * against that limit.
110          */
111         error = dsl_fs_ss_limit_check(ds->ds_dir, 1, ZFS_PROP_SNAPSHOT_LIMIT,
112             NULL, drba->drba_cred);
113         if (error != 0)
114                 return (error);
115
116         if (fromguid != 0) {
117                 dsl_dataset_t *snap;
118                 uint64_t obj = dsl_dataset_phys(ds)->ds_prev_snap_obj;
119
120                 /* Can't perform a raw receive on top of a non-raw receive */
121                 if (!encrypted && raw)
122                         return (SET_ERROR(EINVAL));
123
124                 /* Encryption is incompatible with embedded data */
125                 if (encrypted && embed)
126                         return (SET_ERROR(EINVAL));
127
128                 /* Find snapshot in this dir that matches fromguid. */
129                 while (obj != 0) {
130                         error = dsl_dataset_hold_obj(dp, obj, FTAG,
131                             &snap);
132                         if (error != 0)
133                                 return (SET_ERROR(ENODEV));
134                         if (snap->ds_dir != ds->ds_dir) {
135                                 dsl_dataset_rele(snap, FTAG);
136                                 return (SET_ERROR(ENODEV));
137                         }
138                         if (dsl_dataset_phys(snap)->ds_guid == fromguid)
139                                 break;
140                         obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
141                         dsl_dataset_rele(snap, FTAG);
142                 }
143                 if (obj == 0)
144                         return (SET_ERROR(ENODEV));
145
146                 if (drba->drba_cookie->drc_force) {
147                         drba->drba_snapobj = obj;
148                 } else {
149                         /*
150                          * If we are not forcing, there must be no
151                          * changes since fromsnap.
152                          */
153                         if (dsl_dataset_modified_since_snap(ds, snap)) {
154                                 dsl_dataset_rele(snap, FTAG);
155                                 return (SET_ERROR(ETXTBSY));
156                         }
157                         drba->drba_snapobj = ds->ds_prev->ds_object;
158                 }
159
160                 dsl_dataset_rele(snap, FTAG);
161         } else {
162                 /* if full, then must be forced */
163                 if (!drba->drba_cookie->drc_force)
164                         return (SET_ERROR(EEXIST));
165
166                 /*
167                  * We don't support using zfs recv -F to blow away
168                  * encrypted filesystems. This would require the
169                  * dsl dir to point to the old encryption key and
170                  * the new one at the same time during the receive.
171                  */
172                 if ((!encrypted && raw) || encrypted)
173                         return (SET_ERROR(EINVAL));
174
175                 /*
176                  * Perform the same encryption checks we would if
177                  * we were creating a new dataset from scratch.
178                  */
179                 if (!raw) {
180                         boolean_t will_encrypt;
181
182                         error = dmu_objset_create_crypt_check(
183                             ds->ds_dir->dd_parent, drba->drba_dcp,
184                             &will_encrypt);
185                         if (error != 0)
186                                 return (error);
187
188                         if (will_encrypt && embed)
189                                 return (SET_ERROR(EINVAL));
190                 }
191
192                 drba->drba_snapobj = 0;
193         }
194
195         return (0);
196
197 }
198
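/*
 * Check phase of the receive-begin sync task.  Validates the DRR_BEGIN
 * record against the pool (SA_SPILL, EMBEDDED_DATA, LZ4, LARGE_BLOCKS,
 * LARGE_DNODE and ENCRYPTION feature support), then either hands off to
 * recv_begin_check_existing_impl() when tofs already exists or, for a new
 * filesystem, checks the parent, the optional origin snapshot, encryption
 * rules and the filesystem/snapshot limits.
 */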
199 static int
200 dmu_recv_begin_check(void *arg, dmu_tx_t *tx)
201 {
202         dmu_recv_begin_arg_t *drba = arg;
203         dsl_pool_t *dp = dmu_tx_pool(tx);
204         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
205         uint64_t fromguid = drrb->drr_fromguid;
206         int flags = drrb->drr_flags;
207         ds_hold_flags_t dsflags = 0;
208         int error;
209         uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
210         dsl_dataset_t *ds;
211         const char *tofs = drba->drba_cookie->drc_tofs;
212
213         /* already checked */
214         ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
215         ASSERT(!(featureflags & DMU_BACKUP_FEATURE_RESUMING));
216
217         if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
218             DMU_COMPOUNDSTREAM ||
219             drrb->drr_type >= DMU_OST_NUMTYPES ||
220             ((flags & DRR_FLAG_CLONE) && drba->drba_origin == NULL))
221                 return (SET_ERROR(EINVAL));
222
223         /* Verify pool version supports SA if SA_SPILL feature set */
224         if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
225             spa_version(dp->dp_spa) < SPA_VERSION_SA)
226                 return (SET_ERROR(ENOTSUP));
227
228         if (drba->drba_cookie->drc_resumable &&
229             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EXTENSIBLE_DATASET))
230                 return (SET_ERROR(ENOTSUP));
231
232         /*
233          * The receiving code doesn't know how to translate a WRITE_EMBEDDED
234          * record to a plain WRITE record, so the pool must have the
235          * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
236          * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
237          */
238         if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
239             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
240                 return (SET_ERROR(ENOTSUP));
241         if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
242             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
243                 return (SET_ERROR(ENOTSUP));
244
245         /*
246          * The receiving code doesn't know how to translate large blocks
247          * to smaller ones, so the pool must have the LARGE_BLOCKS
248          * feature enabled if the stream has LARGE_BLOCKS. Same with
249          * large dnodes.
250          */
251         if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
252             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
253                 return (SET_ERROR(ENOTSUP));
254         if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) &&
255             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_DNODE))
256                 return (SET_ERROR(ENOTSUP));
257
258         if (featureflags & DMU_BACKUP_FEATURE_RAW) {
259                 /* raw receives require the encryption feature */
260                 if (!spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_ENCRYPTION))
261                         return (SET_ERROR(ENOTSUP));
262
263                 /* embedded data is incompatible with encryption and raw recv */
264                 if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)
265                         return (SET_ERROR(EINVAL));
266         } else {
267                 dsflags |= DS_HOLD_FLAG_DECRYPT;
268         }
269
270         error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds);
271         if (error == 0) {
272                 /* target fs already exists; recv into temp clone */
273
274                 /* Can't recv a clone into an existing fs */
275                 if (flags & DRR_FLAG_CLONE || drba->drba_origin) {
276                         dsl_dataset_rele_flags(ds, dsflags, FTAG);
277                         return (SET_ERROR(EINVAL));
278                 }
279
280                 error = recv_begin_check_existing_impl(drba, ds, fromguid,
281                     featureflags);
282                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
283         } else if (error == ENOENT) {
284                 /* target fs does not exist; must be a full backup or clone */
285                 char buf[ZFS_MAX_DATASET_NAME_LEN];
286
287                 /*
288                  * If it's a non-clone incremental, we are missing the
289                  * target fs, so fail the recv.
290                  */
291                 if (fromguid != 0 && !(flags & DRR_FLAG_CLONE ||
292                     drba->drba_origin))
293                         return (SET_ERROR(ENOENT));
294
295                 /*
296                  * If we're receiving a full send as a clone, and it doesn't
297                  * contain all the necessary free records and freeobject
298                  * records, reject it.
299                  */
300                 if (fromguid == 0 && drba->drba_origin &&
301                     !(flags & DRR_FLAG_FREERECORDS))
302                         return (SET_ERROR(EINVAL));
303
304                 /* Open the parent of tofs */
305                 ASSERT3U(strlen(tofs), <, sizeof (buf));
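                /*
                 * For example, with tofs "pool/a/b" strrchr() points at the
                 * last '/', so the copy length is strlen("pool/a") + 1 and
                 * buf receives the parent name "pool/a".
                 */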
306                 (void) strlcpy(buf, tofs, strrchr(tofs, '/') - tofs + 1);
307                 error = dsl_dataset_hold_flags(dp, buf, dsflags, FTAG, &ds);
308                 if (error != 0)
309                         return (error);
310
311                 if ((featureflags & DMU_BACKUP_FEATURE_RAW) == 0 &&
312                     drba->drba_origin == NULL) {
313                         boolean_t will_encrypt;
314
315                         /*
316                          * Check that we aren't breaking any encryption rules
317                          * and that we have all the parameters we need to
318                          * create an encrypted dataset if necessary. If we are
319                          * making an encrypted dataset the stream can't have
320                          * embedded data.
321                          */
322                         error = dmu_objset_create_crypt_check(ds->ds_dir,
323                             drba->drba_dcp, &will_encrypt);
324                         if (error != 0) {
325                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
326                                 return (error);
327                         }
328
329                         if (will_encrypt &&
330                             (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)) {
331                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
332                                 return (SET_ERROR(EINVAL));
333                         }
334                 }
335
336                 /*
337                  * Check filesystem and snapshot limits before receiving. We'll
338                  * recheck snapshot limits again at the end (we create the
339                  * filesystems and increment those counts during begin_sync).
340                  */
341                 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
342                     ZFS_PROP_FILESYSTEM_LIMIT, NULL, drba->drba_cred);
343                 if (error != 0) {
344                         dsl_dataset_rele_flags(ds, dsflags, FTAG);
345                         return (error);
346                 }
347
348                 error = dsl_fs_ss_limit_check(ds->ds_dir, 1,
349                     ZFS_PROP_SNAPSHOT_LIMIT, NULL, drba->drba_cred);
350                 if (error != 0) {
351                         dsl_dataset_rele_flags(ds, dsflags, FTAG);
352                         return (error);
353                 }
354
355                 if (drba->drba_origin != NULL) {
356                         dsl_dataset_t *origin;
357
358                         error = dsl_dataset_hold_flags(dp, drba->drba_origin,
359                             dsflags, FTAG, &origin);
360                         if (error != 0) {
361                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
362                                 return (error);
363                         }
364                         if (!origin->ds_is_snapshot) {
365                                 dsl_dataset_rele_flags(origin, dsflags, FTAG);
366                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
367                                 return (SET_ERROR(EINVAL));
368                         }
369                         if (dsl_dataset_phys(origin)->ds_guid != fromguid &&
370                             fromguid != 0) {
371                                 dsl_dataset_rele_flags(origin, dsflags, FTAG);
372                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
373                                 return (SET_ERROR(ENODEV));
374                         }
375                         if (origin->ds_dir->dd_crypto_obj != 0 &&
376                             (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA)) {
377                                 dsl_dataset_rele_flags(origin, dsflags, FTAG);
378                                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
379                                 return (SET_ERROR(EINVAL));
380                         }
381                         dsl_dataset_rele_flags(origin,
382                             dsflags, FTAG);
383                 }
384                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
385                 error = 0;
386         }
387         return (error);
388 }
389
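/*
 * Sync phase of the receive-begin sync task.  Creates the dataset the stream
 * will be received into: a temporary "%recv" clone when tofs already exists,
 * otherwise a brand-new dataset (optionally cloned from drba_origin).  The
 * new dataset is owned with dmu_recv_tag, marked DS_FLAG_INCONSISTENT and,
 * for resumable receives, seeded with the DS_FIELD_RESUME_* ZAP entries.
 */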
390 static void
391 dmu_recv_begin_sync(void *arg, dmu_tx_t *tx)
392 {
393         dmu_recv_begin_arg_t *drba = arg;
394         dsl_pool_t *dp = dmu_tx_pool(tx);
395         objset_t *mos = dp->dp_meta_objset;
396         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
397         const char *tofs = drba->drba_cookie->drc_tofs;
398         uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
399         dsl_dataset_t *ds, *newds;
400         objset_t *os;
401         uint64_t dsobj;
402         ds_hold_flags_t dsflags = 0;
403         int error;
404         uint64_t crflags = 0;
405         dsl_crypto_params_t dummy_dcp = { 0 };
406         dsl_crypto_params_t *dcp = drba->drba_dcp;
407
408         if (drrb->drr_flags & DRR_FLAG_CI_DATA)
409                 crflags |= DS_FLAG_CI_DATASET;
410
411         if ((featureflags & DMU_BACKUP_FEATURE_RAW) == 0)
412                 dsflags |= DS_HOLD_FLAG_DECRYPT;
413
414         /*
415          * Raw, non-incremental recvs always use a dummy dcp with
416          * the raw cmd set. Raw incremental recvs do not use a dcp
417          * since the encryption parameters are already set in stone.
418          */
419         if (dcp == NULL && drba->drba_snapobj == 0 &&
420             drba->drba_origin == NULL) {
421                 ASSERT3P(dcp, ==, NULL);
422                 dcp = &dummy_dcp;
423
424                 if (featureflags & DMU_BACKUP_FEATURE_RAW)
425                         dcp->cp_cmd = DCP_CMD_RAW_RECV;
426         }
427
428         error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds);
429         if (error == 0) {
430                 /* create temporary clone */
431                 dsl_dataset_t *snap = NULL;
432
433                 if (drba->drba_snapobj != 0) {
434                         VERIFY0(dsl_dataset_hold_obj(dp,
435                             drba->drba_snapobj, FTAG, &snap));
436                         ASSERT3P(dcp, ==, NULL);
437                 }
438
439                 dsobj = dsl_dataset_create_sync(ds->ds_dir, recv_clone_name,
440                     snap, crflags, drba->drba_cred, dcp, tx);
441                 if (drba->drba_snapobj != 0)
442                         dsl_dataset_rele(snap, FTAG);
443                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
444         } else {
445                 dsl_dir_t *dd;
446                 const char *tail;
447                 dsl_dataset_t *origin = NULL;
448
449                 VERIFY0(dsl_dir_hold(dp, tofs, FTAG, &dd, &tail));
450
451                 if (drba->drba_origin != NULL) {
452                         VERIFY0(dsl_dataset_hold(dp, drba->drba_origin,
453                             FTAG, &origin));
454                         ASSERT3P(dcp, ==, NULL);
455                 }
456
457                 /* Create new dataset. */
458                 dsobj = dsl_dataset_create_sync(dd, strrchr(tofs, '/') + 1,
459                     origin, crflags, drba->drba_cred, dcp, tx);
460                 if (origin != NULL)
461                         dsl_dataset_rele(origin, FTAG);
462                 dsl_dir_rele(dd, FTAG);
463                 drba->drba_cookie->drc_newfs = B_TRUE;
464         }
465
466         VERIFY0(dsl_dataset_own_obj(dp, dsobj, dsflags, dmu_recv_tag, &newds));
467         VERIFY0(dmu_objset_from_ds(newds, &os));
468
469         if (drba->drba_cookie->drc_resumable) {
470                 dsl_dataset_zapify(newds, tx);
471                 if (drrb->drr_fromguid != 0) {
472                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_FROMGUID,
473                             8, 1, &drrb->drr_fromguid, tx));
474                 }
475                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TOGUID,
476                     8, 1, &drrb->drr_toguid, tx));
477                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_TONAME,
478                     1, strlen(drrb->drr_toname) + 1, drrb->drr_toname, tx));
479                 uint64_t one = 1;
480                 uint64_t zero = 0;
481                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OBJECT,
482                     8, 1, &one, tx));
483                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_OFFSET,
484                     8, 1, &zero, tx));
485                 VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_BYTES,
486                     8, 1, &zero, tx));
487                 if (featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) {
488                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_LARGEBLOCK,
489                             8, 1, &one, tx));
490                 }
491                 if (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) {
492                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_EMBEDOK,
493                             8, 1, &one, tx));
494                 }
495                 if (featureflags & DMU_BACKUP_FEATURE_COMPRESSED) {
496                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_COMPRESSOK,
497                             8, 1, &one, tx));
498                 }
499                 if (featureflags & DMU_BACKUP_FEATURE_RAW) {
500                         VERIFY0(zap_add(mos, dsobj, DS_FIELD_RESUME_RAWOK,
501                             8, 1, &one, tx));
502                 }
503         }
504
505         /*
506          * Usually the os->os_encrypted value is tied to the presence of a
507          * DSL Crypto Key object in the dd. However, that will not be received
508          * until dmu_recv_stream(), so we set the value manually for now.
509          */
510         if (featureflags & DMU_BACKUP_FEATURE_RAW) {
511                 os->os_encrypted = B_TRUE;
512                 drba->drba_cookie->drc_raw = B_TRUE;
513         }
514
515         dmu_buf_will_dirty(newds->ds_dbuf, tx);
516         dsl_dataset_phys(newds)->ds_flags |= DS_FLAG_INCONSISTENT;
517
518         /*
519          * If we actually created a non-clone, we need to create the objset
520          * in our new dataset. If this is a raw send we postpone this until
521          * dmu_recv_stream() so that we can allocate the metadnode with the
522          * properties from the DRR_BEGIN payload.
523          */
524         rrw_enter(&newds->ds_bp_rwlock, RW_READER, FTAG);
525         if (BP_IS_HOLE(dsl_dataset_get_blkptr(newds)) &&
526             (featureflags & DMU_BACKUP_FEATURE_RAW) == 0) {
527                 (void) dmu_objset_create_impl(dp->dp_spa,
528                     newds, dsl_dataset_get_blkptr(newds), drrb->drr_type, tx);
529         }
530         rrw_exit(&newds->ds_bp_rwlock, FTAG);
531
532         drba->drba_cookie->drc_ds = newds;
533
534         spa_history_log_internal_ds(newds, "receive", tx, "");
535 }
536
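/*
 * Check phase for resuming an interrupted receive.  The dataset being
 * resumed (either "<tofs>/%recv" or tofs itself) must exist, be marked
 * inconsistent, carry resume state whose toguid/fromguid match the new
 * DRR_BEGIN record, be unowned (i.e. no other receive is running), and have
 * no snapshots yet.
 */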
537 static int
538 dmu_recv_resume_begin_check(void *arg, dmu_tx_t *tx)
539 {
540         dmu_recv_begin_arg_t *drba = arg;
541         dsl_pool_t *dp = dmu_tx_pool(tx);
542         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
543         int error;
544         ds_hold_flags_t dsflags = 0;
545         uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
546         dsl_dataset_t *ds;
547         const char *tofs = drba->drba_cookie->drc_tofs;
548
549         /* already checked */
550         ASSERT3U(drrb->drr_magic, ==, DMU_BACKUP_MAGIC);
551         ASSERT(featureflags & DMU_BACKUP_FEATURE_RESUMING);
552
553         if (DMU_GET_STREAM_HDRTYPE(drrb->drr_versioninfo) ==
554             DMU_COMPOUNDSTREAM ||
555             drrb->drr_type >= DMU_OST_NUMTYPES)
556                 return (SET_ERROR(EINVAL));
557
558         /* Verify pool version supports SA if SA_SPILL feature set */
559         if ((featureflags & DMU_BACKUP_FEATURE_SA_SPILL) &&
560             spa_version(dp->dp_spa) < SPA_VERSION_SA)
561                 return (SET_ERROR(ENOTSUP));
562
563         /*
564          * The receiving code doesn't know how to translate a WRITE_EMBEDDED
565          * record to a plain WRITE record, so the pool must have the
566          * EMBEDDED_DATA feature enabled if the stream has WRITE_EMBEDDED
567          * records.  Same with WRITE_EMBEDDED records that use LZ4 compression.
568          */
569         if ((featureflags & DMU_BACKUP_FEATURE_EMBED_DATA) &&
570             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_EMBEDDED_DATA))
571                 return (SET_ERROR(ENOTSUP));
572         if ((featureflags & DMU_BACKUP_FEATURE_LZ4) &&
573             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LZ4_COMPRESS))
574                 return (SET_ERROR(ENOTSUP));
575
576         /*
577          * The receiving code doesn't know how to translate large blocks
578          * to smaller ones, so the pool must have the LARGE_BLOCKS
579          * feature enabled if the stream has LARGE_BLOCKS. Same with
580          * large dnodes.
581          */
582         if ((featureflags & DMU_BACKUP_FEATURE_LARGE_BLOCKS) &&
583             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_BLOCKS))
584                 return (SET_ERROR(ENOTSUP));
585         if ((featureflags & DMU_BACKUP_FEATURE_LARGE_DNODE) &&
586             !spa_feature_is_enabled(dp->dp_spa, SPA_FEATURE_LARGE_DNODE))
587                 return (SET_ERROR(ENOTSUP));
588
589         /* 6 extra bytes for /%recv */
590         char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];
591         (void) snprintf(recvname, sizeof (recvname), "%s/%s",
592             tofs, recv_clone_name);
593
594         if ((featureflags & DMU_BACKUP_FEATURE_RAW) == 0)
595                 dsflags |= DS_HOLD_FLAG_DECRYPT;
596
597         if (dsl_dataset_hold_flags(dp, recvname, dsflags, FTAG, &ds) != 0) {
598                 /* %recv does not exist; continue in tofs */
599                 error = dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds);
600                 if (error != 0)
601                         return (error);
602         }
603
604         /* check that ds is marked inconsistent */
605         if (!DS_IS_INCONSISTENT(ds)) {
606                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
607                 return (SET_ERROR(EINVAL));
608         }
609
610         /* check that there is resuming data, and that the toguid matches */
611         if (!dsl_dataset_is_zapified(ds)) {
612                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
613                 return (SET_ERROR(EINVAL));
614         }
615         uint64_t val;
616         error = zap_lookup(dp->dp_meta_objset, ds->ds_object,
617             DS_FIELD_RESUME_TOGUID, sizeof (val), 1, &val);
618         if (error != 0 || drrb->drr_toguid != val) {
619                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
620                 return (SET_ERROR(EINVAL));
621         }
622
623         /*
624          * Check if the receive is still running.  If so, it will be owned.
625          * Note that nothing else can own the dataset (e.g. after the receive
626          * fails) because it will be marked inconsistent.
627          */
628         if (dsl_dataset_has_owner(ds)) {
629                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
630                 return (SET_ERROR(EBUSY));
631         }
632
633         /* There should not be any snapshots of this fs yet. */
634         if (ds->ds_prev != NULL && ds->ds_prev->ds_dir == ds->ds_dir) {
635                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
636                 return (SET_ERROR(EINVAL));
637         }
638
639         /*
640          * Note: resume point will be checked when we process the first WRITE
641          * record.
642          */
643
644         /* check that the origin matches */
645         val = 0;
646         (void) zap_lookup(dp->dp_meta_objset, ds->ds_object,
647             DS_FIELD_RESUME_FROMGUID, sizeof (val), 1, &val);
648         if (drrb->drr_fromguid != val) {
649                 dsl_dataset_rele_flags(ds, dsflags, FTAG);
650                 return (SET_ERROR(EINVAL));
651         }
652
653         dsl_dataset_rele_flags(ds, dsflags, FTAG);
654         return (0);
655 }
656
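/*
 * Sync phase for resuming a receive: temporarily clear DS_FLAG_INCONSISTENT
 * so the dataset can be owned with dmu_recv_tag, re-own it, set the flag
 * again, and hand the dataset back to the caller via the cookie.
 */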
657 static void
658 dmu_recv_resume_begin_sync(void *arg, dmu_tx_t *tx)
659 {
660         dmu_recv_begin_arg_t *drba = arg;
661         dsl_pool_t *dp = dmu_tx_pool(tx);
662         const char *tofs = drba->drba_cookie->drc_tofs;
663         struct drr_begin *drrb = drba->drba_cookie->drc_drrb;
664         uint64_t featureflags = DMU_GET_FEATUREFLAGS(drrb->drr_versioninfo);
665         dsl_dataset_t *ds;
666         objset_t *os;
667         ds_hold_flags_t dsflags = 0;
668         uint64_t dsobj;
669         /* 6 extra bytes for /%recv */
670         char recvname[ZFS_MAX_DATASET_NAME_LEN + 6];
671
672         (void) snprintf(recvname, sizeof (recvname), "%s/%s",
673             tofs, recv_clone_name);
674
675         if (featureflags & DMU_BACKUP_FEATURE_RAW) {
676                 drba->drba_cookie->drc_raw = B_TRUE;
677         } else {
678                 dsflags |= DS_HOLD_FLAG_DECRYPT;
679         }
680
681         if (dsl_dataset_hold_flags(dp, recvname, dsflags, FTAG, &ds) != 0) {
682                 /* %recv does not exist; continue in tofs */
683                 VERIFY0(dsl_dataset_hold_flags(dp, tofs, dsflags, FTAG, &ds));
684                 drba->drba_cookie->drc_newfs = B_TRUE;
685         }
686
687         /* clear the inconsistent flag so that we can own it */
688         ASSERT(DS_IS_INCONSISTENT(ds));
689         dmu_buf_will_dirty(ds->ds_dbuf, tx);
690         dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
691         dsobj = ds->ds_object;
692         dsl_dataset_rele_flags(ds, dsflags, FTAG);
693
694         VERIFY0(dsl_dataset_own_obj(dp, dsobj, dsflags, dmu_recv_tag, &ds));
695         VERIFY0(dmu_objset_from_ds(ds, &os));
696
697         dmu_buf_will_dirty(ds->ds_dbuf, tx);
698         dsl_dataset_phys(ds)->ds_flags |= DS_FLAG_INCONSISTENT;
699
700         rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
701         ASSERT(!BP_IS_HOLE(dsl_dataset_get_blkptr(ds)) ||
702             drba->drba_cookie->drc_raw);
703         rrw_exit(&ds->ds_bp_rwlock, FTAG);
704
705         drba->drba_cookie->drc_ds = ds;
706
707         spa_history_log_internal_ds(ds, "resume receive", tx, "");
708 }
709
710 /*
711  * NB: callers *MUST* call dmu_recv_stream() if dmu_recv_begin()
712  * succeeds; otherwise we will leak the holds on the datasets.
713  */
714 int
715 dmu_recv_begin(char *tofs, char *tosnap, dmu_replay_record_t *drr_begin,
716     boolean_t force, boolean_t resumable, nvlist_t *localprops,
717     nvlist_t *hidden_args, char *origin, dmu_recv_cookie_t *drc)
718 {
719         dmu_recv_begin_arg_t drba = { 0 };
720
721         bzero(drc, sizeof (dmu_recv_cookie_t));
722         drc->drc_drr_begin = drr_begin;
723         drc->drc_drrb = &drr_begin->drr_u.drr_begin;
724         drc->drc_tosnap = tosnap;
725         drc->drc_tofs = tofs;
726         drc->drc_force = force;
727         drc->drc_resumable = resumable;
728         drc->drc_cred = CRED();
729         drc->drc_clone = (origin != NULL);
730
731         if (drc->drc_drrb->drr_magic == BSWAP_64(DMU_BACKUP_MAGIC)) {
732                 drc->drc_byteswap = B_TRUE;
733                 (void) fletcher_4_incremental_byteswap(drr_begin,
734                     sizeof (dmu_replay_record_t), &drc->drc_cksum);
735                 byteswap_record(drr_begin);
736         } else if (drc->drc_drrb->drr_magic == DMU_BACKUP_MAGIC) {
737                 (void) fletcher_4_incremental_native(drr_begin,
738                     sizeof (dmu_replay_record_t), &drc->drc_cksum);
739         } else {
740                 return (SET_ERROR(EINVAL));
741         }
742
743         drba.drba_origin = origin;
744         drba.drba_cookie = drc;
745         drba.drba_cred = CRED();
746
747         if (DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
748             DMU_BACKUP_FEATURE_RESUMING) {
749                 return (dsl_sync_task(tofs,
750                     dmu_recv_resume_begin_check, dmu_recv_resume_begin_sync,
751                     &drba, 5, ZFS_SPACE_CHECK_NORMAL));
752         } else  {
753                 int err;
754
755                 /*
756                  * For non-raw, non-incremental, non-resuming receives the
757                  * user can specify encryption parameters on the command line
758                  * with "zfs recv -o". For these receives we create a dcp and
759                  * pass it to the sync task. Creating the dcp will implicitly
760                  * remove the encryption params from the localprops nvlist,
761                  * which avoids errors when trying to set these normally
762                  * read-only properties. Any other kind of receive that
763                  * attempts to set these properties will fail as a result.
764                  */
765                 if ((DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo) &
766                     DMU_BACKUP_FEATURE_RAW) == 0 &&
767                     origin == NULL && drc->drc_drrb->drr_fromguid == 0) {
768                         err = dsl_crypto_params_create_nvlist(DCP_CMD_NONE,
769                             localprops, hidden_args, &drba.drba_dcp);
770                         if (err != 0)
771                                 return (err);
772                 }
773
774                 err = dsl_sync_task(tofs,
775                     dmu_recv_begin_check, dmu_recv_begin_sync,
776                     &drba, 5, ZFS_SPACE_CHECK_NORMAL);
777                 dsl_crypto_params_free(drba.drba_dcp, !!err);
778
779                 return (err);
780         }
781 }
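/*
 * Illustrative caller sequence (a sketch only -- argument lists are
 * abbreviated; see dmu_recv.h and zfs_ioctl.c for the real signatures):
 *
 *	dmu_recv_cookie_t drc;
 *	error = dmu_recv_begin(tofs, tosnap, drr_begin, force, resumable,
 *	    localprops, hidden_args, origin, &drc);
 *	if (error == 0) {
 *		error = dmu_recv_stream(&drc, ...);
 *		if (error == 0)
 *			error = dmu_recv_end(&drc, owner);
 *	}
 *
 * As the note above says, once dmu_recv_begin() succeeds the stream must be
 * consumed by dmu_recv_stream(); bailing out instead would leak the dataset
 * holds taken here.
 */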
782
783 struct receive_record_arg {
784         dmu_replay_record_t header;
785         void *payload; /* Pointer to a buffer containing the payload */
786         /*
787          * If the record is a write, pointer to the arc_buf_t containing the
788          * payload.
789          */
790         arc_buf_t *arc_buf;
791         int payload_size;
792         uint64_t bytes_read; /* bytes read from stream when record created */
793         boolean_t eos_marker; /* Marks the end of the stream */
794         bqueue_node_t node;
795 };
796
797 struct receive_writer_arg {
798         objset_t *os;
799         boolean_t byteswap;
800         bqueue_t q;
801
802         /*
803          * These three args are used to signal to the main thread that we're
804          * done.
805          */
806         kmutex_t mutex;
807         kcondvar_t cv;
808         boolean_t done;
809
810         int err;
811         /* A map from guid to dataset to help handle dedup'd streams. */
812         avl_tree_t *guid_to_ds_map;
813         boolean_t resumable;
814         boolean_t raw;
815         uint64_t last_object;
816         uint64_t last_offset;
817         uint64_t max_object; /* highest object ID referenced in stream */
818         uint64_t bytes_read; /* bytes read when current record created */
819
820         /* Encryption parameters for the last received DRR_OBJECT_RANGE */
821         boolean_t or_crypt_params_present;
822         uint64_t or_firstobj;
823         uint64_t or_numslots;
824         uint8_t or_salt[ZIO_DATA_SALT_LEN];
825         uint8_t or_iv[ZIO_DATA_IV_LEN];
826         uint8_t or_mac[ZIO_DATA_MAC_LEN];
827         boolean_t or_byteorder;
828 };
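/*
 * The receive path is split across two threads: the main thread (using
 * struct receive_arg below) reads and validates records from the stream and
 * enqueues them as receive_record_arg nodes on the bqueue above, while the
 * receive_writer thread dequeues and applies them to the objset, signalling
 * completion through the mutex/cv/done fields when it sees the eos_marker
 * record.
 */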
829
830 struct objlist {
831         list_t list; /* List of struct receive_objnode. */
832         /*
833          * Last object looked up. Used to assert that objects are being looked
834          * up in ascending order.
835          */
836         uint64_t last_lookup;
837 };
838
839 struct receive_objnode {
840         list_node_t node;
841         uint64_t object;
842 };
843
844 struct receive_arg  {
845         objset_t *os;
846         vnode_t *vp; /* The vnode to read the stream from */
847         uint64_t voff; /* The current offset in the stream */
848         uint64_t bytes_read;
849         /*
850          * A record that has had its payload read in, but hasn't yet been handed
851          * off to the worker thread.
852          */
853         struct receive_record_arg *rrd;
854         /* A record that has had its header read in, but not its payload. */
855         struct receive_record_arg *next_rrd;
856         zio_cksum_t cksum;
857         zio_cksum_t prev_cksum;
858         int err;
859         boolean_t byteswap;
860         boolean_t raw;
861         uint64_t featureflags;
862         /* Sorted list of objects not to issue prefetches for. */
863         struct objlist ignore_objlist;
864 };
865
866 typedef struct guid_map_entry {
867         uint64_t        guid;
868         boolean_t       raw;
869         dsl_dataset_t   *gme_ds;
870         avl_node_t      avlnode;
871 } guid_map_entry_t;
872
873 static int
874 guid_compare(const void *arg1, const void *arg2)
875 {
876         const guid_map_entry_t *gmep1 = (const guid_map_entry_t *)arg1;
877         const guid_map_entry_t *gmep2 = (const guid_map_entry_t *)arg2;
878
879         return (AVL_CMP(gmep1->guid, gmep2->guid));
880 }
881
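/*
 * Free a guid -> dataset map built for a dedup'd (WRITE_BYREF) stream,
 * disowning each dataset it still references; presumably invoked as an
 * onexit callback (note the zfs_onexit.h include) when the receive's
 * cleanup file descriptor goes away.
 */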
882 static void
883 free_guid_map_onexit(void *arg)
884 {
885         avl_tree_t *ca = arg;
886         void *cookie = NULL;
887         guid_map_entry_t *gmep;
888
889         while ((gmep = avl_destroy_nodes(ca, &cookie)) != NULL) {
890                 ds_hold_flags_t dsflags = DS_HOLD_FLAG_DECRYPT;
891
892                 if (gmep->raw) {
893                         gmep->gme_ds->ds_objset->os_raw_receive = B_FALSE;
894                         dsflags &= ~DS_HOLD_FLAG_DECRYPT;
895                 }
896
897                 dsl_dataset_disown(gmep->gme_ds, dsflags, gmep);
898                 kmem_free(gmep, sizeof (guid_map_entry_t));
899         }
900         avl_destroy(ca);
901         kmem_free(ca, sizeof (avl_tree_t));
902 }
903
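/*
 * Read len bytes from the stream's vnode into buf, advancing ra->voff and
 * ra->bytes_read.  A short read (end of stream) is reported as ECKSUM so
 * that an interrupted receive can potentially be resumed.
 */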
904 static int
905 receive_read(struct receive_arg *ra, int len, void *buf)
906 {
907         int done = 0;
908
909         /*
910          * The code doesn't rely on this (lengths being multiples of 8).  See
911          * comment in dump_bytes.
912          */
913         ASSERT(len % 8 == 0 ||
914             (ra->featureflags & DMU_BACKUP_FEATURE_RAW) != 0);
915
916         while (done < len) {
917                 ssize_t resid;
918
919                 ra->err = vn_rdwr(UIO_READ, ra->vp,
920                     (char *)buf + done, len - done,
921                     ra->voff, UIO_SYSSPACE, FAPPEND,
922                     RLIM64_INFINITY, CRED(), &resid);
923
924                 if (resid == len - done) {
925                         /*
926                          * Note: ECKSUM indicates that the receive
927                          * was interrupted and can potentially be resumed.
928                          */
929                         ra->err = SET_ERROR(ECKSUM);
930                 }
931                 ra->voff += len - done - resid;
932                 done = len - resid;
933                 if (ra->err != 0)
934                         return (ra->err);
935         }
936
937         ra->bytes_read += len;
938
939         ASSERT3U(done, ==, len);
940         return (0);
941 }
942
943 noinline static void
944 byteswap_record(dmu_replay_record_t *drr)
945 {
946 #define DO64(X) (drr->drr_u.X = BSWAP_64(drr->drr_u.X))
947 #define DO32(X) (drr->drr_u.X = BSWAP_32(drr->drr_u.X))
948         drr->drr_type = BSWAP_32(drr->drr_type);
949         drr->drr_payloadlen = BSWAP_32(drr->drr_payloadlen);
950
951         switch (drr->drr_type) {
952         case DRR_BEGIN:
953                 DO64(drr_begin.drr_magic);
954                 DO64(drr_begin.drr_versioninfo);
955                 DO64(drr_begin.drr_creation_time);
956                 DO32(drr_begin.drr_type);
957                 DO32(drr_begin.drr_flags);
958                 DO64(drr_begin.drr_toguid);
959                 DO64(drr_begin.drr_fromguid);
960                 break;
961         case DRR_OBJECT:
962                 DO64(drr_object.drr_object);
963                 DO32(drr_object.drr_type);
964                 DO32(drr_object.drr_bonustype);
965                 DO32(drr_object.drr_blksz);
966                 DO32(drr_object.drr_bonuslen);
967                 DO32(drr_object.drr_raw_bonuslen);
968                 DO64(drr_object.drr_toguid);
969                 DO64(drr_object.drr_maxblkid);
970                 break;
971         case DRR_FREEOBJECTS:
972                 DO64(drr_freeobjects.drr_firstobj);
973                 DO64(drr_freeobjects.drr_numobjs);
974                 DO64(drr_freeobjects.drr_toguid);
975                 break;
976         case DRR_WRITE:
977                 DO64(drr_write.drr_object);
978                 DO32(drr_write.drr_type);
979                 DO64(drr_write.drr_offset);
980                 DO64(drr_write.drr_logical_size);
981                 DO64(drr_write.drr_toguid);
982                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write.drr_key.ddk_cksum);
983                 DO64(drr_write.drr_key.ddk_prop);
984                 DO64(drr_write.drr_compressed_size);
985                 break;
986         case DRR_WRITE_BYREF:
987                 DO64(drr_write_byref.drr_object);
988                 DO64(drr_write_byref.drr_offset);
989                 DO64(drr_write_byref.drr_length);
990                 DO64(drr_write_byref.drr_toguid);
991                 DO64(drr_write_byref.drr_refguid);
992                 DO64(drr_write_byref.drr_refobject);
993                 DO64(drr_write_byref.drr_refoffset);
994                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_write_byref.
995                     drr_key.ddk_cksum);
996                 DO64(drr_write_byref.drr_key.ddk_prop);
997                 break;
998         case DRR_WRITE_EMBEDDED:
999                 DO64(drr_write_embedded.drr_object);
1000                 DO64(drr_write_embedded.drr_offset);
1001                 DO64(drr_write_embedded.drr_length);
1002                 DO64(drr_write_embedded.drr_toguid);
1003                 DO32(drr_write_embedded.drr_lsize);
1004                 DO32(drr_write_embedded.drr_psize);
1005                 break;
1006         case DRR_FREE:
1007                 DO64(drr_free.drr_object);
1008                 DO64(drr_free.drr_offset);
1009                 DO64(drr_free.drr_length);
1010                 DO64(drr_free.drr_toguid);
1011                 break;
1012         case DRR_SPILL:
1013                 DO64(drr_spill.drr_object);
1014                 DO64(drr_spill.drr_length);
1015                 DO64(drr_spill.drr_toguid);
1016                 DO64(drr_spill.drr_compressed_size);
1017                 DO32(drr_spill.drr_type);
1018                 break;
1019         case DRR_OBJECT_RANGE:
1020                 DO64(drr_object_range.drr_firstobj);
1021                 DO64(drr_object_range.drr_numslots);
1022                 DO64(drr_object_range.drr_toguid);
1023                 break;
1024         case DRR_END:
1025                 DO64(drr_end.drr_toguid);
1026                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_end.drr_checksum);
1027                 break;
1028         default:
1029                 break;
1030         }
1031
1032         if (drr->drr_type != DRR_BEGIN) {
1033                 ZIO_CHECKSUM_BSWAP(&drr->drr_u.drr_checksum.drr_checksum);
1034         }
1035
1036 #undef DO64
1037 #undef DO32
1038 }
1039
1040 static inline uint8_t
1041 deduce_nblkptr(dmu_object_type_t bonus_type, uint64_t bonus_size)
1042 {
1043         if (bonus_type == DMU_OT_SA) {
1044                 return (1);
1045         } else {
1046                 return (1 +
1047                     ((DN_OLD_MAX_BONUSLEN -
1048                     MIN(DN_OLD_MAX_BONUSLEN, bonus_size)) >> SPA_BLKPTRSHIFT));
1049         }
1050 }
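/*
 * For reference, a sketch assuming the legacy 512-byte dnode layout values
 * from dnode.h at the time of writing (DN_OLD_MAX_BONUSLEN == 320,
 * 128-byte block pointers, i.e. SPA_BLKPTRSHIFT == 7):
 *
 *	bonus_type == DMU_OT_SA	->  1 blkptr
 *	bonus_size <= 64	->  1 + (256 >> 7) = 3 blkptrs
 *	bonus_size 65..192	->  1 + (128..255 >> 7) = 2 blkptrs
 *	bonus_size > 192	->  1 blkptr
 */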
1051
1052 static void
1053 save_resume_state(struct receive_writer_arg *rwa,
1054     uint64_t object, uint64_t offset, dmu_tx_t *tx)
1055 {
1056         int txgoff = dmu_tx_get_txg(tx) & TXG_MASK;
1057
1058         if (!rwa->resumable)
1059                 return;
1060
1061         /*
1062          * We use ds_resume_bytes[] != 0 to indicate that we need to
1063          * update this on disk, so it must not be 0.
1064          */
1065         ASSERT(rwa->bytes_read != 0);
1066
1067         /*
1068          * We only resume from write records, which have a valid
1069          * (non-meta-dnode) object number.
1070          */
1071         ASSERT(object != 0);
1072
1073         /*
1074          * For resuming to work correctly, we must receive records in order,
1075          * sorted by object,offset.  This is checked by the callers, but
1076          * assert it here for good measure.
1077          */
1078         ASSERT3U(object, >=, rwa->os->os_dsl_dataset->ds_resume_object[txgoff]);
1079         ASSERT(object != rwa->os->os_dsl_dataset->ds_resume_object[txgoff] ||
1080             offset >= rwa->os->os_dsl_dataset->ds_resume_offset[txgoff]);
1081         ASSERT3U(rwa->bytes_read, >=,
1082             rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff]);
1083
1084         rwa->os->os_dsl_dataset->ds_resume_object[txgoff] = object;
1085         rwa->os->os_dsl_dataset->ds_resume_offset[txgoff] = offset;
1086         rwa->os->os_dsl_dataset->ds_resume_bytes[txgoff] = rwa->bytes_read;
1087 }
1088
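/*
 * Handle a DRR_OBJECT record: sanity-check it against the pool's limits,
 * free or reallocate any existing object whose shape no longer matches
 * (block size, nblkptr, dnode slots, and for raw receives indirect block
 * size and nlevels), free any objects overlapping a widened multi-slot
 * dnode, then claim/reclaim the dnode and install the bonus payload.
 */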
1089 noinline static int
1090 receive_object(struct receive_writer_arg *rwa, struct drr_object *drro,
1091     void *data)
1092 {
1093         dmu_object_info_t doi;
1094         dmu_tx_t *tx;
1095         uint64_t object;
1096         int err;
1097         uint8_t dn_slots = drro->drr_dn_slots != 0 ?
1098             drro->drr_dn_slots : DNODE_MIN_SLOTS;
1099
1100         if (drro->drr_type == DMU_OT_NONE ||
1101             !DMU_OT_IS_VALID(drro->drr_type) ||
1102             !DMU_OT_IS_VALID(drro->drr_bonustype) ||
1103             drro->drr_checksumtype >= ZIO_CHECKSUM_FUNCTIONS ||
1104             drro->drr_compress >= ZIO_COMPRESS_FUNCTIONS ||
1105             P2PHASE(drro->drr_blksz, SPA_MINBLOCKSIZE) ||
1106             drro->drr_blksz < SPA_MINBLOCKSIZE ||
1107             drro->drr_blksz > spa_maxblocksize(dmu_objset_spa(rwa->os)) ||
1108             drro->drr_bonuslen >
1109             DN_BONUS_SIZE(spa_maxdnodesize(dmu_objset_spa(rwa->os))) ||
1110             dn_slots >
1111             (spa_maxdnodesize(dmu_objset_spa(rwa->os)) >> DNODE_SHIFT))  {
1112                 return (SET_ERROR(EINVAL));
1113         }
1114
1115         if (rwa->raw) {
1116                 /*
1117                  * We should have received a DRR_OBJECT_RANGE record
1118                  * containing this block and stored it in rwa.
1119                  */
1120                 if (drro->drr_object < rwa->or_firstobj ||
1121                     drro->drr_object >= rwa->or_firstobj + rwa->or_numslots ||
1122                     drro->drr_raw_bonuslen < drro->drr_bonuslen ||
1123                     drro->drr_indblkshift > SPA_MAXBLOCKSHIFT ||
1124                     drro->drr_nlevels > DN_MAX_LEVELS ||
1125                     drro->drr_nblkptr > DN_MAX_NBLKPTR ||
1126                     DN_SLOTS_TO_BONUSLEN(dn_slots) <
1127                     drro->drr_raw_bonuslen)
1128                         return (SET_ERROR(EINVAL));
1129         } else {
1130                 if (drro->drr_flags != 0 || drro->drr_raw_bonuslen != 0 ||
1131                     drro->drr_indblkshift != 0 || drro->drr_nlevels != 0 ||
1132                     drro->drr_nblkptr != 0)
1133                         return (SET_ERROR(EINVAL));
1134         }
1135
1136         err = dmu_object_info(rwa->os, drro->drr_object, &doi);
1137         if (err != 0 && err != ENOENT && err != EEXIST)
1138                 return (SET_ERROR(EINVAL));
1139
1140         if (drro->drr_object > rwa->max_object)
1141                 rwa->max_object = drro->drr_object;
1142
1143         /*
1144          * If we are losing blkptrs or changing the block size this must
1145          * be a new file instance.  We must clear out the previous file
1146          * contents before we can change this type of metadata in the dnode.
1147          * Raw receives will also check that the indirect structure of the
1148          * dnode hasn't changed.
1149          */
1150         if (err == 0) {
1151                 uint32_t indblksz = drro->drr_indblkshift ?
1152                     1ULL << drro->drr_indblkshift : 0;
1153                 int nblkptr = deduce_nblkptr(drro->drr_bonustype,
1154                     drro->drr_bonuslen);
1155
1156                 object = drro->drr_object;
1157
1158                 /* nblkptr will be bounded by the bonus size and type */
1159                 if (rwa->raw && nblkptr != drro->drr_nblkptr)
1160                         return (SET_ERROR(EINVAL));
1161
1162                 if (drro->drr_blksz != doi.doi_data_block_size ||
1163                     nblkptr < doi.doi_nblkptr ||
1164                     dn_slots != doi.doi_dnodesize >> DNODE_SHIFT ||
1165                     (rwa->raw &&
1166                     (indblksz != doi.doi_metadata_block_size ||
1167                     drro->drr_nlevels < doi.doi_indirection))) {
1168                         err = dmu_free_long_range(rwa->os,
1169                             drro->drr_object, 0, DMU_OBJECT_END);
1170                         if (err != 0)
1171                                 return (SET_ERROR(EINVAL));
1172                 }
1173
1174                 /*
1175                  * The dmu does not currently support decreasing nlevels
1176                  * on an object. For non-raw sends, this does not matter
1177                  * and the new object can just use the previous one's nlevels.
1178                  * For raw sends, however, the structure of the received dnode
1179                  * (including nlevels) must match that of the send side.
1180                  * Therefore, instead of using dmu_object_reclaim(), we must
1181                  * free the object completely and call dmu_object_claim_dnsize()
1182                  * instead.
1183                  */
1184                 if ((rwa->raw && drro->drr_nlevels < doi.doi_indirection) ||
1185                     dn_slots != doi.doi_dnodesize >> DNODE_SHIFT) {
1186                         err = dmu_free_long_object(rwa->os, drro->drr_object);
1187                         if (err != 0)
1188                                 return (SET_ERROR(EINVAL));
1189
1190                         txg_wait_synced(dmu_objset_pool(rwa->os), 0);
1191                         object = DMU_NEW_OBJECT;
1192                 }
1193         } else if (err == EEXIST) {
1194                 /*
1195                  * The object requested is currently an interior slot of a
1196                  * multi-slot dnode. This will be resolved when the next txg
1197                  * is synced out, since the send stream will have told us
1198                  * to free this slot when we freed the associated dnode
1199                  * earlier in the stream.
1200                  */
1201                 txg_wait_synced(dmu_objset_pool(rwa->os), 0);
1202                 object = drro->drr_object;
1203         } else {
1204                 /* object is free and we are about to allocate a new one */
1205                 object = DMU_NEW_OBJECT;
1206         }
1207
1208         /*
1209          * If this is a multi-slot dnode there is a chance that this
1210          * object will expand into a slot that is already used by
1211          * another object from the previous snapshot. We must free
1212          * these objects before we attempt to allocate the new dnode.
1213          */
1214         if (dn_slots > 1) {
1215                 boolean_t need_sync = B_FALSE;
1216
1217                 for (uint64_t slot = drro->drr_object + 1;
1218                     slot < drro->drr_object + dn_slots;
1219                     slot++) {
1220                         dmu_object_info_t slot_doi;
1221
1222                         err = dmu_object_info(rwa->os, slot, &slot_doi);
1223                         if (err == ENOENT || err == EEXIST)
1224                                 continue;
1225                         else if (err != 0)
1226                                 return (err);
1227
1228                         err = dmu_free_long_object(rwa->os, slot);
1229
1230                         if (err != 0)
1231                                 return (err);
1232
1233                         need_sync = B_TRUE;
1234                 }
1235
1236                 if (need_sync)
1237                         txg_wait_synced(dmu_objset_pool(rwa->os), 0);
1238         }
1239
1240         tx = dmu_tx_create(rwa->os);
1241         dmu_tx_hold_bonus(tx, object);
1242         dmu_tx_hold_write(tx, object, 0, 0);
1243         err = dmu_tx_assign(tx, TXG_WAIT);
1244         if (err != 0) {
1245                 dmu_tx_abort(tx);
1246                 return (err);
1247         }
1248
1249         if (object == DMU_NEW_OBJECT) {
1250                 /* currently free, want to be allocated */
1251                 err = dmu_object_claim_dnsize(rwa->os, drro->drr_object,
1252                     drro->drr_type, drro->drr_blksz,
1253                     drro->drr_bonustype, drro->drr_bonuslen,
1254                     dn_slots << DNODE_SHIFT, tx);
1255         } else if (drro->drr_type != doi.doi_type ||
1256             drro->drr_blksz != doi.doi_data_block_size ||
1257             drro->drr_bonustype != doi.doi_bonus_type ||
1258             drro->drr_bonuslen != doi.doi_bonus_size) {
1259                 /* currently allocated, but with different properties */
1260                 err = dmu_object_reclaim_dnsize(rwa->os, drro->drr_object,
1261                     drro->drr_type, drro->drr_blksz,
1262                     drro->drr_bonustype, drro->drr_bonuslen,
1263                     dn_slots << DNODE_SHIFT, tx);
1264         }
1265         if (err != 0) {
1266                 dmu_tx_commit(tx);
1267                 return (SET_ERROR(EINVAL));
1268         }
1269
1270         if (rwa->or_crypt_params_present) {
1271                 /*
1272                  * Set the crypt params for the buffer associated with this
1273                  * range of dnodes.  This causes the blkptr_t to have the
1274                  * same crypt params (byteorder, salt, iv, mac) as on the
1275                  * sending side.
1276                  *
1277                  * Since we are committing this tx now, it is possible for
1278                  * the dnode block to end up on-disk with the incorrect MAC,
1279                  * if subsequent objects in this block are received in a
1280                  * different txg.  However, since the dataset is marked as
1281                  * inconsistent, no code paths will do a non-raw read (or
1282                  * decrypt the block / verify the MAC). The receive code and
1283                  * scrub code can safely do raw reads and verify the
1284                  * checksum.  They don't need to verify the MAC.
1285                  */
1286                 dmu_buf_t *db = NULL;
1287                 uint64_t offset = rwa->or_firstobj * DNODE_MIN_SIZE;
1288
1289                 err = dmu_buf_hold_by_dnode(DMU_META_DNODE(rwa->os),
1290                     offset, FTAG, &db, DMU_READ_PREFETCH | DMU_READ_NO_DECRYPT);
1291                 if (err != 0) {
1292                         dmu_tx_commit(tx);
1293                         return (SET_ERROR(EINVAL));
1294                 }
1295
1296                 dmu_buf_set_crypt_params(db, rwa->or_byteorder,
1297                     rwa->or_salt, rwa->or_iv, rwa->or_mac, tx);
1298
1299                 dmu_buf_rele(db, FTAG);
1300
1301                 rwa->or_crypt_params_present = B_FALSE;
1302         }
1303
1304         dmu_object_set_checksum(rwa->os, drro->drr_object,
1305             drro->drr_checksumtype, tx);
1306         dmu_object_set_compress(rwa->os, drro->drr_object,
1307             drro->drr_compress, tx);
1308
1309         /* handle more restrictive dnode structuring for raw recvs */
1310         if (rwa->raw) {
1311                 /*
1312                  * Set the indirect block shift, nlevels and maxblkid. These
1313                  * calls will not fail because, if this is a new object, we
1314                  * ensured earlier that all of its blocks had been freed.
1315                  */
1316                 VERIFY0(dmu_object_set_blocksize(rwa->os, drro->drr_object,
1317                     drro->drr_blksz, drro->drr_indblkshift, tx));
1318                 VERIFY0(dmu_object_set_nlevels(rwa->os, drro->drr_object,
1319                     drro->drr_nlevels, tx));
1320                 VERIFY0(dmu_object_set_maxblkid(rwa->os, drro->drr_object,
1321                     drro->drr_maxblkid, tx));
1322         }
1323
1324         if (data != NULL) {
1325                 dmu_buf_t *db;
1326                 dnode_t *dn;
1327                 uint32_t flags = DMU_READ_NO_PREFETCH;
1328
1329                 if (rwa->raw)
1330                         flags |= DMU_READ_NO_DECRYPT;
1331
1332                 VERIFY0(dnode_hold(rwa->os, drro->drr_object, FTAG, &dn));
1333                 VERIFY0(dmu_bonus_hold_by_dnode(dn, FTAG, &db, flags));
1334
1335                 dmu_buf_will_dirty(db, tx);
1336
1337                 ASSERT3U(db->db_size, >=, drro->drr_bonuslen);
1338                 bcopy(data, db->db_data, DRR_OBJECT_PAYLOAD_SIZE(drro));
1339
1340                 /*
1341                  * Raw bonus buffers have their byteorder determined by the
1342                  * DRR_OBJECT_RANGE record.
1343                  */
1344                 if (rwa->byteswap && !rwa->raw) {
1345                         dmu_object_byteswap_t byteswap =
1346                             DMU_OT_BYTESWAP(drro->drr_bonustype);
1347                         dmu_ot_byteswap[byteswap].ob_func(db->db_data,
1348                             DRR_OBJECT_PAYLOAD_SIZE(drro));
1349                 }
1350                 dmu_buf_rele(db, FTAG);
1351                 dnode_rele(dn, FTAG);
1352         }
1353         dmu_tx_commit(tx);
1354
1355         return (0);
1356 }
1357
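     /*
      * Handle a DRR_FREEOBJECTS record: free every allocated object in the
      * range [drr_firstobj, drr_firstobj + drr_numobjs).  dmu_object_next()
      * is used to skip over holes, and ESRCH from it (no further allocated
      * objects) is not treated as an error.
      */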
1358 /* ARGSUSED */
1359 noinline static int
1360 receive_freeobjects(struct receive_writer_arg *rwa,
1361     struct drr_freeobjects *drrfo)
1362 {
1363         uint64_t obj;
1364         int next_err = 0;
1365
1366         if (drrfo->drr_firstobj + drrfo->drr_numobjs < drrfo->drr_firstobj)
1367                 return (SET_ERROR(EINVAL));
1368
1369         for (obj = drrfo->drr_firstobj == 0 ? 1 : drrfo->drr_firstobj;
1370             obj < drrfo->drr_firstobj + drrfo->drr_numobjs && next_err == 0;
1371             next_err = dmu_object_next(rwa->os, &obj, FALSE, 0)) {
1372                 dmu_object_info_t doi;
1373                 int err;
1374
1375                 err = dmu_object_info(rwa->os, obj, &doi);
1376                 if (err == ENOENT)
1377                         continue;
1378                 else if (err != 0)
1379                         return (err);
1380
1381                 err = dmu_free_long_object(rwa->os, obj);
1382
1383                 if (err != 0)
1384                         return (err);
1385
1386                 if (obj > rwa->max_object)
1387                         rwa->max_object = obj;
1388         }
1389         if (next_err != ESRCH)
1390                 return (next_err);
1391         return (0);
1392 }
1393
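     /*
      * Handle a DRR_WRITE record.  The payload was loaned from the ARC by the
      * reader thread and is assigned directly to the target object at
      * drr_offset.  Write records must arrive in ascending (object, offset)
      * order so that the resume state saved here always describes a
      * well-defined point in the stream.
      */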
1394 noinline static int
1395 receive_write(struct receive_writer_arg *rwa, struct drr_write *drrw,
1396     arc_buf_t *abuf)
1397 {
1398         int err;
1399         dmu_tx_t *tx;
1400         dnode_t *dn;
1401
1402         if (drrw->drr_offset + drrw->drr_logical_size < drrw->drr_offset ||
1403             !DMU_OT_IS_VALID(drrw->drr_type))
1404                 return (SET_ERROR(EINVAL));
1405
1406         /*
1407          * For resuming to work, records must be in increasing order
1408          * by (object, offset).
1409          */
1410         if (drrw->drr_object < rwa->last_object ||
1411             (drrw->drr_object == rwa->last_object &&
1412             drrw->drr_offset < rwa->last_offset)) {
1413                 return (SET_ERROR(EINVAL));
1414         }
1415         rwa->last_object = drrw->drr_object;
1416         rwa->last_offset = drrw->drr_offset;
1417
1418         if (rwa->last_object > rwa->max_object)
1419                 rwa->max_object = rwa->last_object;
1420
1421         if (dmu_object_info(rwa->os, drrw->drr_object, NULL) != 0)
1422                 return (SET_ERROR(EINVAL));
1423
1424         tx = dmu_tx_create(rwa->os);
1425         dmu_tx_hold_write(tx, drrw->drr_object,
1426             drrw->drr_offset, drrw->drr_logical_size);
1427         err = dmu_tx_assign(tx, TXG_WAIT);
1428         if (err != 0) {
1429                 dmu_tx_abort(tx);
1430                 return (err);
1431         }
1432
1433         if (rwa->byteswap && !arc_is_encrypted(abuf) &&
1434             arc_get_compression(abuf) == ZIO_COMPRESS_OFF) {
1435                 dmu_object_byteswap_t byteswap =
1436                     DMU_OT_BYTESWAP(drrw->drr_type);
1437                 dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
1438                     DRR_WRITE_PAYLOAD_SIZE(drrw));
1439         }
1440
1441         VERIFY0(dnode_hold(rwa->os, drrw->drr_object, FTAG, &dn));
1442         dmu_assign_arcbuf_by_dnode(dn, drrw->drr_offset, abuf, tx);
1443         dnode_rele(dn, FTAG);
1444
1445         /*
1446          * Note: If the receive fails, we want the resume stream to start
1447          * with the same record that we last successfully received (as opposed
1448          * to the next record), so that we can verify that we are
1449          * resuming from the correct location.
1450          */
1451         save_resume_state(rwa, drrw->drr_object, drrw->drr_offset, tx);
1452         dmu_tx_commit(tx);
1453
1454         return (0);
1455 }
1456
1457 /*
1458  * Handle a DRR_WRITE_BYREF record.  This record is used in dedup'ed
1459  * streams to refer to a copy of the data that is already on the
1460  * system because it came in earlier in the stream.  This function
1461  * finds the earlier copy of the data, and uses that copy instead of
1462  * data from the stream to fulfill this write.
1463  */
1464 static int
1465 receive_write_byref(struct receive_writer_arg *rwa,
1466     struct drr_write_byref *drrwbr)
1467 {
1468         dmu_tx_t *tx;
1469         int err;
1470         guid_map_entry_t gmesrch;
1471         guid_map_entry_t *gmep;
1472         avl_index_t where;
1473         objset_t *ref_os = NULL;
1474         int flags = DMU_READ_PREFETCH;
1475         dmu_buf_t *dbp;
1476
1477         if (drrwbr->drr_offset + drrwbr->drr_length < drrwbr->drr_offset)
1478                 return (SET_ERROR(EINVAL));
1479
1480         /*
1481          * If the GUID of the referenced dataset is different from the
1482          * GUID of the target dataset, find the referenced dataset.
1483          */
1484         if (drrwbr->drr_toguid != drrwbr->drr_refguid) {
1485                 gmesrch.guid = drrwbr->drr_refguid;
1486                 if ((gmep = avl_find(rwa->guid_to_ds_map, &gmesrch,
1487                     &where)) == NULL) {
1488                         return (SET_ERROR(EINVAL));
1489                 }
1490                 if (dmu_objset_from_ds(gmep->gme_ds, &ref_os))
1491                         return (SET_ERROR(EINVAL));
1492         } else {
1493                 ref_os = rwa->os;
1494         }
1495
1496         if (drrwbr->drr_object > rwa->max_object)
1497                 rwa->max_object = drrwbr->drr_object;
1498
1499         if (rwa->raw)
1500                 flags |= DMU_READ_NO_DECRYPT;
1501
1502         /* may return either a regular db or an encrypted one */
1503         err = dmu_buf_hold(ref_os, drrwbr->drr_refobject,
1504             drrwbr->drr_refoffset, FTAG, &dbp, flags);
1505         if (err != 0)
1506                 return (err);
1507
1508         tx = dmu_tx_create(rwa->os);
1509
1510         dmu_tx_hold_write(tx, drrwbr->drr_object,
1511             drrwbr->drr_offset, drrwbr->drr_length);
1512         err = dmu_tx_assign(tx, TXG_WAIT);
1513         if (err != 0) {
1514                 dmu_tx_abort(tx);
1515                 return (err);
1516         }
1517
1518         if (rwa->raw) {
1519                 dmu_copy_from_buf(rwa->os, drrwbr->drr_object,
1520                     drrwbr->drr_offset, dbp, tx);
1521         } else {
1522                 dmu_write(rwa->os, drrwbr->drr_object,
1523                     drrwbr->drr_offset, drrwbr->drr_length, dbp->db_data, tx);
1524         }
1525         dmu_buf_rele(dbp, FTAG);
1526
1527         /* See comment in receive_write(). */
1528         save_resume_state(rwa, drrwbr->drr_object, drrwbr->drr_offset, tx);
1529         dmu_tx_commit(tx);
1530         return (0);
1531 }
1532
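     /*
      * Handle a DRR_WRITE_EMBEDDED record, whose payload is small enough to
      * be stored directly in a block pointer on the receiving side.  Embedded
      * data is never present in raw (encrypted) streams, so those are
      * rejected here.
      */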
1533 static int
1534 receive_write_embedded(struct receive_writer_arg *rwa,
1535     struct drr_write_embedded *drrwe, void *data)
1536 {
1537         dmu_tx_t *tx;
1538         int err;
1539
1540         if (drrwe->drr_offset + drrwe->drr_length < drrwe->drr_offset)
1541                 return (SET_ERROR(EINVAL));
1542
1543         if (drrwe->drr_psize > BPE_PAYLOAD_SIZE)
1544                 return (SET_ERROR(EINVAL));
1545
1546         if (drrwe->drr_etype >= NUM_BP_EMBEDDED_TYPES)
1547                 return (SET_ERROR(EINVAL));
1548         if (drrwe->drr_compression >= ZIO_COMPRESS_FUNCTIONS)
1549                 return (SET_ERROR(EINVAL));
1550         if (rwa->raw)
1551                 return (SET_ERROR(EINVAL));
1552
1553         if (drrwe->drr_object > rwa->max_object)
1554                 rwa->max_object = drrwe->drr_object;
1555
1556         tx = dmu_tx_create(rwa->os);
1557
1558         dmu_tx_hold_write(tx, drrwe->drr_object,
1559             drrwe->drr_offset, drrwe->drr_length);
1560         err = dmu_tx_assign(tx, TXG_WAIT);
1561         if (err != 0) {
1562                 dmu_tx_abort(tx);
1563                 return (err);
1564         }
1565
1566         dmu_write_embedded(rwa->os, drrwe->drr_object,
1567             drrwe->drr_offset, data, drrwe->drr_etype,
1568             drrwe->drr_compression, drrwe->drr_lsize, drrwe->drr_psize,
1569             rwa->byteswap ^ ZFS_HOST_BYTEORDER, tx);
1570
1571         /* See comment in receive_write(). */
1572         save_resume_state(rwa, drrwe->drr_object, drrwe->drr_offset, tx);
1573         dmu_tx_commit(tx);
1574         return (0);
1575 }
1576
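     /*
      * Handle a DRR_SPILL record by writing the payload into the object's
      * spill block, growing the spill block first if it is smaller than
      * drr_length.  For raw streams the payload stays in its on-disk
      * (possibly compressed and encrypted) form.
      */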
1577 static int
1578 receive_spill(struct receive_writer_arg *rwa, struct drr_spill *drrs,
1579     arc_buf_t *abuf)
1580 {
1581         dmu_tx_t *tx;
1582         dmu_buf_t *db, *db_spill;
1583         int err;
1584         uint32_t flags = 0;
1585
1586         if (drrs->drr_length < SPA_MINBLOCKSIZE ||
1587             drrs->drr_length > spa_maxblocksize(dmu_objset_spa(rwa->os)))
1588                 return (SET_ERROR(EINVAL));
1589
1590         if (rwa->raw) {
1591                 if (!DMU_OT_IS_VALID(drrs->drr_type) ||
1592                     drrs->drr_compressiontype >= ZIO_COMPRESS_FUNCTIONS ||
1593                     drrs->drr_compressed_size == 0)
1594                         return (SET_ERROR(EINVAL));
1595
1596                 flags |= DMU_READ_NO_DECRYPT;
1597         }
1598
1599         if (dmu_object_info(rwa->os, drrs->drr_object, NULL) != 0)
1600                 return (SET_ERROR(EINVAL));
1601
1602         if (drrs->drr_object > rwa->max_object)
1603                 rwa->max_object = drrs->drr_object;
1604
1605         VERIFY0(dmu_bonus_hold(rwa->os, drrs->drr_object, FTAG, &db));
1606         if ((err = dmu_spill_hold_by_bonus(db, DMU_READ_NO_DECRYPT, FTAG,
1607             &db_spill)) != 0) {
1608                 dmu_buf_rele(db, FTAG);
1609                 return (err);
1610         }
1611
1612         tx = dmu_tx_create(rwa->os);
1613
1614         dmu_tx_hold_spill(tx, db->db_object);
1615
1616         err = dmu_tx_assign(tx, TXG_WAIT);
1617         if (err != 0) {
1618                 dmu_buf_rele(db, FTAG);
1619                 dmu_buf_rele(db_spill, FTAG);
1620                 dmu_tx_abort(tx);
1621                 return (err);
1622         }
1623
1624         if (db_spill->db_size < drrs->drr_length)
1625                 VERIFY(0 == dbuf_spill_set_blksz(db_spill,
1626                     drrs->drr_length, tx));
1627
1628         if (rwa->byteswap && !arc_is_encrypted(abuf) &&
1629             arc_get_compression(abuf) == ZIO_COMPRESS_OFF) {
1630                 dmu_object_byteswap_t byteswap =
1631                     DMU_OT_BYTESWAP(drrs->drr_type);
1632                 dmu_ot_byteswap[byteswap].ob_func(abuf->b_data,
1633                     DRR_SPILL_PAYLOAD_SIZE(drrs));
1634         }
1635
1636         dbuf_assign_arcbuf((dmu_buf_impl_t *)db_spill, abuf, tx);
1637
1638         dmu_buf_rele(db, FTAG);
1639         dmu_buf_rele(db_spill, FTAG);
1640
1641         dmu_tx_commit(tx);
1642         return (0);
1643 }
1644
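     /*
      * Handle a DRR_FREE record by punching a hole in the object: drr_length
      * bytes starting at drr_offset are freed, with DMU_OBJECT_END meaning
      * "to the end of the object".
      */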
1645 /* ARGSUSED */
1646 noinline static int
1647 receive_free(struct receive_writer_arg *rwa, struct drr_free *drrf)
1648 {
1649         int err;
1650
1651         if (drrf->drr_length != DMU_OBJECT_END &&
1652             drrf->drr_offset + drrf->drr_length < drrf->drr_offset)
1653                 return (SET_ERROR(EINVAL));
1654
1655         if (dmu_object_info(rwa->os, drrf->drr_object, NULL) != 0)
1656                 return (SET_ERROR(EINVAL));
1657
1658         if (drrf->drr_object > rwa->max_object)
1659                 rwa->max_object = drrf->drr_object;
1660
1661         err = dmu_free_long_range(rwa->os, drrf->drr_object,
1662             drrf->drr_offset, drrf->drr_length);
1663
1664         return (err);
1665 }
1666
1667 static int
1668 receive_object_range(struct receive_writer_arg *rwa,
1669     struct drr_object_range *drror)
1670 {
1671         /*
1672          * By default, we assume this block is in our native format
1673          * (ZFS_HOST_BYTEORDER). We then take into account whether
1674          * the send stream is byteswapped (rwa->byteswap). Finally,
1675          * we need to byteswap again if this particular block was
1676          * in non-native format on the send side.
1677          */
1678         boolean_t byteorder = ZFS_HOST_BYTEORDER ^ rwa->byteswap ^
1679             !!DRR_IS_RAW_BYTESWAPPED(drror->drr_flags);
1680
1681         /*
1682          * Since dnode block sizes are constant, we should not need to worry
1683          * about making sure that the dnode block size is the same on the
1684          * sending and receiving sides for the time being. For non-raw sends,
1685          * this does not matter (and in fact we do not send a DRR_OBJECT_RANGE
1686          * record at all). Raw sends require this record type because the
1687          * encryption parameters are used to protect an entire block of bonus
1688          * buffers. If the size of dnode blocks ever becomes variable,
1689          * handling will need to be added to ensure that dnode block sizes
1690          * match on the sending and receiving side.
1691          */
1692         if (drror->drr_numslots != DNODES_PER_BLOCK ||
1693             P2PHASE(drror->drr_firstobj, DNODES_PER_BLOCK) != 0 ||
1694             !rwa->raw)
1695                 return (SET_ERROR(EINVAL));
1696
1697         if (drror->drr_firstobj > rwa->max_object)
1698                 rwa->max_object = drror->drr_firstobj;
1699
1700         /*
1701          * The DRR_OBJECT_RANGE handling must be deferred to receive_object()
1702          * so that the block of dnodes is not written out (and converted to
1703          * a HOLE BP) while it is still empty.
1704          */
1705         rwa->or_crypt_params_present = B_TRUE;
1706         rwa->or_firstobj = drror->drr_firstobj;
1707         rwa->or_numslots = drror->drr_numslots;
1708         bcopy(drror->drr_salt, rwa->or_salt, ZIO_DATA_SALT_LEN);
1709         bcopy(drror->drr_iv, rwa->or_iv, ZIO_DATA_IV_LEN);
1710         bcopy(drror->drr_mac, rwa->or_mac, ZIO_DATA_MAC_LEN);
1711         rwa->or_byteorder = byteorder;
1712
1713         return (0);
1714 }
1715
1716 /* used to destroy the drc_ds on error */
1717 static void
1718 dmu_recv_cleanup_ds(dmu_recv_cookie_t *drc)
1719 {
1720         dsl_dataset_t *ds = drc->drc_ds;
1721         ds_hold_flags_t dsflags = (drc->drc_raw) ? 0 : DS_HOLD_FLAG_DECRYPT;
1722
1723         /*
1724          * Wait for the txg sync before cleaning up the receive. For
1725          * resumable receives, this ensures that our resume state has
1726          * been written out to disk. For raw receives, this ensures
1727          * that the user accounting code will not attempt to do anything
1728          * after we stopped receiving the dataset.
1729          */
1730         txg_wait_synced(ds->ds_dir->dd_pool, 0);
1731         ds->ds_objset->os_raw_receive = B_FALSE;
1732
1733         rrw_enter(&ds->ds_bp_rwlock, RW_READER, FTAG);
1734         if (drc->drc_resumable && !BP_IS_HOLE(dsl_dataset_get_blkptr(ds))) {
1735                 rrw_exit(&ds->ds_bp_rwlock, FTAG);
1736                 dsl_dataset_disown(ds, dsflags, dmu_recv_tag);
1737         } else {
1738                 char name[ZFS_MAX_DATASET_NAME_LEN];
1739                 rrw_exit(&ds->ds_bp_rwlock, FTAG);
1740                 dsl_dataset_name(ds, name);
1741                 dsl_dataset_disown(ds, dsflags, dmu_recv_tag);
1742                 (void) dsl_destroy_head(name);
1743         }
1744 }
1745
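     /*
      * Fold len bytes of buf into the stream's running fletcher-4 checksum,
      * using the byteswapping variant when the stream's byte order differs
      * from the host's.
      */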
1746 static void
1747 receive_cksum(struct receive_arg *ra, int len, void *buf)
1748 {
1749         if (ra->byteswap) {
1750                 (void) fletcher_4_incremental_byteswap(buf, len, &ra->cksum);
1751         } else {
1752                 (void) fletcher_4_incremental_native(buf, len, &ra->cksum);
1753         }
1754 }
1755
1756 /*
1757  * Read the payload into a buffer of size len, and update the current record's
1758  * payload field.
1759  * Allocate ra->next_rrd and read the next record's header into
1760  * ra->next_rrd->header.
1761  * Verify checksum of payload and next record.
1762  */
1763 static int
1764 receive_read_payload_and_next_header(struct receive_arg *ra, int len, void *buf)
1765 {
1766         int err;
1767         zio_cksum_t cksum_orig;
1768         zio_cksum_t *cksump;
1769
1770         if (len != 0) {
1771                 ASSERT3U(len, <=, SPA_MAXBLOCKSIZE);
1772                 err = receive_read(ra, len, buf);
1773                 if (err != 0)
1774                         return (err);
1775                 receive_cksum(ra, len, buf);
1776
1777                 /* note: rrd is NULL when reading the begin record's payload */
1778                 if (ra->rrd != NULL) {
1779                         ra->rrd->payload = buf;
1780                         ra->rrd->payload_size = len;
1781                         ra->rrd->bytes_read = ra->bytes_read;
1782                 }
1783         }
1784
1785         ra->prev_cksum = ra->cksum;
1786
1787         ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
1788         err = receive_read(ra, sizeof (ra->next_rrd->header),
1789             &ra->next_rrd->header);
1790         ra->next_rrd->bytes_read = ra->bytes_read;
1791
1792         if (err != 0) {
1793                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
1794                 ra->next_rrd = NULL;
1795                 return (err);
1796         }
1797         if (ra->next_rrd->header.drr_type == DRR_BEGIN) {
1798                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
1799                 ra->next_rrd = NULL;
1800                 return (SET_ERROR(EINVAL));
1801         }
1802
1803         /*
1804          * Note: checksum is of everything up to but not including the
1805          * checksum itself.
1806          */
1807         ASSERT3U(offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
1808             ==, sizeof (dmu_replay_record_t) - sizeof (zio_cksum_t));
1809         receive_cksum(ra,
1810             offsetof(dmu_replay_record_t, drr_u.drr_checksum.drr_checksum),
1811             &ra->next_rrd->header);
1812
1813         cksum_orig = ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
1814         cksump = &ra->next_rrd->header.drr_u.drr_checksum.drr_checksum;
1815
1816         if (ra->byteswap)
1817                 byteswap_record(&ra->next_rrd->header);
1818
1819         if ((!ZIO_CHECKSUM_IS_ZERO(cksump)) &&
1820             !ZIO_CHECKSUM_EQUAL(ra->cksum, *cksump)) {
1821                 kmem_free(ra->next_rrd, sizeof (*ra->next_rrd));
1822                 ra->next_rrd = NULL;
1823                 return (SET_ERROR(ECKSUM));
1824         }
1825
1826         receive_cksum(ra, sizeof (cksum_orig), &cksum_orig);
1827
1828         return (0);
1829 }
1830
1831 static void
1832 objlist_create(struct objlist *list)
1833 {
1834         list_create(&list->list, sizeof (struct receive_objnode),
1835             offsetof(struct receive_objnode, node));
1836         list->last_lookup = 0;
1837 }
1838
1839 static void
1840 objlist_destroy(struct objlist *list)
1841 {
1842         for (struct receive_objnode *n = list_remove_head(&list->list);
1843             n != NULL; n = list_remove_head(&list->list)) {
1844                 kmem_free(n, sizeof (*n));
1845         }
1846         list_destroy(&list->list);
1847 }
1848
1849 /*
1850  * This function checks whether the specified object number is contained in
1851  * the objlist.  In the process, it will remove all object
1852  * numbers in the list that are smaller than the specified object number.  Thus,
1853  * any lookup of an object number smaller than a previously looked up object
1854  * number will always return false; therefore, all lookups should be done in
1855  * ascending order.
1856  */
1857 static boolean_t
1858 objlist_exists(struct objlist *list, uint64_t object)
1859 {
1860         struct receive_objnode *node = list_head(&list->list);
1861         ASSERT3U(object, >=, list->last_lookup);
1862         list->last_lookup = object;
1863         while (node != NULL && node->object < object) {
1864                 VERIFY3P(node, ==, list_remove_head(&list->list));
1865                 kmem_free(node, sizeof (*node));
1866                 node = list_head(&list->list);
1867         }
1868         return (node != NULL && node->object == object);
1869 }
1870
1871 /*
1872  * The objlist is a list of object numbers stored in ascending order.  However,
1873  * objlist_insert() does not search for the correct location for a new object
1874  * number; it simply appends it to the tail of the list.
1875  * Thus, callers must take care to only insert new object numbers in ascending
1876  * order.
1877  */
1878 static void
1879 objlist_insert(struct objlist *list, uint64_t object)
1880 {
1881         struct receive_objnode *node = kmem_zalloc(sizeof (*node), KM_SLEEP);
1882         node->object = object;
1883 #ifdef ZFS_DEBUG
1884         {
1885         struct receive_objnode *last_object = list_tail(&list->list);
1886         uint64_t last_objnum = (last_object != NULL ? last_object->object : 0);
1887         ASSERT3U(node->object, >, last_objnum);
1888         }
1889 #endif
1890         list_insert_tail(&list->list, node);
1891 }
1892
1893 /*
1894  * Issue the prefetch reads for any necessary indirect blocks.
1895  *
1896  * We use the object ignore list to tell us whether or not to issue prefetches
1897  * for a given object.  We do this for both correctness (in case the blocksize
1898  * of an object has changed) and performance (if the object doesn't exist, don't
1899  * needlessly try to issue prefetches).  We also trim the list as we go through
1900  * the stream to prevent it from growing to an unbounded size.
1901  *
1902  * The object numbers within will always be in sorted order, and any write
1903  * records we see will also be in sorted order, but they're not sorted with
1904  * respect to each other (i.e. we can get several object records before
1905  * receiving each object's write records).  As a result, once we've reached a
1906  * given object number, we can safely remove any reference to lower object
1907  * numbers in the ignore list. In practice, we receive up to 32 object records
1908  * before receiving write records, so the list can have up to 32 nodes in it.
1909  */
1910 /* ARGSUSED */
1911 static void
1912 receive_read_prefetch(struct receive_arg *ra,
1913     uint64_t object, uint64_t offset, uint64_t length)
1914 {
1915         if (!objlist_exists(&ra->ignore_objlist, object)) {
1916                 dmu_prefetch(ra->os, object, 1, offset, length,
1917                     ZIO_PRIORITY_SYNC_READ);
1918         }
1919 }
1920
1921 /*
1922  * Read records off the stream, issuing any necessary prefetches.
1923  */
1924 static int
1925 receive_read_record(struct receive_arg *ra)
1926 {
1927         int err;
1928
1929         switch (ra->rrd->header.drr_type) {
1930         case DRR_OBJECT:
1931         {
1932                 struct drr_object *drro = &ra->rrd->header.drr_u.drr_object;
1933                 uint32_t size = DRR_OBJECT_PAYLOAD_SIZE(drro);
1934                 void *buf = kmem_zalloc(size, KM_SLEEP);
1935                 dmu_object_info_t doi;
1936
1937                 err = receive_read_payload_and_next_header(ra, size, buf);
1938                 if (err != 0) {
1939                         kmem_free(buf, size);
1940                         return (err);
1941                 }
1942                 err = dmu_object_info(ra->os, drro->drr_object, &doi);
1943                 /*
1944                  * See receive_read_prefetch() for an explanation of why we're
1945                  * storing this object in the ignore_objlist.
1946                  */
1947                 if (err == ENOENT || err == EEXIST ||
1948                     (err == 0 && doi.doi_data_block_size != drro->drr_blksz)) {
1949                         objlist_insert(&ra->ignore_objlist, drro->drr_object);
1950                         err = 0;
1951                 }
1952                 return (err);
1953         }
1954         case DRR_FREEOBJECTS:
1955         {
1956                 err = receive_read_payload_and_next_header(ra, 0, NULL);
1957                 return (err);
1958         }
1959         case DRR_WRITE:
1960         {
1961                 struct drr_write *drrw = &ra->rrd->header.drr_u.drr_write;
1962                 arc_buf_t *abuf;
1963                 boolean_t is_meta = DMU_OT_IS_METADATA(drrw->drr_type);
1964
1965                 if (ra->raw) {
1966                         boolean_t byteorder = ZFS_HOST_BYTEORDER ^
1967                             !!DRR_IS_RAW_BYTESWAPPED(drrw->drr_flags) ^
1968                             ra->byteswap;
1969
1970                         abuf = arc_loan_raw_buf(dmu_objset_spa(ra->os),
1971                             drrw->drr_object, byteorder, drrw->drr_salt,
1972                             drrw->drr_iv, drrw->drr_mac, drrw->drr_type,
1973                             drrw->drr_compressed_size, drrw->drr_logical_size,
1974                             drrw->drr_compressiontype);
1975                 } else if (DRR_WRITE_COMPRESSED(drrw)) {
1976                         ASSERT3U(drrw->drr_compressed_size, >, 0);
1977                         ASSERT3U(drrw->drr_logical_size, >=,
1978                             drrw->drr_compressed_size);
1979                         ASSERT(!is_meta);
1980                         abuf = arc_loan_compressed_buf(
1981                             dmu_objset_spa(ra->os),
1982                             drrw->drr_compressed_size, drrw->drr_logical_size,
1983                             drrw->drr_compressiontype);
1984                 } else {
1985                         abuf = arc_loan_buf(dmu_objset_spa(ra->os),
1986                             is_meta, drrw->drr_logical_size);
1987                 }
1988
1989                 err = receive_read_payload_and_next_header(ra,
1990                     DRR_WRITE_PAYLOAD_SIZE(drrw), abuf->b_data);
1991                 if (err != 0) {
1992                         dmu_return_arcbuf(abuf);
1993                         return (err);
1994                 }
1995                 ra->rrd->arc_buf = abuf;
1996                 receive_read_prefetch(ra, drrw->drr_object, drrw->drr_offset,
1997                     drrw->drr_logical_size);
1998                 return (err);
1999         }
2000         case DRR_WRITE_BYREF:
2001         {
2002                 struct drr_write_byref *drrwb =
2003                     &ra->rrd->header.drr_u.drr_write_byref;
2004                 err = receive_read_payload_and_next_header(ra, 0, NULL);
2005                 receive_read_prefetch(ra, drrwb->drr_object, drrwb->drr_offset,
2006                     drrwb->drr_length);
2007                 return (err);
2008         }
2009         case DRR_WRITE_EMBEDDED:
2010         {
2011                 struct drr_write_embedded *drrwe =
2012                     &ra->rrd->header.drr_u.drr_write_embedded;
2013                 uint32_t size = P2ROUNDUP(drrwe->drr_psize, 8);
2014                 void *buf = kmem_zalloc(size, KM_SLEEP);
2015
2016                 err = receive_read_payload_and_next_header(ra, size, buf);
2017                 if (err != 0) {
2018                         kmem_free(buf, size);
2019                         return (err);
2020                 }
2021
2022                 receive_read_prefetch(ra, drrwe->drr_object, drrwe->drr_offset,
2023                     drrwe->drr_length);
2024                 return (err);
2025         }
2026         case DRR_FREE:
2027         {
2028                 /*
2029                  * It might be beneficial to prefetch indirect blocks here, but
2030                  * we don't really have the data to decide for sure.
2031                  */
2032                 err = receive_read_payload_and_next_header(ra, 0, NULL);
2033                 return (err);
2034         }
2035         case DRR_END:
2036         {
2037                 struct drr_end *drre = &ra->rrd->header.drr_u.drr_end;
2038                 if (!ZIO_CHECKSUM_EQUAL(ra->prev_cksum, drre->drr_checksum))
2039                         return (SET_ERROR(ECKSUM));
2040                 return (0);
2041         }
2042         case DRR_SPILL:
2043         {
2044                 struct drr_spill *drrs = &ra->rrd->header.drr_u.drr_spill;
2045                 arc_buf_t *abuf;
2046                 int len = DRR_SPILL_PAYLOAD_SIZE(drrs);
2047
2048                 /* DRR_SPILL records are either raw or uncompressed */
2049                 if (ra->raw) {
2050                         boolean_t byteorder = ZFS_HOST_BYTEORDER ^
2051                             !!DRR_IS_RAW_BYTESWAPPED(drrs->drr_flags) ^
2052                             ra->byteswap;
2053
2054                         abuf = arc_loan_raw_buf(dmu_objset_spa(ra->os),
2055                             dmu_objset_id(ra->os), byteorder, drrs->drr_salt,
2056                             drrs->drr_iv, drrs->drr_mac, drrs->drr_type,
2057                             drrs->drr_compressed_size, drrs->drr_length,
2058                             drrs->drr_compressiontype);
2059                 } else {
2060                         abuf = arc_loan_buf(dmu_objset_spa(ra->os),
2061                             DMU_OT_IS_METADATA(drrs->drr_type),
2062                             drrs->drr_length);
2063                 }
2064
2065                 err = receive_read_payload_and_next_header(ra, len,
2066                     abuf->b_data);
2067                 if (err != 0) {
2068                         dmu_return_arcbuf(abuf);
2069                         return (err);
2070                 }
2071                 ra->rrd->arc_buf = abuf;
2072                 return (err);
2073         }
2074         case DRR_OBJECT_RANGE:
2075         {
2076                 err = receive_read_payload_and_next_header(ra, 0, NULL);
2077                 return (err);
2078         }
2079         default:
2080                 return (SET_ERROR(EINVAL));
2081         }
2082 }
2083
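     /*
      * Debug helper: log the interesting fields of a record that failed to
      * apply.  This compiles to a no-op unless ZFS_DEBUG is defined.
      */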
2084 static void
2085 dprintf_drr(struct receive_record_arg *rrd, int err)
2086 {
2087 #ifdef ZFS_DEBUG
2088         switch (rrd->header.drr_type) {
2089         case DRR_OBJECT:
2090         {
2091                 struct drr_object *drro = &rrd->header.drr_u.drr_object;
2092                 dprintf("drr_type = OBJECT obj = %llu type = %u "
2093                     "bonustype = %u blksz = %u bonuslen = %u cksumtype = %u "
2094                     "compress = %u dn_slots = %u err = %d\n",
2095                     drro->drr_object, drro->drr_type,  drro->drr_bonustype,
2096                     drro->drr_blksz, drro->drr_bonuslen,
2097                     drro->drr_checksumtype, drro->drr_compress,
2098                     drro->drr_dn_slots, err);
2099                 break;
2100         }
2101         case DRR_FREEOBJECTS:
2102         {
2103                 struct drr_freeobjects *drrfo =
2104                     &rrd->header.drr_u.drr_freeobjects;
2105                 dprintf("drr_type = FREEOBJECTS firstobj = %llu "
2106                     "numobjs = %llu err = %d\n",
2107                     drrfo->drr_firstobj, drrfo->drr_numobjs, err);
2108                 break;
2109         }
2110         case DRR_WRITE:
2111         {
2112                 struct drr_write *drrw = &rrd->header.drr_u.drr_write;
2113                 dprintf("drr_type = WRITE obj = %llu type = %u offset = %llu "
2114                     "lsize = %llu cksumtype = %u cksumflags = %u "
2115                     "compress = %u psize = %llu err = %d\n",
2116                     drrw->drr_object, drrw->drr_type, drrw->drr_offset,
2117                     drrw->drr_logical_size, drrw->drr_checksumtype,
2118                     drrw->drr_flags, drrw->drr_compressiontype,
2119                     drrw->drr_compressed_size, err);
2120                 break;
2121         }
2122         case DRR_WRITE_BYREF:
2123         {
2124                 struct drr_write_byref *drrwbr =
2125                     &rrd->header.drr_u.drr_write_byref;
2126                 dprintf("drr_type = WRITE_BYREF obj = %llu offset = %llu "
2127                     "length = %llu toguid = %llx refguid = %llx "
2128                     "refobject = %llu refoffset = %llu cksumtype = %u "
2129                     "cksumflags = %u err = %d\n",
2130                     drrwbr->drr_object, drrwbr->drr_offset,
2131                     drrwbr->drr_length, drrwbr->drr_toguid,
2132                     drrwbr->drr_refguid, drrwbr->drr_refobject,
2133                     drrwbr->drr_refoffset, drrwbr->drr_checksumtype,
2134                     drrwbr->drr_flags, err);
2135                 break;
2136         }
2137         case DRR_WRITE_EMBEDDED:
2138         {
2139                 struct drr_write_embedded *drrwe =
2140                     &rrd->header.drr_u.drr_write_embedded;
2141                 dprintf("drr_type = WRITE_EMBEDDED obj = %llu offset = %llu "
2142                     "length = %llu compress = %u etype = %u lsize = %u "
2143                     "psize = %u err = %d\n",
2144                     drrwe->drr_object, drrwe->drr_offset, drrwe->drr_length,
2145                     drrwe->drr_compression, drrwe->drr_etype,
2146                     drrwe->drr_lsize, drrwe->drr_psize, err);
2147                 break;
2148         }
2149         case DRR_FREE:
2150         {
2151                 struct drr_free *drrf = &rrd->header.drr_u.drr_free;
2152                 dprintf("drr_type = FREE obj = %llu offset = %llu "
2153                     "length = %lld err = %d\n",
2154                     drrf->drr_object, drrf->drr_offset, drrf->drr_length,
2155                     err);
2156                 break;
2157         }
2158         case DRR_SPILL:
2159         {
2160                 struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
2161                 dprintf("drr_type = SPILL obj = %llu length = %llu "
2162                     "err = %d\n", drrs->drr_object, drrs->drr_length, err);
2163                 break;
2164         }
2165         default:
2166                 return;
2167         }
2168 #endif
2169 }
2170
2171 /*
2172  * Commit the records to the pool.
2173  */
2174 static int
2175 receive_process_record(struct receive_writer_arg *rwa,
2176     struct receive_record_arg *rrd)
2177 {
2178         int err;
2179
2180         /* Processing in order, therefore bytes_read should be increasing. */
2181         ASSERT3U(rrd->bytes_read, >=, rwa->bytes_read);
2182         rwa->bytes_read = rrd->bytes_read;
2183
2184         switch (rrd->header.drr_type) {
2185         case DRR_OBJECT:
2186         {
2187                 struct drr_object *drro = &rrd->header.drr_u.drr_object;
2188                 err = receive_object(rwa, drro, rrd->payload);
2189                 kmem_free(rrd->payload, rrd->payload_size);
2190                 rrd->payload = NULL;
2191                 break;
2192         }
2193         case DRR_FREEOBJECTS:
2194         {
2195                 struct drr_freeobjects *drrfo =
2196                     &rrd->header.drr_u.drr_freeobjects;
2197                 err = receive_freeobjects(rwa, drrfo);
2198                 break;
2199         }
2200         case DRR_WRITE:
2201         {
2202                 struct drr_write *drrw = &rrd->header.drr_u.drr_write;
2203                 err = receive_write(rwa, drrw, rrd->arc_buf);
2204                 /* if receive_write() is successful, it consumes the arc_buf */
2205                 if (err != 0)
2206                         dmu_return_arcbuf(rrd->arc_buf);
2207                 rrd->arc_buf = NULL;
2208                 rrd->payload = NULL;
2209                 break;
2210         }
2211         case DRR_WRITE_BYREF:
2212         {
2213                 struct drr_write_byref *drrwbr =
2214                     &rrd->header.drr_u.drr_write_byref;
2215                 err = receive_write_byref(rwa, drrwbr);
2216                 break;
2217         }
2218         case DRR_WRITE_EMBEDDED:
2219         {
2220                 struct drr_write_embedded *drrwe =
2221                     &rrd->header.drr_u.drr_write_embedded;
2222                 err = receive_write_embedded(rwa, drrwe, rrd->payload);
2223                 kmem_free(rrd->payload, rrd->payload_size);
2224                 rrd->payload = NULL;
2225                 break;
2226         }
2227         case DRR_FREE:
2228         {
2229                 struct drr_free *drrf = &rrd->header.drr_u.drr_free;
2230                 err = receive_free(rwa, drrf);
2231                 break;
2232         }
2233         case DRR_SPILL:
2234         {
2235                 struct drr_spill *drrs = &rrd->header.drr_u.drr_spill;
2236                 err = receive_spill(rwa, drrs, rrd->arc_buf);
2237                 /* if receive_spill() is successful, it consumes the arc_buf */
2238                 if (err != 0)
2239                         dmu_return_arcbuf(rrd->arc_buf);
2240                 rrd->arc_buf = NULL;
2241                 rrd->payload = NULL;
2242                 break;
2243         }
2244         case DRR_OBJECT_RANGE:
2245         {
2246                 struct drr_object_range *drror =
2247                     &rrd->header.drr_u.drr_object_range;
2248                 return (receive_object_range(rwa, drror));
2249         }
2250         default:
2251                 return (SET_ERROR(EINVAL));
2252         }
2253
2254         if (err != 0)
2255                 dprintf_drr(rrd, err);
2256
2257         return (err);
2258 }
2259
2260 /*
2261  * dmu_recv_stream's worker thread; pull records off the queue, and then call
2262  * receive_process_record().  When we're done, signal the main thread and exit.
2263  */
2264 static void
2265 receive_writer_thread(void *arg)
2266 {
2267         struct receive_writer_arg *rwa = arg;
2268         struct receive_record_arg *rrd;
2269         fstrans_cookie_t cookie = spl_fstrans_mark();
2270
2271         for (rrd = bqueue_dequeue(&rwa->q); !rrd->eos_marker;
2272             rrd = bqueue_dequeue(&rwa->q)) {
2273                 /*
2274                  * If there's an error, the main thread will stop putting things
2275                  * on the queue, but we need to clear everything in it before we
2276                  * can exit.
2277                  */
2278                 if (rwa->err == 0) {
2279                         rwa->err = receive_process_record(rwa, rrd);
2280                 } else if (rrd->arc_buf != NULL) {
2281                         dmu_return_arcbuf(rrd->arc_buf);
2282                         rrd->arc_buf = NULL;
2283                         rrd->payload = NULL;
2284                 } else if (rrd->payload != NULL) {
2285                         kmem_free(rrd->payload, rrd->payload_size);
2286                         rrd->payload = NULL;
2287                 }
2288                 kmem_free(rrd, sizeof (*rrd));
2289         }
2290         kmem_free(rrd, sizeof (*rrd));
2291         mutex_enter(&rwa->mutex);
2292         rwa->done = B_TRUE;
2293         cv_signal(&rwa->cv);
2294         mutex_exit(&rwa->mutex);
2295         spl_fstrans_unmark(cookie);
2296         thread_exit();
2297 }
2298
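     /*
      * For a resuming receive, verify that the "resume_object" and
      * "resume_offset" values in the stream's BEGIN payload nvlist match the
      * resume state recorded in this dataset's ZAP (DS_FIELD_RESUME_OBJECT
      * and DS_FIELD_RESUME_OFFSET).  A mismatch means the stream does not
      * correspond to this dataset's resume state, so it is rejected with
      * EINVAL.
      */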
2299 static int
2300 resume_check(struct receive_arg *ra, nvlist_t *begin_nvl)
2301 {
2302         uint64_t val;
2303         objset_t *mos = dmu_objset_pool(ra->os)->dp_meta_objset;
2304         uint64_t dsobj = dmu_objset_id(ra->os);
2305         uint64_t resume_obj, resume_off;
2306
2307         if (nvlist_lookup_uint64(begin_nvl,
2308             "resume_object", &resume_obj) != 0 ||
2309             nvlist_lookup_uint64(begin_nvl,
2310             "resume_offset", &resume_off) != 0) {
2311                 return (SET_ERROR(EINVAL));
2312         }
2313         VERIFY0(zap_lookup(mos, dsobj,
2314             DS_FIELD_RESUME_OBJECT, sizeof (val), 1, &val));
2315         if (resume_obj != val)
2316                 return (SET_ERROR(EINVAL));
2317         VERIFY0(zap_lookup(mos, dsobj,
2318             DS_FIELD_RESUME_OFFSET, sizeof (val), 1, &val));
2319         if (resume_off != val)
2320                 return (SET_ERROR(EINVAL));
2321
2322         return (0);
2323 }
2324
2325 /*
2326  * Read in the stream's records, one by one, and apply them to the pool.  There
2327  * are two threads involved; the thread that calls this function will spin up a
2328  * worker thread, read the records off the stream one by one, and issue
2329  * prefetches for any necessary indirect blocks.  It will then push the records
2330  * onto an internal blocking queue.  The worker thread will pull the records off
2331  * the queue, and actually write the data into the DMU.  This way, the worker
2332  * thread doesn't have to wait for reads to complete, since everything it needs
2333  * (the indirect blocks) will be prefetched.
2334  *
2335  * NB: callers *must* call dmu_recv_end() if this succeeds.
2336  */
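     /*
      * Illustrative call sequence only (arguments abridged; the full
      * signatures of dmu_recv_begin() and dmu_recv_end() are declared
      * elsewhere):
      *
      *      dmu_recv_cookie_t drc;
      *      err = dmu_recv_begin(..., &drc);
      *      if (err == 0)
      *              err = dmu_recv_stream(&drc, vp, &voff, cleanup_fd,
      *                  &action_handle);
      *      if (err == 0)
      *              err = dmu_recv_end(&drc, owner);
      */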
2337 int
2338 dmu_recv_stream(dmu_recv_cookie_t *drc, vnode_t *vp, offset_t *voffp,
2339     int cleanup_fd, uint64_t *action_handlep)
2340 {
2341         int err = 0;
2342         struct receive_arg *ra;
2343         struct receive_writer_arg *rwa;
2344         int featureflags;
2345         uint32_t payloadlen;
2346         void *payload;
2347         nvlist_t *begin_nvl = NULL;
2348
2349         ra = kmem_zalloc(sizeof (*ra), KM_SLEEP);
2350         rwa = kmem_zalloc(sizeof (*rwa), KM_SLEEP);
2351
2352         ra->byteswap = drc->drc_byteswap;
2353         ra->raw = drc->drc_raw;
2354         ra->cksum = drc->drc_cksum;
2355         ra->vp = vp;
2356         ra->voff = *voffp;
2357
2358         if (dsl_dataset_is_zapified(drc->drc_ds)) {
2359                 (void) zap_lookup(drc->drc_ds->ds_dir->dd_pool->dp_meta_objset,
2360                     drc->drc_ds->ds_object, DS_FIELD_RESUME_BYTES,
2361                     sizeof (ra->bytes_read), 1, &ra->bytes_read);
2362         }
2363
2364         objlist_create(&ra->ignore_objlist);
2365
2366         /* these were verified in dmu_recv_begin */
2367         ASSERT3U(DMU_GET_STREAM_HDRTYPE(drc->drc_drrb->drr_versioninfo), ==,
2368             DMU_SUBSTREAM);
2369         ASSERT3U(drc->drc_drrb->drr_type, <, DMU_OST_NUMTYPES);
2370
2371         /*
2372          * Open the objset we are modifying.
2373          */
2374         VERIFY0(dmu_objset_from_ds(drc->drc_ds, &ra->os));
2375
2376         ASSERT(dsl_dataset_phys(drc->drc_ds)->ds_flags & DS_FLAG_INCONSISTENT);
2377
2378         featureflags = DMU_GET_FEATUREFLAGS(drc->drc_drrb->drr_versioninfo);
2379         ra->featureflags = featureflags;
2380
2381         ASSERT0(ra->os->os_encrypted &&
2382             (featureflags & DMU_BACKUP_FEATURE_EMBED_DATA));
2383
2384         /* if this stream is dedup'ed, set up the avl tree for guid mapping */
2385         if (featureflags & DMU_BACKUP_FEATURE_DEDUP) {
2386                 minor_t minor;
2387
2388                 if (cleanup_fd == -1) {
2389                         err = SET_ERROR(EBADF);
2390                         goto out;
2391                 }
2392                 err = zfs_onexit_fd_hold(cleanup_fd, &minor);
2393                 if (err != 0) {
2394                         cleanup_fd = -1;
2395                         goto out;
2396                 }
2397
2398                 if (*action_handlep == 0) {
2399                         rwa->guid_to_ds_map =
2400                             kmem_alloc(sizeof (avl_tree_t), KM_SLEEP);
2401                         avl_create(rwa->guid_to_ds_map, guid_compare,
2402                             sizeof (guid_map_entry_t),
2403                             offsetof(guid_map_entry_t, avlnode));
2404                         err = zfs_onexit_add_cb(minor,
2405                             free_guid_map_onexit, rwa->guid_to_ds_map,
2406                             action_handlep);
2407                         if (err != 0)
2408                                 goto out;
2409                 } else {
2410                         err = zfs_onexit_cb_data(minor, *action_handlep,
2411                             (void **)&rwa->guid_to_ds_map);
2412                         if (err != 0)
2413                                 goto out;
2414                 }
2415
2416                 drc->drc_guid_to_ds_map = rwa->guid_to_ds_map;
2417         }
2418
2419         payloadlen = drc->drc_drr_begin->drr_payloadlen;
2420         payload = NULL;
2421         if (payloadlen != 0)
2422                 payload = kmem_alloc(payloadlen, KM_SLEEP);
2423
2424         err = receive_read_payload_and_next_header(ra, payloadlen, payload);
2425         if (err != 0) {
2426                 if (payloadlen != 0)
2427                         kmem_free(payload, payloadlen);
2428                 goto out;
2429         }
2430         if (payloadlen != 0) {
2431                 err = nvlist_unpack(payload, payloadlen, &begin_nvl, KM_SLEEP);
2432                 kmem_free(payload, payloadlen);
2433                 if (err != 0)
2434                         goto out;
2435         }
2436
2437         /* handle DSL encryption key payload */
2438         if (featureflags & DMU_BACKUP_FEATURE_RAW) {
2439                 nvlist_t *keynvl = NULL;
2440
2441                 ASSERT(ra->os->os_encrypted);
2442                 ASSERT(drc->drc_raw);
2443
2444                 err = nvlist_lookup_nvlist(begin_nvl, "crypt_keydata", &keynvl);
2445                 if (err != 0)
2446                         goto out;
2447
2448                 /*
2449                  * If this is a new dataset we set the key immediately.
2450                  * Otherwise we don't want to change the key until we
2451                  * are sure the rest of the receive succeeded so we stash
2452                  * the keynvl away until then.
2453                  */
2454                 err = dsl_crypto_recv_raw(spa_name(ra->os->os_spa),
2455                     drc->drc_ds->ds_object, drc->drc_drrb->drr_type,
2456                     keynvl, drc->drc_newfs);
2457                 if (err != 0)
2458                         goto out;
2459
2460                 if (!drc->drc_newfs)
2461                         drc->drc_keynvl = fnvlist_dup(keynvl);
2462         }
2463
2464         if (featureflags & DMU_BACKUP_FEATURE_RESUMING) {
2465                 err = resume_check(ra, begin_nvl);
2466                 if (err != 0)
2467                         goto out;
2468         }
2469
2470         (void) bqueue_init(&rwa->q,
2471             MAX(zfs_recv_queue_length, 2 * zfs_max_recordsize),
2472             offsetof(struct receive_record_arg, node));
2473         cv_init(&rwa->cv, NULL, CV_DEFAULT, NULL);
2474         mutex_init(&rwa->mutex, NULL, MUTEX_DEFAULT, NULL);
2475         rwa->os = ra->os;
2476         rwa->byteswap = drc->drc_byteswap;
2477         rwa->resumable = drc->drc_resumable;
2478         rwa->raw = drc->drc_raw;
2479         rwa->os->os_raw_receive = drc->drc_raw;
2480
2481         (void) thread_create(NULL, 0, receive_writer_thread, rwa, 0, curproc,
2482             TS_RUN, minclsyspri);
2483         /*
2484          * We're reading rwa->err without locks, which is safe since we are the
2485          * only reader, and the worker thread is the only writer.  It's ok if we
2486          * miss a write for an iteration or two of the loop, since the writer
2487          * thread will keep freeing records we send it until we send it an eos
2488          * marker.
2489          *
2490          * We can leave this loop in 3 ways:  First, if rwa->err is
2491          * non-zero.  In that case, the writer thread will free the rrd we just
2492          * pushed.  Second, if we're interrupted; in that case, either it's the
2493          * first loop and ra->rrd was never allocated, or it's later and ra->rrd
2494          * has been handed off to the writer thread who will free it.  Finally,
2495          * if receive_read_record fails or we're at the end of the stream, then
2496          * we free ra->rrd and exit.
2497          */
2498         while (rwa->err == 0) {
2499                 if (issig(JUSTLOOKING) && issig(FORREAL)) {
2500                         err = SET_ERROR(EINTR);
2501                         break;
2502                 }
2503
2504                 ASSERT3P(ra->rrd, ==, NULL);
2505                 ra->rrd = ra->next_rrd;
2506                 ra->next_rrd = NULL;
2507                 /* Allocates and loads header into ra->next_rrd */
2508                 err = receive_read_record(ra);
2509
2510                 if (ra->rrd->header.drr_type == DRR_END || err != 0) {
2511                         kmem_free(ra->rrd, sizeof (*ra->rrd));
2512                         ra->rrd = NULL;
2513                         break;
2514                 }
2515
2516                 bqueue_enqueue(&rwa->q, ra->rrd,
2517                     sizeof (struct receive_record_arg) + ra->rrd->payload_size);
2518                 ra->rrd = NULL;
2519         }
2520         if (ra->next_rrd == NULL)
2521                 ra->next_rrd = kmem_zalloc(sizeof (*ra->next_rrd), KM_SLEEP);
2522         ra->next_rrd->eos_marker = B_TRUE;
2523         bqueue_enqueue(&rwa->q, ra->next_rrd, 1);
2524
2525         mutex_enter(&rwa->mutex);
2526         while (!rwa->done) {
2527                 cv_wait(&rwa->cv, &rwa->mutex);
2528         }
2529         mutex_exit(&rwa->mutex);
2530
2531         /*
2532          * If we are receiving a full stream as a clone, all object IDs which
2533          * are greater than the maximum ID referenced in the stream are
2534          * by definition unused and must be freed.
2535          */
2536         if (drc->drc_clone && drc->drc_drrb->drr_fromguid == 0) {
2537                 uint64_t obj = rwa->max_object + 1;
2538                 int free_err = 0;
2539                 int next_err = 0;
2540
2541                 while (next_err == 0) {
2542                         free_err = dmu_free_long_object(rwa->os, obj);
2543                         if (free_err != 0 && free_err != ENOENT)
2544                                 break;
2545
2546                         next_err = dmu_object_next(rwa->os, &obj, FALSE, 0);
2547                 }
2548
2549                 if (err == 0) {
2550                         if (free_err != 0 && free_err != ENOENT)
2551                                 err = free_err;
2552                         else if (next_err != ESRCH)
2553                                 err = next_err;
2554                 }
2555         }
2556
2557         cv_destroy(&rwa->cv);
2558         mutex_destroy(&rwa->mutex);
2559         bqueue_destroy(&rwa->q);
2560         if (err == 0)
2561                 err = rwa->err;
2562
2563 out:
2564         nvlist_free(begin_nvl);
2565         if ((featureflags & DMU_BACKUP_FEATURE_DEDUP) && (cleanup_fd != -1))
2566                 zfs_onexit_fd_rele(cleanup_fd);
2567
2568         if (err != 0) {
2569                 /*
2570                  * Clean up references. If receive is not resumable,
2571                  * destroy what we created, so we don't leave it in
2572                  * an inconsistent state.
2573                  */
2574                 dmu_recv_cleanup_ds(drc);
2575                 nvlist_free(drc->drc_keynvl);
2576         }
2577
2578         *voffp = ra->voff;
2579         objlist_destroy(&ra->ignore_objlist);
2580         kmem_free(ra, sizeof (*ra));
2581         kmem_free(rwa, sizeof (*rwa));
2582         return (err);
2583 }
2584
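     /*
      * The "check" half of the dmu_recv_end() synctask.  When receiving into
      * an existing filesystem this verifies that any snapshots newer than the
      * origin can be destroyed (if forced), that a stashed raw key (if any)
      * can be applied, that the clone swap with origin_head is legal, that
      * the new snapshot can be created, and that the temporary receive
      * dataset can be destroyed afterwards.  For a brand new filesystem only
      * the snapshot creation needs to be checked.
      */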
2585 static int
2586 dmu_recv_end_check(void *arg, dmu_tx_t *tx)
2587 {
2588         dmu_recv_cookie_t *drc = arg;
2589         dsl_pool_t *dp = dmu_tx_pool(tx);
2590         int error;
2591
2592         ASSERT3P(drc->drc_ds->ds_owner, ==, dmu_recv_tag);
2593
2594         if (!drc->drc_newfs) {
2595                 dsl_dataset_t *origin_head;
2596
2597                 error = dsl_dataset_hold(dp, drc->drc_tofs, FTAG, &origin_head);
2598                 if (error != 0)
2599                         return (error);
2600                 if (drc->drc_force) {
2601                         /*
2602                          * We will destroy any snapshots in tofs (i.e. before
2603                          * origin_head) that are after the origin (which is
2604                          * the snap before drc_ds, because drc_ds cannot
2605                          * have any snaps of its own).
2606                          */
2607                         uint64_t obj;
2608
2609                         obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
2610                         while (obj !=
2611                             dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
2612                                 dsl_dataset_t *snap;
2613                                 error = dsl_dataset_hold_obj(dp, obj, FTAG,
2614                                     &snap);
2615                                 if (error != 0)
2616                                         break;
2617                                 if (snap->ds_dir != origin_head->ds_dir)
2618                                         error = SET_ERROR(EINVAL);
2619                                 if (error == 0)  {
2620                                         error = dsl_destroy_snapshot_check_impl(
2621                                             snap, B_FALSE);
2622                                 }
2623                                 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
2624                                 dsl_dataset_rele(snap, FTAG);
2625                                 if (error != 0)
2626                                         break;
2627                         }
2628                         if (error != 0) {
2629                                 dsl_dataset_rele(origin_head, FTAG);
2630                                 return (error);
2631                         }
2632                 }
2633                 if (drc->drc_keynvl != NULL) {
2634                         error = dsl_crypto_recv_raw_key_check(drc->drc_ds,
2635                             drc->drc_keynvl, tx);
2636                         if (error != 0) {
2637                                 dsl_dataset_rele(origin_head, FTAG);
2638                                 return (error);
2639                         }
2640                 }
2641
2642                 error = dsl_dataset_clone_swap_check_impl(drc->drc_ds,
2643                     origin_head, drc->drc_force, drc->drc_owner, tx);
2644                 if (error != 0) {
2645                         dsl_dataset_rele(origin_head, FTAG);
2646                         return (error);
2647                 }
2648                 error = dsl_dataset_snapshot_check_impl(origin_head,
2649                     drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
2650                 dsl_dataset_rele(origin_head, FTAG);
2651                 if (error != 0)
2652                         return (error);
2653
2654                 error = dsl_destroy_head_check_impl(drc->drc_ds, 1);
2655         } else {
2656                 error = dsl_dataset_snapshot_check_impl(drc->drc_ds,
2657                     drc->drc_tosnap, tx, B_TRUE, 1, drc->drc_cred);
2658         }
2659         return (error);
2660 }
2661
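/*
 * Sync phase of the dmu_recv_end() sync task.  This makes the received data
 * visible: for an existing filesystem the temporary clone is swapped with
 * origin_head and snapshotted, while for a new filesystem the dataset is
 * simply snapshotted and any saved resume state is cleared.  In both cases
 * DS_FLAG_INCONSISTENT is removed and the new snapshot's creation time and
 * guid are taken from the BEGIN record.
 */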
2662 static void
2663 dmu_recv_end_sync(void *arg, dmu_tx_t *tx)
2664 {
2665         dmu_recv_cookie_t *drc = arg;
2666         dsl_pool_t *dp = dmu_tx_pool(tx);
2667         boolean_t encrypted = drc->drc_ds->ds_dir->dd_crypto_obj != 0;
2668
2669         spa_history_log_internal_ds(drc->drc_ds, "finish receiving",
2670             tx, "snap=%s", drc->drc_tosnap);
2671         drc->drc_ds->ds_objset->os_raw_receive = B_FALSE;
2672
2673         if (!drc->drc_newfs) {
2674                 dsl_dataset_t *origin_head;
2675
2676                 VERIFY0(dsl_dataset_hold(dp, drc->drc_tofs, FTAG,
2677                     &origin_head));
2678
2679                 if (drc->drc_force) {
2680                         /*
2681                          * Destroy any snapshots of drc_tofs (origin_head)
2682                          * after the origin (the snap before drc_ds).
2683                          */
2684                         uint64_t obj;
2685
2686                         obj = dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
2687                         while (obj !=
2688                             dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj) {
2689                                 dsl_dataset_t *snap;
2690                                 VERIFY0(dsl_dataset_hold_obj(dp, obj, FTAG,
2691                                     &snap));
2692                                 ASSERT3P(snap->ds_dir, ==, origin_head->ds_dir);
2693                                 obj = dsl_dataset_phys(snap)->ds_prev_snap_obj;
2694                                 dsl_destroy_snapshot_sync_impl(snap,
2695                                     B_FALSE, tx);
2696                                 dsl_dataset_rele(snap, FTAG);
2697                         }
2698                 }
2699                 if (drc->drc_keynvl != NULL) {
2700                         dsl_crypto_recv_raw_key_sync(drc->drc_ds,
2701                             drc->drc_keynvl, tx);
2702                         nvlist_free(drc->drc_keynvl);
2703                         drc->drc_keynvl = NULL;
2704                 }
2705
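                /*
                 * Swap the received contents into place and create the new
                 * snapshot on origin_head.
                 */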
2706                 VERIFY3P(drc->drc_ds->ds_prev, ==, origin_head->ds_prev);
2707
2708                 dsl_dataset_clone_swap_sync_impl(drc->drc_ds,
2709                     origin_head, tx);
2710                 dsl_dataset_snapshot_sync_impl(origin_head,
2711                     drc->drc_tosnap, tx);
2712
2713                 /* set snapshot's creation time and guid */
2714                 dmu_buf_will_dirty(origin_head->ds_prev->ds_dbuf, tx);
2715                 dsl_dataset_phys(origin_head->ds_prev)->ds_creation_time =
2716                     drc->drc_drrb->drr_creation_time;
2717                 dsl_dataset_phys(origin_head->ds_prev)->ds_guid =
2718                     drc->drc_drrb->drr_toguid;
2719                 dsl_dataset_phys(origin_head->ds_prev)->ds_flags &=
2720                     ~DS_FLAG_INCONSISTENT;
2721
2722                 dmu_buf_will_dirty(origin_head->ds_dbuf, tx);
2723                 dsl_dataset_phys(origin_head)->ds_flags &=
2724                     ~DS_FLAG_INCONSISTENT;
2725
2726                 drc->drc_newsnapobj =
2727                     dsl_dataset_phys(origin_head)->ds_prev_snap_obj;
2728
2729                 dsl_dataset_rele(origin_head, FTAG);
2730                 dsl_destroy_head_sync_impl(drc->drc_ds, tx);
2731
2732                 if (drc->drc_owner != NULL)
2733                         VERIFY3P(origin_head->ds_owner, ==, drc->drc_owner);
2734         } else {
2735                 dsl_dataset_t *ds = drc->drc_ds;
2736
2737                 dsl_dataset_snapshot_sync_impl(ds, drc->drc_tosnap, tx);
2738
2739                 /* set snapshot's creation time and guid */
2740                 dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
2741                 dsl_dataset_phys(ds->ds_prev)->ds_creation_time =
2742                     drc->drc_drrb->drr_creation_time;
2743                 dsl_dataset_phys(ds->ds_prev)->ds_guid =
2744                     drc->drc_drrb->drr_toguid;
2745                 dsl_dataset_phys(ds->ds_prev)->ds_flags &=
2746                     ~DS_FLAG_INCONSISTENT;
2747
2748                 dmu_buf_will_dirty(ds->ds_dbuf, tx);
2749                 dsl_dataset_phys(ds)->ds_flags &= ~DS_FLAG_INCONSISTENT;
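                /*
                 * A completed receive no longer needs the saved resume
                 * state, so remove the DS_FIELD_RESUME_* entries that a
                 * resumable receive recorded along the way.
                 */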
2750                 if (dsl_dataset_has_resume_receive_state(ds)) {
2751                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2752                             DS_FIELD_RESUME_FROMGUID, tx);
2753                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2754                             DS_FIELD_RESUME_OBJECT, tx);
2755                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2756                             DS_FIELD_RESUME_OFFSET, tx);
2757                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2758                             DS_FIELD_RESUME_BYTES, tx);
2759                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2760                             DS_FIELD_RESUME_TOGUID, tx);
2761                         (void) zap_remove(dp->dp_meta_objset, ds->ds_object,
2762                             DS_FIELD_RESUME_TONAME, tx);
2763                 }
2764                 drc->drc_newsnapobj =
2765                     dsl_dataset_phys(drc->drc_ds)->ds_prev_snap_obj;
2766         }
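        /* Create device minors for any zvols that were just received. */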
2767         zvol_create_minors(dp->dp_spa, drc->drc_tofs, B_TRUE);
2768
2769         /*
2770          * Release the hold from dmu_recv_begin.  This must be done before
2771          * we return to open context, so that when we free the dataset's dnode
2772          * we can evict its bonus buffer. Since the dataset may be destroyed
2773          * at this point (and therefore won't have a valid pointer to the spa)
2774          * we release the key mapping manually here while we do have a valid
2775          * pointer, if it exists.
2776          */
2777         if (!drc->drc_raw && encrypted) {
2778                 (void) spa_keystore_remove_mapping(dmu_tx_pool(tx)->dp_spa,
2779                     drc->drc_ds->ds_object, drc->drc_ds);
2780         }
2781         dsl_dataset_disown(drc->drc_ds, 0, dmu_recv_tag);
2782         drc->drc_ds = NULL;
2783 }
2784
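/*
 * Look up the snapshot we just received and add it to the guid->dataset map
 * used by deduplicated send streams, owning it (and, for raw receives,
 * flagging its objset) so that later WRITE_BYREF records can find the
 * blocks they reference.
 */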
2785 static int
2786 add_ds_to_guidmap(const char *name, avl_tree_t *guid_map, uint64_t snapobj,
2787     boolean_t raw)
2788 {
2789         dsl_pool_t *dp;
2790         dsl_dataset_t *snapds;
2791         guid_map_entry_t *gmep;
2792         objset_t *os;
2793         ds_hold_flags_t dsflags = (raw) ? 0 : DS_HOLD_FLAG_DECRYPT;
2794         int err;
2795
2796         ASSERT(guid_map != NULL);
2797
2798         err = dsl_pool_hold(name, FTAG, &dp);
2799         if (err != 0)
2800                 return (err);
2801         gmep = kmem_alloc(sizeof (*gmep), KM_SLEEP);
2802         err = dsl_dataset_own_obj(dp, snapobj, dsflags, gmep, &snapds);
2803         if (err == 0) {
2804                 /*
2805                  * If this is a deduplicated raw send stream, we need
2806                  * to make sure that we can still read raw blocks from
2807                  * earlier datasets in the stream, so we set the
2808                  * os_raw_receive flag now.
2809                  */
2810                 if (raw) {
2811                         err = dmu_objset_from_ds(snapds, &os);
2812                         if (err != 0) {
2813                                 dsl_dataset_disown(snapds, dsflags, FTAG);
2814                                 dsl_pool_rele(dp, FTAG);
2815                                 kmem_free(gmep, sizeof (*gmep));
2816                                 return (err);
2817                         }
2818                         os->os_raw_receive = B_TRUE;
2819                 }
2820
2821                 gmep->raw = raw;
2822                 gmep->guid = dsl_dataset_phys(snapds)->ds_guid;
2823                 gmep->gme_ds = snapds;
2824                 avl_add(guid_map, gmep);
2825         } else {
2826                 kmem_free(gmep, sizeof (*gmep));
2827         }
2828
2829         dsl_pool_rele(dp, FTAG);
2830         return (err);
2831 }
2832
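/*
 * Number of blocks the dmu_recv_end() sync task is expected to modify;
 * passed to dsl_sync_task() as its blocks_modified argument for the
 * space check.
 */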
2833 static int dmu_recv_end_modified_blocks = 3;
2834
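/*
 * Finish a receive into an existing filesystem by running the end
 * check/sync functions as a single sync task.
 */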
2835 static int
2836 dmu_recv_existing_end(dmu_recv_cookie_t *drc)
2837 {
2838 #ifdef _KERNEL
2839         /*
2840          * We will be destroying the ds; make sure its origin is unmounted if
2841          * necessary.
2842          */
2843         char name[ZFS_MAX_DATASET_NAME_LEN];
2844         dsl_dataset_name(drc->drc_ds, name);
2845         zfs_destroy_unmount_origin(name);
2846 #endif
2847
2848         return (dsl_sync_task(drc->drc_tofs,
2849             dmu_recv_end_check, dmu_recv_end_sync, drc,
2850             dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL));
2851 }
2852
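/*
 * Finish a receive that created a brand-new filesystem; the same
 * check/sync pair handles this case via drc->drc_newfs.
 */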
2853 static int
2854 dmu_recv_new_end(dmu_recv_cookie_t *drc)
2855 {
2856         return (dsl_sync_task(drc->drc_tofs,
2857             dmu_recv_end_check, dmu_recv_end_sync, drc,
2858             dmu_recv_end_modified_blocks, ZFS_SPACE_CHECK_NORMAL));
2859 }
2860
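/*
 * Public entry point that completes a receive started by dmu_recv_begin()
 * and fed by dmu_recv_stream().  On failure the partially received dataset
 * is cleaned up (unless the receive is resumable); on success the new
 * snapshot is added to the dedup guid map, if one is in use.
 *
 * Rough calling sequence (a sketch only; the real caller is the receive
 * ioctl and the exact argument lists are elided here):
 *
 *	dmu_recv_begin(..., &drc);
 *	dmu_recv_stream(&drc, ...);
 *	dmu_recv_end(&drc, owner);
 */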
2861 int
2862 dmu_recv_end(dmu_recv_cookie_t *drc, void *owner)
2863 {
2864         int error;
2865
2866         drc->drc_owner = owner;
2867
2868         if (drc->drc_newfs)
2869                 error = dmu_recv_new_end(drc);
2870         else
2871                 error = dmu_recv_existing_end(drc);
2872
2873         if (error != 0) {
2874                 dmu_recv_cleanup_ds(drc);
2875                 nvlist_free(drc->drc_keynvl);
2876         } else if (drc->drc_guid_to_ds_map != NULL) {
2877                 (void) add_ds_to_guidmap(drc->drc_tofs, drc->drc_guid_to_ds_map,
2878                     drc->drc_newsnapobj, drc->drc_raw);
2879         }
2880         return (error);
2881 }
2882
2883 /*
2884  * Return TRUE if this objset is currently being received into.
2885  */
2886 boolean_t
2887 dmu_objset_is_receiving(objset_t *os)
2888 {
2889         return (os->os_dsl_dataset != NULL &&
2890             os->os_dsl_dataset->ds_owner == dmu_recv_tag);
2891 }
2892
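/*
 * zfs_recv_queue_length bounds, in bytes, how much record payload may be
 * queued between the thread reading the stream and the worker thread that
 * applies the records; the default is SPA_MAXBLOCKSIZE.
 */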
2893 #if defined(_KERNEL)
2894 module_param(zfs_recv_queue_length, int, 0644);
2895 MODULE_PARM_DESC(zfs_recv_queue_length, "Maximum receive queue length (bytes)");
2896 #endif