]> granicus.if.org Git - vim/commitdiff
patch 8.0.0957: a terminal job can deadlock when sending many keys v8.0.0957
authorBram Moolenaar <Bram@vim.org>
Fri, 18 Aug 2017 18:50:30 +0000 (20:50 +0200)
committerBram Moolenaar <Bram@vim.org>
Fri, 18 Aug 2017 18:50:30 +0000 (20:50 +0200)
Problem:    When term_sendkeys() sends many keys it may get stuck in writing
            to the job.
Solution:   Make the write non-blocking, buffer keys to be sent.

src/channel.c
src/proto/channel.pro
src/structs.h
src/terminal.c
src/testdir/test_terminal.vim
src/version.c

index e8030798ca8df510c776afaca51d8a691bae5232..fab35899a4a1bba4e0a61ebe0f22da0cb2113479 100644 (file)
@@ -1373,7 +1373,7 @@ can_write_buf_line(channel_T *channel)
 }
 
 /*
- * Write any lines to the input channel.
+ * Write any buffer lines to the input channel.
  */
     static void
 channel_write_in(channel_T *channel)
@@ -1445,6 +1445,25 @@ channel_buffer_free(buf_T *buf)
        }
 }
 
+/*
+ * Write any lines waiting to be written to "channel".
+ */
+    static void
+channel_write_input(channel_T *channel)
+{
+    chanpart_T *in_part = &channel->ch_part[PART_IN];
+
+    if (in_part->ch_writeque.wq_next != NULL)
+       channel_send(channel, PART_IN, (char_u *)"", 0, "channel_write_input");
+    else if (in_part->ch_bufref.br_buf != NULL)
+    {
+       if (in_part->ch_buf_append)
+           channel_write_new_lines(in_part->ch_bufref.br_buf);
+       else
+           channel_write_in(channel);
+    }
+}
+
 /*
  * Write any lines waiting to be written to a channel.
  */
@@ -1454,17 +1473,7 @@ channel_write_any_lines(void)
     channel_T  *channel;
 
     for (channel = first_channel; channel != NULL; channel = channel->ch_next)
-    {
-       chanpart_T  *in_part = &channel->ch_part[PART_IN];
-
-       if (in_part->ch_bufref.br_buf != NULL)
-       {
-           if (in_part->ch_buf_append)
-               channel_write_new_lines(in_part->ch_bufref.br_buf);
-           else
-               channel_write_in(channel);
-       }
-    }
+       channel_write_input(channel);
 }
 
 /*
@@ -2984,7 +2993,9 @@ channel_fill_wfds(int maxfd_arg, fd_set *wfds)
     {
        chanpart_T  *in_part = &ch->ch_part[PART_IN];
 
-       if (in_part->ch_fd != INVALID_FD && in_part->ch_bufref.br_buf != NULL)
+       if (in_part->ch_fd != INVALID_FD
+               && (in_part->ch_bufref.br_buf != NULL
+                   || in_part->ch_writeque.wq_next != NULL))
        {
            FD_SET((int)in_part->ch_fd, wfds);
            if ((int)in_part->ch_fd >= maxfd)
@@ -3529,6 +3540,31 @@ channel_handle_events(void)
 }
 # endif
 
+/*
+ * Set "channel"/"part" to non-blocking.
+ */
+    void
+channel_set_nonblock(channel_T *channel, ch_part_T part)
+{
+    chanpart_T *ch_part = &channel->ch_part[part];
+    int fd = ch_part->ch_fd;
+
+    if (fd != INVALID_FD)
+    {
+#ifdef _WIN32
+       if (part == PART_SOCK)
+       {
+           u_long      val = 1;
+
+           ioctlsocket(fd, FIONBIO, &val);
+       }
+       else
+#endif
+           fcntl(fd, F_SETFL, O_NONBLOCK);
+       ch_part->ch_nonblocking = TRUE;
+    }
+}
+
 /*
  * Write "buf" (NUL terminated string) to "channel"/"part".
  * When "fun" is not NULL an error message might be given.
@@ -3538,14 +3574,16 @@ channel_handle_events(void)
 channel_send(
        channel_T *channel,
        ch_part_T part,
-       char_u    *buf,
-       int       len,
+       char_u    *buf_arg,
+       int       len_arg,
        char      *fun)
 {
     int                res;
     sock_T     fd;
+    chanpart_T *ch_part = &channel->ch_part[part];
+    int                did_use_queue = FALSE;
 
-    fd = channel->ch_part[part].ch_fd;
+    fd = ch_part->ch_fd;
     if (fd == INVALID_FD)
     {
        if (!channel->ch_error && fun != NULL)
@@ -3561,29 +3599,145 @@ channel_send(
     {
        ch_log_lead("SEND ", channel);
        fprintf(log_fd, "'");
-       ignored = (int)fwrite(buf, len, 1, log_fd);
+       ignored = (int)fwrite(buf_arg, len_arg, 1, log_fd);
        fprintf(log_fd, "'\n");
        fflush(log_fd);
        did_log_msg = TRUE;
     }
 
-    if (part == PART_SOCK)
-       res = sock_write(fd, (char *)buf, len);
-    else
-       res = fd_write(fd, (char *)buf, len);
-    if (res != len)
+    for (;;)
     {
-       if (!channel->ch_error && fun != NULL)
+       writeq_T    *wq = &ch_part->ch_writeque;
+       char_u      *buf;
+       int         len;
+
+       if (wq->wq_next != NULL)
        {
-           ch_error(channel, "%s(): write failed", fun);
-           EMSG2(_("E631: %s(): write failed"), fun);
+           /* first write what was queued */
+           buf = wq->wq_next->wq_ga.ga_data;
+           len = wq->wq_next->wq_ga.ga_len;
+           did_use_queue = TRUE;
+       }
+       else
+       {
+           if (len_arg == 0)
+               /* nothing to write, called from channel_select_check() */
+               return OK;
+           buf = buf_arg;
+           len = len_arg;
        }
