]> granicus.if.org Git - zfs/commitdiff
Batch free zpl_posix_acl_release
authorChunwei Chen <david.chen@osnexus.com>
Fri, 28 Oct 2016 20:37:00 +0000 (13:37 -0700)
committerBrian Behlendorf <behlendorf1@llnl.gov>
Mon, 7 Nov 2016 19:04:44 +0000 (11:04 -0800)
Currently, every call to zpl_posix_acl_release schedules a delayed task,
and each delayed task adds a timer. This used to be fine, aside from a
possible negative performance impact.

However, Linux 4.8 introduced a new timer wheel implementation[1]. In this
new implementation, the larger the delay, the less accurate the timer is.
So when we have a flood of timers from zpl_posix_acl_release, they will all
expire at the same time. Coupled with the fact that task_expire does a
linear search with the lock held, this causes an extreme amount of
contention inside the interrupt handler and can actually lock up the system.

We fix this by batching the frees to prevent a flood of delayed tasks. Every
call to zpl_posix_acl_release puts the posix_acl to be freed on a lockless
list. Every batch window (1 sec), zpl_posix_acl_free fires and frees every
posix_acl on the list that has passed the grace period. This way, we only
have one delayed task per second.

[1] https://lwn.net/Articles/646950/

Signed-off-by: Chunwei Chen <david.chen@osnexus.com>
include/linux/vfs_compat.h
module/zfs/zfs_vfsops.c
module/zfs/zpl_xattr.c

index 989c237a34e5c815cb59481e1b87ab1b4ba8ca1e..6ed5075a38c8b7519dbe26c9a6b04ddfd52b9a5d 100644 (file)
@@ -212,10 +212,7 @@ lseek_execute(
 
 #else
 
-static inline void
-zpl_posix_acl_free(void *arg) {
-       kfree(arg);
-}
+void zpl_posix_acl_release_impl(struct posix_acl *);
 
 static inline void
 zpl_posix_acl_release(struct posix_acl *acl)
@@ -223,10 +220,8 @@ zpl_posix_acl_release(struct posix_acl *acl)
        if ((acl == NULL) || (acl == ACL_NOT_CACHED))
                return;
 
-       if (atomic_dec_and_test(&acl->a_refcount)) {
-               taskq_dispatch_delay(system_taskq, zpl_posix_acl_free, acl,
-                   TQ_SLEEP, ddi_get_lbolt() + 60*HZ);
-       }
+       if (atomic_dec_and_test(&acl->a_refcount))
+               zpl_posix_acl_release_impl(acl);
 }
 
 static inline void
index eb73f3b600c7b729397ba90b9a130d1556756f90..5417f2422809a6d83e21807546043e8f277cd33e 100644 (file)
@@ -1919,7 +1919,10 @@ zfs_init(void)
 void
 zfs_fini(void)
 {
-       taskq_wait_outstanding(system_taskq, 0);
+       /*
+        * we don't use outstanding because zpl_posix_acl_free might add more.
+        */
+       taskq_wait(system_taskq);
        unregister_filesystem(&zpl_fs_type);
        zfs_znode_fini();
        zfsctl_fini();
index 471cb16c67ef8af06cac20c15ac0642d05566a66..f3a04f733e50c5242666380b5a524dec15034b12 100644 (file)
@@ -1441,3 +1441,103 @@ zpl_xattr_handler(const char *name)
 
        return (NULL);
 }
+
+#if !defined(HAVE_POSIX_ACL_RELEASE) || defined(HAVE_POSIX_ACL_RELEASE_GPL_ONLY)
+struct acl_rel_struct {
+       struct acl_rel_struct *next;
+       struct posix_acl *acl;
+       clock_t time;
+};
+
+#define        ACL_REL_GRACE   (60*HZ)
+#define        ACL_REL_WINDOW  (1*HZ)
+#define        ACL_REL_SCHED   (ACL_REL_GRACE+ACL_REL_WINDOW)
+
+/*
+ * Lockless multi-producer single-consumer fifo list.
+ * Nodes are added to tail and removed from head. Tail pointer is our
+ * synchronization point. It always points to the next pointer of the last
+ * node, or head if list is empty.
+ */
+static struct acl_rel_struct *acl_rel_head = NULL;
+static struct acl_rel_struct **acl_rel_tail = &acl_rel_head;
+
+static void
+zpl_posix_acl_free(void *arg)
+{
+       struct acl_rel_struct *freelist = NULL;
+       struct acl_rel_struct *a;
+       clock_t new_time;
+       boolean_t refire = B_FALSE;
+
+       ASSERT3P(acl_rel_head, !=, NULL);
+       while (acl_rel_head) {
+               a = acl_rel_head;
+               if (ddi_get_lbolt() - a->time >= ACL_REL_GRACE) {
+                       /*
+                        * If a is the last node we need to reset tail, but we
+                        * need to use cmpxchg to make sure it is still the
+                        * last node.
+                        */
+                       if (acl_rel_tail == &a->next) {
+                               acl_rel_head = NULL;
+                               if (cmpxchg(&acl_rel_tail, &a->next,
+                                   &acl_rel_head) == &a->next) {
+                                       ASSERT3P(a->next, ==, NULL);
+                                       a->next = freelist;
+                                       freelist = a;
+                                       break;
+                               }
+                       }
+                       /*
+                        * a is not last node, make sure next pointer is set
+                        * by the adder and advance the head.
+                        */
+                       while (ACCESS_ONCE(a->next) == NULL)
+                               cpu_relax();
+                       acl_rel_head = a->next;
+                       a->next = freelist;
+                       freelist = a;
+               } else {
+                       /*
+                        * a is still in grace period. We are responsible to
+                        * reschedule the free task, since adder will only do
+                        * so if list is empty.
+                        */
+                       new_time = a->time + ACL_REL_SCHED;
+                       refire = B_TRUE;
+                       break;
+               }
+       }
+
+       if (refire)
+               taskq_dispatch_delay(system_taskq, zpl_posix_acl_free, NULL,
+                   TQ_SLEEP, new_time);
+
+       while (freelist) {
+               a = freelist;
+               freelist = a->next;
+               kfree(a->acl);
+               kmem_free(a, sizeof (struct acl_rel_struct));
+       }
+}
+
+void
+zpl_posix_acl_release_impl(struct posix_acl *acl)
+{
+       struct acl_rel_struct *a, **prev;
+
+       a = kmem_alloc(sizeof (struct acl_rel_struct), KM_SLEEP);
+       a->next = NULL;
+       a->acl = acl;
+       a->time = ddi_get_lbolt();
+       /* atomically points tail to us and get the previous tail */
+       prev = xchg(&acl_rel_tail, &a->next);
+       ASSERT3P(*prev, ==, NULL);
+       *prev = a;
+       /* if it was empty before, schedule the free task */
+       if (prev == &acl_rel_head)
+               taskq_dispatch_delay(system_taskq, zpl_posix_acl_free, NULL,
+                   TQ_SLEEP, ddi_get_lbolt() + ACL_REL_SCHED);
+}
+#endif