]> granicus.if.org Git - jq/commitdiff
Add Streaming parser (--stream)
authorNicolas Williams <nico@cryptonector.com>
Tue, 23 Dec 2014 05:06:27 +0000 (23:06 -0600)
committerNicolas Williams <nico@cryptonector.com>
Sat, 27 Dec 2014 05:05:56 +0000 (23:05 -0600)
Streaming means that outputs are produced as soon as possible.  With the
`foreach` syntax one can write programs which reduce portions of the
streaming parse of a large input (reduce into proper JSON values, for
example), and discard the rest, processing incrementally.

This:

    $ jq -c --stream .

should produce the same output as this:

    $ jq -c '. as $dot | path(..) as $p | $dot | getpath($p) | [$p,.]'

The output of `jq --stream .` should be a sequence of`[[<path>],<leaf>]`
and `[[<path>]]` values.  The latter indicate that the array/object at
that path ended.

Scalars and empty arrays and objects are leaf values for this purpose.

For example, a truncated input produces a path as soon as possible, then
later the error:

    $ printf '[0,\n'|./jq -c --stream .
    [[0],0]
    parse error: Unfinished JSON term at EOF at line 3, column 0
    $

builtin.c
docs/content/3.manual/manual.yml
execute.c
jq.h
jv.h
jv_parse.c
lexer.l
main.c
tests/run
tests/torture/input0.json [new file with mode: 0644]

index 4ffdc3bf2fd0f062740a1e39539c9a65012caf3d..9a5660a709ad7d524f37670e1bd448b7ce079f73 100644 (file)
--- a/builtin.c
+++ b/builtin.c
@@ -862,6 +862,18 @@ static jv f_modulemeta(jq_state *jq, jv a) {
   return load_module_meta(jq, a);
 }
 