-       channel->ch_error = TRUE;
-       return FAIL;
-    }
 
-    channel->ch_error = FALSE;
-    return OK;
+       if (part == PART_SOCK)
+           res = sock_write(fd, (char *)buf, len);
+       else
+           res = fd_write(fd, (char *)buf, len);
+       if (res < 0 && (errno == EWOULDBLOCK
+#ifdef EAGAIN
+                       || errno == EAGAIN
+#endif
+                   ))
+           res = 0; /* nothing got written */
+
+       if (res >= 0 && ch_part->ch_nonblocking)
+       {
+           writeq_T *entry = wq->wq_next;
+
+           if (did_use_queue)
+               ch_log(channel, "Sent %d bytes now", res);
+           if (res == len)
+           {
+               /* Wrote all the buf[len] bytes. */
+               if (entry != NULL)
+               {
+                   /* Remove the entry from the write queue. */
+                   ga_clear(&entry->wq_ga);
+                   wq->wq_next = entry->wq_next;
+                   if (wq->wq_next == NULL)
+                       wq->wq_prev = NULL;
+                   else
+                       wq->wq_next->wq_prev = NULL;
+                   continue;
+               }
+               if (did_use_queue)
+                   ch_log(channel, "Write queue empty");
+           }
+           else
+           {
+               /* Wrote only buf[res] bytes, can't write more now. */
+               if (entry != NULL)
+               {
+                   if (res > 0)
+                   {
+                       /* Remove the bytes that were written. */
+                       mch_memmove(entry->wq_ga.ga_data,
+                                   (char *)entry->wq_ga.ga_data + res,
+                                   len - res);
+                       entry->wq_ga.ga_len -= res;
+                   }
+                   buf = buf_arg;
+                   len = len_arg;
+               }
+               else
+               {
+                   buf += res;
+                   len -= res;
+               }
+               ch_log(channel, "Adding %d bytes to the write queue", len);
+
+               /* Append the not written bytes of the argument to the write
+                * buffer.  Limit entries to 4000 bytes. */
+               if (wq->wq_prev != NULL
+                       && wq->wq_prev->wq_ga.ga_len + len < 4000)
+               {
+                   writeq_T *last = wq->wq_prev;
+
+                   /* append to the last entry */
+                   if (ga_grow(&last->wq_ga, len) == OK)
+                   {
+                       mch_memmove((char *)last->wq_ga.ga_data
+                                                         + last->wq_ga.ga_len,
+                                   buf, len);
+                       last->wq_ga.ga_len += len;
+                   }
+               }
+               else
+               {
+                   writeq_T *last = (writeq_T *)alloc((int)sizeof(writeq_T));
+
+                   if (last != NULL)
+                   {
+               ch_log(channel, "Creating new entry");
+                       last->wq_prev = wq->wq_prev;
+                       last->wq_next = NULL;
+                       if (wq->wq_prev == NULL)
+                           wq->wq_next = last;
+                       else
+                           wq->wq_prev->wq_next = last;
+                       wq->wq_prev = last;
+                       ga_init2(&last->wq_ga, 1, 1000);
+                       if (ga_grow(&last->wq_ga, len) == OK)
+                       {
+                           mch_memmove(last->wq_ga.ga_data, buf, len);
+                           last->wq_ga.ga_len = len;
+                       }
+                   }
+               }
+           }
+       }
+       else if (res != len)
+       {
+           if (!channel->ch_error && fun != NULL)
+           {
+               ch_error(channel, "%s(): write failed", fun);
+               EMSG2(_("E631: %s(): write failed"), fun);
+           }
+           channel->ch_error = TRUE;
+           return FAIL;
+       }
+
+       channel->ch_error = FALSE;
+       return OK;
+    }
 }
 
 /*
@@ -3873,13 +4027,7 @@ channel_select_check(int ret_in, void *rfds_in, void *wfds_in)
        if (ret > 0 && in_part->ch_fd != INVALID_FD
                                            && FD_ISSET(in_part->ch_fd, wfds))
        {
-           if (in_part->ch_buf_append)
-           {
-               if (in_part->ch_bufref.br_buf != NULL)
-                   channel_write_new_lines(in_part->ch_bufref.br_buf);
-           }
-           else
-               channel_write_in(channel);
+           channel_write_input(channel);
            --ret;
        }
     }
index 8989116730340da2b6d5c96258381b689a80db9c..f9a70140987c6fb41ee62f436e93ba02623f2105 100644 (file)
@@ -35,6 +35,7 @@ char_u *channel_read_block(channel_T *channel, ch_part_T part, int timeout);
 void common_channel_read(typval_T *argvars, typval_T *rettv, int raw);
 channel_T *channel_fd2channel(sock_T fd, ch_part_T *partp);
 void channel_handle_events(void);
+void channel_set_nonblock(channel_T *channel, ch_part_T part);
 int channel_send(channel_T *channel, ch_part_T part, char_u *buf, int len, char *fun);
 void ch_expr_common(typval_T *argvars, typval_T *rettv, int eval);
 void ch_raw_common(typval_T *argvars, typval_T *rettv, int eval);
index 40dfd9595687e153741971f086be94e6ce6d6f7a..444b30df3d8fe14f9f9e2d4d521d87d45a75085a 100644 (file)
@@ -1196,6 +1196,7 @@ typedef struct partial_S partial_T;
 
 typedef struct jobvar_S job_T;
 typedef struct readq_S readq_T;
+typedef struct writeq_S writeq_T;
 typedef struct jsonq_S jsonq_T;
 typedef struct cbq_S cbq_T;
 typedef struct channel_S channel_T;
@@ -1512,6 +1513,13 @@ struct readq_S
     readq_T    *rq_prev;
 };
 
+struct writeq_S
+{
+    garray_T   wq_ga;
+    writeq_T   *wq_next;
+    writeq_T   *wq_prev;
+};
+
 struct jsonq_S
 {
     typval_T   *jq_value;
@@ -1601,6 +1609,8 @@ typedef struct {
 #endif
     int                ch_block_write; /* for testing: 0 when not used, -1 when write
                                 * does not block, 1 simulate blocking */