+static jv f_input(jq_state *jq, jv input) {
+  jv_free(input);
+  jq_input_cb cb;
+  void *data;
+  jq_get_input_cb(jq, &cb, &data);
+  jv v = cb(jq, data);
+  if (jv_is_valid(v) || jv_invalid_has_msg(jv_copy(v)))
+    return v;
+  return jv_invalid_with_msg(jv_string("break"));
+}
+
+
 
 #define LIBM_DD(name) \
   {(cfunction_ptr)f_ ## name, "_" #name, 1},
@@ -912,6 +924,7 @@ static const struct cfunction function_list[] = {
   {(cfunction_ptr)f_env, "env", 1},
   {(cfunction_ptr)f_match, "_match_impl", 4},
   {(cfunction_ptr)f_modulemeta, "modulemeta", 1},
+  {(cfunction_ptr)f_input, "_input", 1},
 };
 #undef LIBM_DD
 
@@ -1014,6 +1027,7 @@ static const char* const jq_builtins[] = {
   "def nulls: select(type == \"null\");",
   "def values: select(. != null);",
   "def scalars: select(. == null or . == true or . == false or type == \"number\" or type == \"string\");",
+  "def scalars_or_empty: select(. == null or . == true or . == false or type == \"number\" or type == \"string\" or ((type==\"array\" or type==\"object\") and length==0));",
   "def leaf_paths: paths(scalars);",
   "def join($x): reduce .[] as $i (\"\"; . + (if . == \"\" then $i else $x + $i end));",
   "def flatten: reduce .[] as $i ([]; if $i | type == \"array\" then . + ($i | flatten) else . + [$i] end);",
index 00d776cdccf8a7ea06cacafb5c5b988ba95df1c9..862f261b75af8135e6b95df0e2f2d6bb3f25cc1d 100644 (file)
@@ -103,6 +103,17 @@ sections:
         RS.  This more also parses the output of jq without the `--seq`
         option.
 
+      * `--stream`:
+
+        Parse the input in streaming fashion, outputing arrays of path
+        and leaf values (scalars and empty arrays or empty objects).
+        For example, `"a"` becomes `[[],"a"]`, and `[[],"a",["b"]]`
+        becomes `[[0],[]]`, `[[1],"a"]`, and `[[1,0],"b"]`.
+
+        This is useful for processing very large inputs.  Use this in
+        conjunction with filtering and the `reduce` and `foreach` syntax
+        to reduce large inputs incrementally.
+
       * `--slurp`/`-s`:
 
         Instead of running the filter for each JSON object in the
@@ -2205,6 +2216,29 @@ sections:
             input: '1'
             output: ['[1,2,4,8,16,32,64]']
 
+  - title: 'I/O'
+    body: |
+
+      At this time jq has minimal support for I/O, mostly in the
+      form of control over when inputs are read.  Two builtins functions
+      are provided for this, `input` and `inputs`, that read from the
+      same sources (e.g., `stdin`, files named on the command-line) as
+      jq itself.  These two builtins, and jq's own reading actions, can
+      be interleaved with each other.
+
+      - title: "`input`"
+        body: |
+
+          Outputs one new input.
+
+      - title: "`inputs`"
+        body: |
+
+          Outputs all remaining inputs, one by one.
+
+          This is primarily useful for reductions over a program's
+          inputs.
+
   - title: Assignment
     body: |
 
index 231bd2189785f347a3e56a4eb1f5eccc291786f6..556b65db9cf3ee0215dc86d919019bd9801d37f9 100644 (file)
--- a/execute.c
+++ b/execute.c
@@ -39,6 +39,8 @@ struct jq_state {
   int initial_execution;
 
   jv attrs;
+  jq_input_cb input_cb;
+  void *input_cb_data;
 };
 
 struct closure {
@@ -1037,3 +1039,13 @@ jv jq_get_attr(jq_state *jq, jv attr) {
 void jq_dump_disassembly(jq_state *jq, int indent) {
   dump_disassembly(indent, jq->bc);
 }
+
+void jq_set_input_cb(jq_state *jq, jq_input_cb cb, void *data) {
+  jq->input_cb = cb;
+  jq->input_cb_data = data;
+}
+
+void jq_get_input_cb(jq_state *jq, jq_input_cb *cb, void **data) {
+  *cb = jq->input_cb;
+  *data = jq->input_cb_data;
+}
diff --git a/jq.h b/jq.h
index e3730cc605a8c5c2a9ebec9b4b627038f736dd4c..e410fddec707f26c638d924824ae7c0a54e12b55 100644 (file)
--- a/jq.h
+++ b/jq.h
@@ -22,6 +22,10 @@ void jq_start(jq_state *, jv value, int);
 jv jq_next(jq_state *);
 void jq_teardown(jq_state **);
 
+typedef jv (*jq_input_cb)(jq_state *, void *);
+void jq_set_input_cb(jq_state *, jq_input_cb, void *);
+void jq_get_input_cb(jq_state *, jq_input_cb *, void **);
+
 void jq_set_attrs(jq_state *, jv);
 jv jq_get_attrs(jq_state *);
 jv jq_get_jq_origin(jq_state *);
diff --git a/jv.h b/jv.h
index 67833ad1401bc9c8839e40202d04ca17b150a5c0..9e7077b426c4c09a043c4b8d485cf9d11e604fe5 100644 (file)
--- a/jv.h
+++ b/jv.h
@@ -170,7 +170,11 @@ void jv_dump(jv, int flags);
 void jv_show(jv, int flags);
 jv jv_dump_string(jv, int flags);
 
-#define JV_PARSE_SEQ 1
+enum {
+  JV_PARSE_SEQ              = 1,
+  JV_PARSE_STREAMING        = 2,
+  JV_PARSE_STREAM_ERRORS    = 4,
+};
 
 jv jv_parse(const char* string);
 jv jv_parse_sized(const char* string, int length);
@@ -183,6 +187,7 @@ jv jv_load_file(const char *, int);
 struct jv_parser;
 struct jv_parser* jv_parser_new(int);
 void jv_parser_set_buf(struct jv_parser*, const char*, int, int);
+int jv_parser_remaining(struct jv_parser*);
 jv jv_parser_next(struct jv_parser*);
 void jv_parser_free(struct jv_parser*);
 
index 3ed04f78650a530d82195d72e874755c7832f1f5..490c5778ad26e3d3f4c0a0c124da5b7cd3bd41a8 100644 (file)
@@ -17,6 +17,15 @@ typedef const char* presult;
 #define pfunc presult
 #endif
 
+enum last_seen {
+  JV_LAST_NONE = 0,
+  JV_LAST_OPEN_ARRAY = '[',
+  JV_LAST_OPEN_OBJECT = '{',
+  JV_LAST_COLON = ':',
+  JV_LAST_COMMA = ',',
+  JV_LAST_VALUE = 'V',
+};
+
 struct jv_parser {
   const char* curr_buf;
   int curr_buf_length;
@@ -26,10 +35,13 @@ struct jv_parser {
 
   int flags;
 
-  jv* stack;
-  int stackpos;
-  int stacklen;
-  jv next;
+  jv* stack;                   // parser
+  int stackpos;                // parser
+  int stacklen;                // both (optimization; it's really pathlen for streaming)
+  jv path;                     // streamer
+  enum last_seen last_seen;    // streamer
+  jv output;                   // streamer
+  jv next;                     // both
   
   char* tokenbuf;
   int tokenpos;
@@ -49,10 +61,18 @@ struct jv_parser {
 };
 
 
-static void parser_init(struct jv_parser* p) {
-  p->flags = 0;
+static void parser_init(struct jv_parser* p, int flags) {
+  p->flags = flags;
+  if ((p->flags & JV_PARSE_STREAMING)) {
+    p->path = jv_array();
+  } else {
+    p->path = jv_invalid();
+    p->flags &= ~(JV_PARSE_STREAM_ERRORS);
+  }
   p->stack = 0;
   p->stacklen = p->stackpos = 0;
+  p->last_seen = JV_LAST_NONE;
+  p->output = jv_invalid();
   p->next = jv_invalid();
   p->tokenbuf = 0;
   p->tokenlen = p->tokenpos = 0;
@@ -66,6 +86,13 @@ static void parser_init(struct jv_parser* p) {
 }
 
 static void parser_reset(struct jv_parser* p) {
+  if ((p->flags & JV_PARSE_STREAMING)) {
+    jv_free(p->path);
+    p->path = jv_array();
+  }
+  p->last_seen = JV_LAST_NONE;
+  jv_free(p->output);
+  p->output = jv_invalid();
   jv_free(p->next);
   p->next = jv_invalid();
   for (int i=0; i<p->stackpos; i++) 
@@ -77,13 +104,23 @@ static void parser_reset(struct jv_parser* p) {
 
 static void parser_free(struct jv_parser* p) {
   parser_reset(p);
+  jv_free(p->path);
   jv_mem_free(p->stack);
   jv_mem_free(p->tokenbuf);
   jvp_dtoa_context_free(&p->dtoa);
 }
 
 static pfunc value(struct jv_parser* p, jv val) {
-  if (jv_is_valid(p->next)) return "Expected separator between values";
+  if ((p->flags & JV_PARSE_STREAMING)) {
+    if (jv_is_valid(p->next) || p->last_seen == JV_LAST_VALUE)
+      return "Expected separator between values";
+    if (p->stacklen > 0)
+      p->last_seen = JV_LAST_VALUE;
+    else
+      p->last_seen = JV_LAST_NONE;
+  } else {
+    if (jv_is_valid(p->next)) return "Expected separator between values";
+  }
   jv_free(p->next);
   p->next = val;
   return 0;
@@ -99,7 +136,7 @@ static void push(struct jv_parser* p, jv v) {
   p->stack[p->stackpos++] = v;
 }
 
-static pfunc token(struct jv_parser* p, char ch) {
+static pfunc parse_token(struct jv_parser* p, char ch) {
   switch (ch) {
   case '[':
     if (jv_is_valid(p->next)) return "Expected separator between values";
@@ -182,6 +219,159 @@ static pfunc token(struct jv_parser* p, char ch) {
   return 0;
 }
 
+static pfunc stream_token(struct jv_parser* p, char ch) {
+  jv_kind k;
+  jv last;
+
+  switch (ch) {
+  case '[':
+    if (jv_is_valid(p->next))
+      return "Expected a separator between values";
+    p->path = jv_array_append(p->path, jv_number(0)); // push
+    p->last_seen = JV_LAST_OPEN_ARRAY;
+    p->stacklen++;
+    break;
+
+  case '{':
+    if (p->last_seen == JV_LAST_VALUE)
+      return "Expected a separator between values";
+    // Push object key: null, since we don't know it yet
+    p->path = jv_array_append(p->path, jv_null()); // push
+    p->last_seen = JV_LAST_OPEN_OBJECT;
+    p->stacklen++;
+    break;
+
+  case ':':
+    if (p->stacklen == 0 || jv_get_kind(jv_array_get(jv_copy(p->path), p->stacklen - 1)) == JV_KIND_NUMBER)
+      return "':' not as part of an object";
+    if (!jv_is_valid(p->next) || p->last_seen == JV_LAST_NONE)
+      return "Expected string key before ':'";
+    if (jv_get_kind(p->next) != JV_KIND_STRING)
+      return "Object keys must be strings";
+    if (p->last_seen != JV_LAST_VALUE)
+      return "':' should follow a key";
+    p->last_seen = JV_LAST_COLON;
+    p->path = jv_array_set(p->path, p->stacklen - 1, p->next);
+    p->next = jv_invalid();
+    break;
+
+  case ',':
+    if (p->last_seen != JV_LAST_VALUE)
+      return "Expected value before ','";
+    if (p->stacklen == 0)
+      return "',' not as part of an object or array";
+    last = jv_array_get(jv_copy(p->path), p->stacklen - 1);
+    k = jv_get_kind(last);
+    if (k == JV_KIND_NUMBER) {
+      int idx = jv_number_value(last);
+
+      if (jv_is_valid(p->next)) {
+        p->output = JV_ARRAY(jv_copy(p->path), p->next);
+        p->next = jv_invalid();
+      }
+      p->path = jv_array_set(p->path, p->stacklen - 1, jv_number(idx + 1));
+      p->last_seen = JV_LAST_COMMA;
+    } else if (k == JV_KIND_STRING) {
+      if (jv_is_valid(p->next)) {
+        p->output = JV_ARRAY(jv_copy(p->path), p->next);
+        p->next = jv_invalid();
+      }
+      p->path = jv_array_set(p->path, p->stacklen - 1, jv_true()); // ready for another name:value pair
+      p->last_seen = JV_LAST_COMMA;
+    } else {
+      assert(k == JV_KIND_NULL);
+      // this case hits on input like {,}
+      // make sure to handle input like {"a", "b"} and {"a":, ...}
+      jv_free(last);
+      return "Objects must consist of key:value pairs";
+    }
+    jv_free(last);
+    break;
+
+  case ']':
+    if (p->stacklen == 0)
+      return "Unmatched ']' at the top-level";
+    if (p->last_seen == JV_LAST_COMMA)
+      return "Expected another array element";
+    if (p->last_seen == JV_LAST_OPEN_ARRAY)
+      assert(!jv_is_valid(p->next));
+
+    last = jv_array_get(jv_copy(p->path), p->stacklen - 1);
+    k = jv_get_kind(last);
+    jv_free(last);
+
+    if (k != JV_KIND_NUMBER)
+      return "Unmatched ']' in the middle of an object";
+    if (jv_is_valid(p->next)) {
+      p->output = JV_ARRAY(jv_copy(p->path), p->next, jv_true());
+      p->next = jv_invalid();
+    } else if (p->last_seen != JV_LAST_OPEN_ARRAY) {
+      p->output = JV_ARRAY(jv_copy(p->path));
+    }
+
+    p->path = jv_array_slice(p->path, 0, --(p->stacklen)); // pop
+    //assert(!jv_is_valid(p->next));
+    jv_free(p->next);
+    p->next = jv_invalid();
+
+    if (p->last_seen == JV_LAST_OPEN_ARRAY)
+      p->output = JV_ARRAY(jv_copy(p->path), jv_array()); // Empty arrays are leaves
+
+    if (p->stacklen == 0)
+      p->last_seen = JV_LAST_NONE;
+    else
+      p->last_seen = JV_LAST_VALUE;
+    break;
+
+  case '}':
+    if (p->stacklen == 0)
+      return "Unmatched '}' at the top-level";
+    if (p->last_seen == JV_LAST_COMMA)
+      return "Expected another key:value pair";
+    if (p->last_seen == JV_LAST_OPEN_OBJECT)
+      assert(!jv_is_valid(p->next));
+
+    last = jv_array_get(jv_copy(p->path), p->stacklen - 1);
+    k = jv_get_kind(last);
+    jv_free(last);
+    if (k == JV_KIND_NUMBER)
+      return "Unmatched '}' in the middle of an array";
+
+    if (jv_is_valid(p->next)) {
+      if (k != JV_KIND_STRING)
+        return "Objects must consist of key:value pairs";
+      p->output = JV_ARRAY(jv_copy(p->path), p->next, jv_true());
+      p->next = jv_invalid();
+    } else {
+      // Perhaps {"a":[]}
+      if (p->last_seen == JV_LAST_COLON)
+        // Looks like {"a":}
+        return "Missing value in key:value pair";
+      if (p->last_seen == JV_LAST_COMMA)
+        // Looks like {"a":0,}
+        return "Expected another key-value pair";
+      if (p->last_seen == JV_LAST_OPEN_ARRAY)
+        return "Unmatched '}' in the middle of an array";
+      if (p->last_seen != JV_LAST_VALUE && p->last_seen != JV_LAST_OPEN_OBJECT)
+        return "Unmatched '}'";
+      if (p->last_seen != JV_LAST_OPEN_OBJECT)
+        p->output = JV_ARRAY(jv_copy(p->path));
+    }
+    p->path = jv_array_slice(p->path, 0, --(p->stacklen)); // pop
+    jv_free(p->next);
+    p->next = jv_invalid();
+
+    if (p->last_seen == JV_LAST_OPEN_OBJECT)
+      p->output = JV_ARRAY(jv_copy(p->path), jv_object()); // Empty arrays are leaves
+
+    if (p->stacklen == 0)
+      p->last_seen = JV_LAST_NONE;
+    else
+      p->last_seen = JV_LAST_VALUE;
+    break;
+  }
+  return 0;
+}
 
 static void tokenadd(struct jv_parser* p, char c) {
   assert(p->tokenpos <= p->tokenlen);
@@ -191,6 +381,7 @@ static void tokenadd(struct jv_parser* p, char c) {
   }
   assert(p->tokenpos < p->tokenlen);
   p->tokenbuf[p->tokenpos++] = c;
+  p->tokenbuf[p->tokenpos] = '\0'; // for debugging
 }
 
 static int unhex4(char* hex) {
@@ -329,7 +520,7 @@ static chclass classify(char c) {
 
 static const presult OK = "output produced";
 
-static int check_done(struct jv_parser* p, jv* out) {
+static int parse_check_done(struct jv_parser* p, jv* out) {
   if (p->stackpos == 0 && jv_is_valid(p->next)) {
     *out = p->next;
     p->next = jv_invalid();
@@ -339,6 +530,50 @@ static int check_done(struct jv_parser* p, jv* out) {
   }
 }
 
+static int stream_check_done(struct jv_parser* p, jv* out) {
+  if (p->stacklen == 0 && jv_is_valid(p->next)) {
+    *out = JV_ARRAY(jv_copy(p->path),p->next);
+    p->next = jv_invalid();
+    return 1;
+  } else if (jv_is_valid(p->output)) {
+    if (jv_array_length(jv_copy(p->output)) > 2) {
+      // At end of an array or object, necessitating one more output by
+      // which to indicate this
+      *out = jv_array_slice(jv_copy(p->output), 0, 2);
+      p->output = jv_array_slice(p->output, 0, 1);      // arrange one more output
+    } else {
+      // No further processing needed
+      *out = p->output;
+      p->output = jv_invalid();
+    }
+    return 1;
+  } else {
+    return 0;
+  }
+}
+
+static int parse_check_truncation(struct jv_parser* p, jv out) {
+  return ((p->flags & JV_PARSE_SEQ) && !p->last_ch_was_ws && jv_get_kind(out) == JV_KIND_NUMBER);
+}
+
+static int stream_check_truncation(struct jv_parser* p, jv out) {
+  if (!jv_is_valid(out))
+    return 0;
+  jv v = jv_array_get(jv_copy(out), 1);
+  jv_kind k = jv_get_kind(v);
+  jv_free(v);
+  return (k == JV_KIND_NUMBER || k == JV_KIND_TRUE || k == JV_KIND_FALSE || k == JV_KIND_NULL);
+}
+
+#define check_done(p, out) \
+   (((p)->flags & JV_PARSE_STREAMING) ? stream_check_done((p), (out)) : parse_check_done((p), (out)))
+
+#define token(p, ch) \
+   (((p)->flags & JV_PARSE_STREAMING) ? stream_token((p), (ch)) : parse_token((p), (ch)))
+
+#define check_truncation(p, o) \
+    (((p)->flags & JV_PARSE_STREAMING) ? stream_check_truncation((p), (o)) : parse_check_truncation((p), (o)))
+
 static pfunc scan(struct jv_parser* p, char ch, jv* out) {
   p->column++;
   if (ch == '\n') {
@@ -348,7 +583,7 @@ static pfunc scan(struct jv_parser* p, char ch, jv* out) {
   if (ch == '\036' /* ASCII RS; see draft-ietf-json-sequence-07 */) {
     TRY(check_literal(p));
     if (p->st == JV_PARSER_NORMAL && check_done(p, out)) {
-      if ((p->flags & JV_PARSE_SEQ) && !p->last_ch_was_ws && jv_get_kind(*out) == JV_KIND_NUMBER) {
+      if (check_truncation(p, *out)) {
         jv_free(*out);
         *out = jv_invalid();
         return "Potentially truncated top-level numeric value";
@@ -404,7 +639,7 @@ static pfunc scan(struct jv_parser* p, char ch, jv* out) {
 
 struct jv_parser* jv_parser_new(int flags) {
   struct jv_parser* p = jv_mem_alloc(sizeof(struct jv_parser));
-  parser_init(p);
+  parser_init(p, flags);
   p->flags = flags;
   return p;
 }
@@ -416,6 +651,12 @@ void jv_parser_free(struct jv_parser* p) {
 
 static const unsigned char UTF8_BOM[] = {0xEF,0xBB,0xBF};
 
+int jv_parser_remaining(struct jv_parser* p) {
+  if (p->curr_buf == 0)
+    return 0;
+  return (p->curr_buf_length - p->curr_buf_pos);
+}
+
 void jv_parser_set_buf(struct jv_parser* p, const char* buf, int length, int is_partial) {
   assert((p->curr_buf == 0 || p->curr_buf_pos == p->curr_buf_length)
          && "previous buffer not exhausted");
@@ -441,10 +682,25 @@ void jv_parser_set_buf(struct jv_parser* p, const char* buf, int length, int is_
   p->curr_buf_is_partial = is_partial;
 }
 
+static jv make_error(struct jv_parser*, const char *, ...) JV_PRINTF_LIKE(2, 3);
+
+static jv make_error(struct jv_parser* p, const char *fmt, ...) {
+  va_list ap;
+  va_start(ap, fmt);
+  jv e = jv_string_vfmt(fmt, ap);
+  va_end(ap);
+  if ((p->flags & JV_PARSE_STREAM_ERRORS))
+    return JV_ARRAY(e, jv_copy(p->path));
+  return jv_invalid_with_msg(e);
+}
+
 jv jv_parser_next(struct jv_parser* p) {
-  assert(p->curr_buf && "a buffer must be provided");
+  if (!p->curr_buf)
+    return jv_invalid(); // Need a buffer
   if (p->bom_strip_position == 0xff) return jv_invalid_with_msg(jv_string("Malformed BOM"));
-  jv value;
+  jv value = jv_invalid();
+  if ((p->flags & JV_PARSE_STREAMING) && stream_check_done(p, &value))
+    return value;
   char ch;
   presult msg = 0;
   while (!msg && p->curr_buf_pos < p->curr_buf_length) {
@@ -456,18 +712,21 @@ jv jv_parser_next(struct jv_parser* p) {
   if (msg == OK) {
     return value;
   } else if (msg) {
-    parser_reset(p);
     if (ch != '\036' && (p->flags & JV_PARSE_SEQ)) {
       // Skip to the next RS
       p->st = JV_PARSER_WAITING_FOR_RS;
-      return jv_invalid_with_msg(jv_string_fmt("%s at line %d, column %d (need RS to resync)", msg, p->line, p->column));
+      value = make_error(p, "%s at line %d, column %d (need RS to resync)", msg, p->line, p->column);
+      parser_reset(p);
+      return value;
     }
+    value = make_error(p, "%s at line %d, column %d", msg, p->line, p->column);
+    parser_reset(p);
     if (!(p->flags & JV_PARSE_SEQ)) {
       // We're not parsing a JSON text sequence; throw this buffer away.
       p->curr_buf = 0;
       p->curr_buf_pos = 0;
     } // Else ch must be RS; don't clear buf so we can start parsing again after this ch
-    return jv_invalid_with_msg(jv_string_fmt("%s at line %d, column %d", msg, p->line, p->column));
+    return value;
   } else if (p->curr_buf_is_partial) {
     assert(p->curr_buf_pos == p->curr_buf_length);
     // need another buffer
@@ -477,28 +736,36 @@ jv jv_parser_next(struct jv_parser* p) {
     // at EOF
     if (p->st != JV_PARSER_WAITING_FOR_RS) {
       if (p->st != JV_PARSER_NORMAL) {
+        value = make_error(p, "Unfinished string at EOF at line %d, column %d", p->line, p->column);
         parser_reset(p);
         p->st = JV_PARSER_WAITING_FOR_RS;
-        return jv_invalid_with_msg(jv_string("Unfinished string"));
+        return value;
       }
       if ((msg = check_literal(p))) {
+        value = make_error(p, "%s at EOF at line %d, column %d", msg, p->line, p->column);
         parser_reset(p);
         p->st = JV_PARSER_WAITING_FOR_RS;
-        return jv_invalid_with_msg(jv_string(msg));
+        return value;
       }
-      if (p->stackpos != 0) {
+      if (((p->flags & JV_PARSE_STREAMING) && p->stacklen != 0) ||
+          (!(p->flags & JV_PARSE_STREAMING) && p->stackpos != 0)) {
+        value = make_error(p, "Unfinished JSON term at EOF at line %d, column %d", p->line, p->column);
         parser_reset(p);
         p->st = JV_PARSER_WAITING_FOR_RS;
-        return jv_invalid_with_msg(jv_string("Unfinished JSON term"));
+        return value;
       }
     }
     // p->next is either invalid (nothing here but no syntax error)
     // or valid (this is the value). either way it's the thing to return
-    value = p->next;
+    if ((p->flags & JV_PARSE_STREAMING) && jv_is_valid(p->next)) {
+      value = JV_ARRAY(jv_copy(p->path), p->next); // except in streaming mode we've got to make it [path,value]
+    } else {
+      value = p->next;
+    }
     p->next = jv_invalid();
     if ((p->flags & JV_PARSE_SEQ) && !p->last_ch_was_ws && jv_get_kind(value) == JV_KIND_NUMBER) {
       jv_free(value);
-      return jv_invalid_with_msg(jv_string("Potentially truncated top-level numeric value"));
+      return make_error(p, "Potentially truncated top-level numeric value at EOF at line %d, column %d", p->line, p->column);
     }
     return value;
   }
@@ -506,7 +773,7 @@ jv jv_parser_next(struct jv_parser* p) {
 
 jv jv_parse_sized(const char* string, int length) {
   struct jv_parser parser;
-  parser_init(&parser);
+  parser_init(&parser, 0);
   jv_parser_set_buf(&parser, string, length, 0);
   jv value = jv_parser_next(&parser);
   if (jv_is_valid(value)) {
diff --git a/lexer.l b/lexer.l
index 7813850f0431f7c5617d6432f1f09464a0c70dbd..70ad050bcded0455f8f9961d3066cabf69ef1385 100644 (file)
--- a/lexer.l
+++ b/lexer.l
@@ -100,7 +100,7 @@ struct lexer_param;
   }
   (\\[^u(]|\\u[a-zA-Z0-9]{0,4})+ {
     /* pass escapes to the json parser */
-    jv escapes = jv_string_fmt("\"%.*s\"", yyleng, yytext);
+    jv escapes = jv_string_fmt("\"%.*s\"", (int)yyleng, yytext);
     yylval->literal = jv_parse_sized(jv_string_value(escapes), jv_string_length_bytes(jv_copy(escapes)));
     jv_free(escapes);
     return QQSTRING_TEXT;
diff --git a/main.c b/main.c
index 2d022bb3d92353a0625da13e8bc7bc5e7bf8e098..09c834a0b0f4864ec35bf31a738988773ea75da7 100644 (file)
--- a/main.c
+++ b/main.c
@@ -149,6 +149,7 @@ static int process(jq_state *jq, jv value, int flags) {
   return ret;
 }
 
+// XXX Move all this into the next_input_state struct
 FILE* current_input;
 const char** input_filenames = NULL;
 int ninput_files;
@@ -175,6 +176,7 @@ static int read_more(char* buf, size_t size) {
     }
   }
 
+  buf[0] = 0;
   if (current_input) {
     if (!fgets(buf, size, current_input))
       buf[0] = 0;
@@ -182,10 +184,61 @@ static int read_more(char* buf, size_t size) {
   return next_input_idx == ninput_files && (!current_input || feof(current_input));
 }
 
+struct next_input_state {
+  struct jv_parser *parser;
+  jv slurped;
+  char buf[4096];
+};
+
+// Blocks to read one more input from stdin and/or given files
+// When slurping, it returns just one value
+static jv next_input(jq_state *jq, void *data) {
+  struct next_input_state *state = data;
+  int is_last = 0;
+  jv value = jv_invalid(); // need more input
+  do {
+    if (options & RAW_INPUT) {
+      is_last = read_more(state->buf, sizeof(state->buf));
+      if (state->buf[0] == '\0')
+        continue;
+      int len = strlen(state->buf); // Raw input doesn't support NULs
+      if (len > 0) {
+        if (options & SLURP) {
+          state->slurped = jv_string_concat(state->slurped, jv_string(state->buf));
+        } else if (jv_is_valid(value)) {
+          if (state->buf[len-1] == '\n') {
+            // whole line
+            state->buf[len-1] = 0;
+            return jv_string_concat(value, jv_string(state->buf));
+          }
+          value = jv_string_concat(value, jv_string(state->buf));
+        }
+      }
+    } else {
+      if (jv_parser_remaining(state->parser) == 0) {
+        is_last = read_more(state->buf, sizeof(state->buf));
+        jv_parser_set_buf(state->parser, state->buf, strlen(state->buf), !is_last); // NULs also not supported here
+      }
+      value = jv_parser_next(state->parser);
+      if (options & SLURP) {
+        if (jv_is_valid(value)) {
+          state->slurped = jv_array_append(state->slurped, value);
+          value = jv_invalid();
+        } else if (jv_invalid_has_msg(jv_copy(value)))
+          return value;
+      } else if (jv_is_valid(value) || jv_invalid_has_msg(jv_copy(value))) {
+        return value;
+      }
+    }
+  } while (!is_last);
+  return value;
+}
+
 int main(int argc, char* argv[]) {
   jq_state *jq = NULL;
   int ret = 0;
   int compiled = 0;
+  int parser_flags = 0;
   char *t = NULL;
 
   if (argc) progname = argv[0];
@@ -293,6 +346,14 @@ int main(int argc, char* argv[]) {
         options |= SEQ;
         if (!short_opts) continue;
       }
+      if (isoption(argv[i], 0, "stream", &short_opts)) {
+        parser_flags |= JV_PARSE_STREAMING;
+        if (!short_opts) continue;
+      }
+      if (isoption(argv[i], 0, "stream-errors", &short_opts)) {
+        parser_flags |= JV_PARSE_STREAM_ERRORS;
+        if (!short_opts) continue;
+      }
       if (isoption(argv[i], 'e', "exit-status", &short_opts)) {
         options |= EXIT_STATUS;
         if (!short_opts) continue;
@@ -476,66 +537,57 @@ int main(int argc, char* argv[]) {
     printf("\n");
   }
 
+  // XXX Refactor this and input_filenames[] and related setup into a
+  // function to setup struct next_input_state.
+  if ((options & SEQ))
+    parser_flags |= JV_PARSE_SEQ;
+
+  struct next_input_state input_state;
+  input_state.parser = jv_parser_new(parser_flags);
+  if ((options & RAW_INPUT) && (options & SLURP))
+    input_state.slurped = jv_string("");
+  else if ((options & SLURP))
+    input_state.slurped = jv_array();
+  else
+    input_state.slurped = jv_invalid();
+
+  // Let jq program read from inputs
+  jq_set_input_cb(jq, next_input, &input_state);
+
   if (options & PROVIDE_NULL) {
     ret = process(jq, jv_null(), jq_flags);
   } else {
-    jv slurped;
-    if (options & SLURP) {
-      if (options & RAW_INPUT) {
-        slurped = jv_string("");
-      } else {
-        slurped = jv_array();
+    jv value;
+    while (jv_is_valid((value = next_input(jq, &input_state))) || jv_invalid_has_msg(jv_copy(value))) {
+      if (jv_is_valid(value)) {
+        ret = process(jq, value, jq_flags);
+        continue;
       }
-    }
-    struct jv_parser* parser = jv_parser_new((options & SEQ) ? JV_PARSE_SEQ : 0);
-    char buf[4096];
-    int is_last = 0;
-    do {
-      is_last = read_more(buf, sizeof(buf));
-      if (options & RAW_INPUT) {
-        int len = strlen(buf);
-        if (len > 0) {
-          if (options & SLURP) {
-            slurped = jv_string_concat(slurped, jv_string(buf));
-          } else {
-            if (buf[len-1] == '\n') buf[len-1] = 0;
-            ret = process(jq, jv_string(buf), jq_flags);
-          }
-        }
-      } else {
-        jv_parser_set_buf(parser, buf, strlen(buf), !is_last);
-        jv value;
-        while (jv_is_valid(value = jv_parser_next(parser)) || jv_invalid_has_msg(jv_copy(value))) {
-          if (!jv_is_valid(value)) {
-            jv msg = jv_invalid_get_msg(value);
-            if (!(options & SEQ)) {
-              // We used to treat parse errors as fatal...
-              ret = 4;
-              fprintf(stderr, "parse error: %s\n", jv_string_value(msg));
-              jv_free(msg);
-              break;
-            }
-            fprintf(stderr, "ignoring parse error: %s\n", jv_string_value(msg));
-            jv_free(msg);
-            // ...but with --seq we attempt to recover.
-            continue;
-          }
-          if (options & SLURP) {
-            slurped = jv_array_append(slurped, value);
-          } else {
-            ret = process(jq, value, jq_flags);
-            value = jv_invalid();
-          }
-        }
+
+      // Parse error
+      jv msg = jv_invalid_get_msg(value);
+      if (!(options & SEQ)) {
+        // --seq -> errors are not fatal
+        ret = 4;
+        fprintf(stderr, "parse error: %s\n", jv_string_value(msg));
+        jv_free(msg);
+        break;
       }
-    } while (!is_last);
-    jv_parser_free(parser);
-    if (ret != 0)
-      goto out;
+      fprintf(stderr, "ignoring parse error: %s\n", jv_string_value(msg));
+      jv_free(msg);
+    }
     if (options & SLURP) {
-      ret = process(jq, slurped, jq_flags);
+      ret = process(jq, input_state.slurped, jq_flags);
+      input_state.slurped = jv_invalid();
     }
   }
+
+  jv_free(input_state.slurped);
+  jv_parser_free(input_state.parser);
+
+  if (ret != 0)
+    goto out;
+
   if ((options & IN_PLACE)) {
     FILE *devnull;
 #ifdef WIN32
index c39e7fa7880e90826c7701d941ad643472cc11fb..ab63268217c163674774b153ce27de72c85b7b64 100755 (executable)
--- a/tests/run
+++ b/tests/run
@@ -20,7 +20,6 @@ if [ -z "$d" ]; then
     exit 0
 fi
 
-
 ## Test constant folding
 
 # String constant folding (addition only)
@@ -101,14 +100,14 @@ cmp $d/out $d/expected
 # Note that here jq sees no inputs at all but it still succeeds because
 # --seq ignores parse errors
 cat > $d/expected <<EOF
-ignoring parse error: Unfinished string
+ignoring parse error: Unfinished string at EOF at line 1, column 4
 EOF
 printf '"foo'|./jq -ce --seq . > $d/out 2>&1
 cmp $d/out $d/expected
 
 # Numeric values truncated by EOF are ignored
 cat > $d/expected <<EOF
-ignoring parse error: Potentially truncated top-level numeric value
+ignoring parse error: Potentially truncated top-level numeric value at EOF at line 1, column 1
 EOF
 printf '1'|./jq -ce --seq . > $d/out 2>&1
 cmp $d/out $d/expected
@@ -118,6 +117,12 @@ EOF
 printf '1\n'|./jq -ces --seq '. == [1]' >/dev/null 2> $d/out
 cmp $d/out $d/expected
 
+## Test streaming parser
+
+$VALGRIND $Q ./jq -c '. as $d|path(..) as $p|$d|getpath($p)|scalars_or_empty|[$p,.]' < "$PWD/tests/torture/input0.json" > $d/out0
+$VALGRIND $Q ./jq --stream -c '.|select(length==2)' < "$PWD/tests/torture/input0.json" > $d/out1
+diff $d/out0 $d/out1
+
 ## Test library/module system
 
 mods=$PWD/tests/modules
diff --git a/tests/torture/input0.json b/tests/torture/input0.json
new file mode 100644 (file)
index 0000000..8dd046b
--- /dev/null
@@ -0,0 +1,7 @@
+0 12 true false null [] {} [{}] [[]]
+[0,01,[12,22,[34,[45,56],7]],[]]
+{"a":[1]}
+{"a":[{}]}
+{"a":[{},[],[[]]]}
+{"a":[{"b":[]}]}
+{"a":[{"b":[]},{},[2]]}