+    int                ch_nonblocking; /* write() is non-blocking */
+    writeq_T   ch_writeque;    /* header for write queue */
 
     cbq_T      ch_cb_head;     /* dummy node for per-request callbacks */
     char_u     *ch_callback;   /* call when a msg is not handled */
index 5878db793b74def7558f09972e07075cc1b90d73..e8e187b8924a1551920a9fa4d0f0e3457e081b98 100644 (file)
@@ -400,6 +400,10 @@ term_start(typval_T *argvar, jobopt_T *opt, int forceit)
        vterm_get_size(term->tl_vterm, &term->tl_rows, &term->tl_cols);
        term_report_winsize(term, term->tl_rows, term->tl_cols);
 
+       /* Make sure we don't get stuck on sending keys to the job, it leads to
+        * a deadlock if the job is waiting for Vim to read. */
+       channel_set_nonblock(term->tl_job->jv_channel, PART_IN);
+
        if (old_curbuf != NULL)
        {
            --curbuf->b_nwindows;
index fe3d7df6781ca0f212ca43a7561d541f4ae7e982..c75d07f2b3e8540659ad87928a1b135079fd8c7a 100644 (file)
@@ -450,3 +450,16 @@ func Test_terminal_list_args()
   exe buf . 'bwipe!'
   call assert_equal("", bufname(buf))
 endfunction
+
+func Test_terminal_noblock()
+  let buf = term_start(&shell)
+
+  for c in ['a','b','c','d','e','f','g','h','i','j','k']
+    call term_sendkeys(buf, 'echo ' . repeat(c, 5000) . "\<cr>")
+  endfor
+
+  let g:job = term_getjob(buf)
+  call Stop_shell_in_terminal(buf)
+  call term_wait(buf)
+  bwipe
+endfunc
index f02af88c9fbf2f3977656e18ef23cf810c352bd7..9d12b1efb2913417375378b2293fddf35c2471d8 100644 (file)
@@ -769,6 +769,8 @@ static char *(features[]) =
 
 static int included_patches[] =
 {   /* Add new patch number below this line */
+/**/
+    957,
 /**/
     956,
 /**/