]> granicus.if.org Git - apache/blob - modules/http2/h2_stream.c
backport of mod_http2 v1.3.2 minus event conn-status fixup
[apache] / modules / http2 / h2_stream.c
1 /* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <assert.h>
17 #include <stddef.h>
18
19 #include <httpd.h>
20 #include <http_core.h>
21 #include <http_connection.h>
22 #include <http_log.h>
23
24 #include <nghttp2/nghttp2.h>
25
26 #include "h2_private.h"
27 #include "h2_conn.h"
28 #include "h2_config.h"
29 #include "h2_h2.h"
30 #include "h2_filter.h"
31 #include "h2_mplx.h"
32 #include "h2_push.h"
33 #include "h2_request.h"
34 #include "h2_response.h"
35 #include "h2_session.h"
36 #include "h2_stream.h"
37 #include "h2_task.h"
38 #include "h2_ctx.h"
39 #include "h2_task_input.h"
40 #include "h2_task.h"
41 #include "h2_util.h"
42
43
44 #define H2_STREAM_IN(lvl,s,msg) \
45     do { \
46         if (APLOG_C_IS_LEVEL((s)->session->c,lvl)) \
47         h2_util_bb_log((s)->session->c,(s)->id,lvl,msg,(s)->bbin); \
48     } while(0)
49     
50
51 static int state_transition[][7] = {
52     /*  ID OP RL RR CI CO CL */
53 /*ID*/{  1, 0, 0, 0, 0, 0, 0 },
54 /*OP*/{  1, 1, 0, 0, 0, 0, 0 },
55 /*RL*/{  0, 0, 1, 0, 0, 0, 0 },
56 /*RR*/{  0, 0, 0, 1, 0, 0, 0 },
57 /*CI*/{  1, 1, 0, 0, 1, 0, 0 },
58 /*CO*/{  1, 1, 0, 0, 0, 1, 0 },
59 /*CL*/{  1, 1, 0, 0, 1, 1, 1 },
60 };
61
62 static int set_state(h2_stream *stream, h2_stream_state_t state)
63 {
64     int allowed = state_transition[state][stream->state];
65     if (allowed) {
66         stream->state = state;
67         return 1;
68     }
69     
70     ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c, APLOGNO(03081)
71                   "h2_stream(%ld-%d): invalid state transition from %d to %d", 
72                   stream->session->id, stream->id, stream->state, state);
73     return 0;
74 }
75
76 static int close_input(h2_stream *stream) 
77 {
78     switch (stream->state) {
79         case H2_STREAM_ST_CLOSED_INPUT:
80         case H2_STREAM_ST_CLOSED:
81             return 0; /* ignore, idempotent */
82         case H2_STREAM_ST_CLOSED_OUTPUT:
83             /* both closed now */
84             set_state(stream, H2_STREAM_ST_CLOSED);
85             break;
86         default:
87             /* everything else we jump to here */
88             set_state(stream, H2_STREAM_ST_CLOSED_INPUT);
89             break;
90     }
91     return 1;
92 }
93
94 static int input_closed(h2_stream *stream) 
95 {
96     switch (stream->state) {
97         case H2_STREAM_ST_OPEN:
98         case H2_STREAM_ST_CLOSED_OUTPUT:
99             return 0;
100         default:
101             return 1;
102     }
103 }
104
105 static int close_output(h2_stream *stream) 
106 {
107     switch (stream->state) {
108         case H2_STREAM_ST_CLOSED_OUTPUT:
109         case H2_STREAM_ST_CLOSED:
110             return 0; /* ignore, idempotent */
111         case H2_STREAM_ST_CLOSED_INPUT:
112             /* both closed now */
113             set_state(stream, H2_STREAM_ST_CLOSED);
114             break;
115         default:
116             /* everything else we jump to here */
117             set_state(stream, H2_STREAM_ST_CLOSED_OUTPUT);
118             break;
119     }
120     return 1;
121 }
122
123 static int input_open(const h2_stream *stream) 
124 {
125     switch (stream->state) {
126         case H2_STREAM_ST_OPEN:
127         case H2_STREAM_ST_CLOSED_OUTPUT:
128             return 1;
129         default:
130             return 0;
131     }
132 }
133
134 static int output_open(h2_stream *stream) 
135 {
136     switch (stream->state) {
137         case H2_STREAM_ST_OPEN:
138         case H2_STREAM_ST_CLOSED_INPUT:
139             return 1;
140         default:
141             return 0;
142     }
143 }
144
145 static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response);
146
147 h2_stream *h2_stream_create(int id, apr_pool_t *pool, h2_session *session)
148 {
149     h2_stream *stream = apr_pcalloc(pool, sizeof(h2_stream));
150     stream->id        = id;
151     stream->state     = H2_STREAM_ST_IDLE;
152     stream->pool      = pool;
153     stream->session   = session;
154     return stream;
155 }
156
157 h2_stream *h2_stream_open(int id, apr_pool_t *pool, h2_session *session)
158 {
159     h2_stream *stream = h2_stream_create(id, pool, session);
160     set_state(stream, H2_STREAM_ST_OPEN);
161     stream->request   = h2_request_create(id, pool, 
162         h2_config_geti(session->config, H2_CONF_SER_HEADERS));
163     
164     ap_log_cerror(APLOG_MARK, APLOG_DEBUG, 0, session->c, APLOGNO(03082)
165                   "h2_stream(%ld-%d): opened", session->id, stream->id);
166     return stream;
167 }
168
169 apr_status_t h2_stream_destroy(h2_stream *stream)
170 {
171     AP_DEBUG_ASSERT(stream);
172     if (stream->pool) {
173         apr_pool_destroy(stream->pool);
174     }
175     return APR_SUCCESS;
176 }
177
178 void h2_stream_cleanup(h2_stream *stream)
179 {
180     h2_session_stream_destroy(stream->session, stream);
181     /* stream is gone */
182 }
183
184 apr_pool_t *h2_stream_detach_pool(h2_stream *stream)
185 {
186     apr_pool_t *pool = stream->pool;
187     stream->pool = NULL;
188     return pool;
189 }
190
191 void h2_stream_rst(h2_stream *stream, int error_code)
192 {
193     stream->rst_error = error_code;
194     close_input(stream);
195     close_output(stream);
196     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
197                   "h2_stream(%ld-%d): reset, error=%d", 
198                   stream->session->id, stream->id, error_code);
199 }
200
201 struct h2_response *h2_stream_get_response(h2_stream *stream)
202 {
203     return stream->sos? stream->sos->response : NULL;
204 }
205
206 apr_status_t h2_stream_set_response(h2_stream *stream, h2_response *response,
207                                     apr_bucket_brigade *bb)
208 {
209     apr_status_t status = APR_SUCCESS;
210     h2_sos *sos;
211     
212     if (!output_open(stream)) {
213         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
214                       "h2_stream(%ld-%d): output closed", 
215                       stream->session->id, stream->id);
216         return APR_ECONNRESET;
217     }
218     
219     sos = h2_sos_mplx_create(stream, response);
220     if (sos->response->sos_filter) {
221         sos = h2_filter_sos_create(sos->response->sos_filter, sos); 
222     }
223     stream->sos = sos;
224     
225     status = stream->sos->buffer(stream->sos, bb);
226     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->c,
227                   "h2_stream(%ld-%d): set_response(%d)", 
228                   stream->session->id, stream->id, stream->sos->response->http_status);
229     return status;
230 }
231
232 apr_status_t h2_stream_set_request(h2_stream *stream, request_rec *r)
233 {
234     apr_status_t status;
235     AP_DEBUG_ASSERT(stream);
236     if (stream->rst_error) {
237         return APR_ECONNRESET;
238     }
239     set_state(stream, H2_STREAM_ST_OPEN);
240     status = h2_request_rwrite(stream->request, r);
241     stream->request->serialize = h2_config_geti(h2_config_rget(r), 
242                                                 H2_CONF_SER_HEADERS);
243
244     return status;
245 }
246
247 void h2_stream_set_h2_request(h2_stream *stream, int initiated_on,
248                               const h2_request *req)
249 {
250     h2_request_copy(stream->pool, stream->request, req);
251     stream->initiated_on = initiated_on;
252     stream->request->eoh = 0;
253 }
254
255 apr_status_t h2_stream_add_header(h2_stream *stream,
256                                   const char *name, size_t nlen,
257                                   const char *value, size_t vlen)
258 {
259     AP_DEBUG_ASSERT(stream);
260     if (h2_stream_is_scheduled(stream)) {
261         return h2_request_add_trailer(stream->request, stream->pool,
262                                       name, nlen, value, vlen);
263     }
264     else {
265         if (!input_open(stream)) {
266             return APR_ECONNRESET;
267         }
268         return h2_request_add_header(stream->request, stream->pool,
269                                      name, nlen, value, vlen);
270     }
271 }
272
273 apr_status_t h2_stream_schedule(h2_stream *stream, int eos, int push_enabled, 
274                                 h2_stream_pri_cmp *cmp, void *ctx)
275 {
276     apr_status_t status;
277     AP_DEBUG_ASSERT(stream);
278     AP_DEBUG_ASSERT(stream->session);
279     AP_DEBUG_ASSERT(stream->session->mplx);
280     
281     if (!output_open(stream)) {
282         return APR_ECONNRESET;
283     }
284     if (stream->scheduled) {
285         return APR_EINVAL;
286     }
287     if (eos) {
288         close_input(stream);
289     }
290     
291     /* Seeing the end-of-headers, we have everything we need to 
292      * start processing it.
293      */
294     status = h2_request_end_headers(stream->request, stream->pool, 
295                                     eos, push_enabled);
296     if (status == APR_SUCCESS) {
297         if (!eos) {
298             stream->request->body = 1;
299             stream->bbin = apr_brigade_create(stream->pool, 
300                                               stream->session->c->bucket_alloc);
301         }
302         stream->input_remaining = stream->request->content_length;
303         
304         status = h2_mplx_process(stream->session->mplx, stream->id, 
305                                  stream->request, cmp, ctx);
306         stream->scheduled = 1;
307         
308         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
309                       "h2_stream(%ld-%d): scheduled %s %s://%s%s",
310                       stream->session->id, stream->id,
311                       stream->request->method, stream->request->scheme,
312                       stream->request->authority, stream->request->path);
313     }
314     else {
315         h2_stream_rst(stream, H2_ERR_INTERNAL_ERROR);
316         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
317                       "h2_stream(%ld-%d): RST=2 (internal err) %s %s://%s%s",
318                       stream->session->id, stream->id,
319                       stream->request->method, stream->request->scheme,
320                       stream->request->authority, stream->request->path);
321     }
322     
323     return status;
324 }
325
326 int h2_stream_is_scheduled(const h2_stream *stream)
327 {
328     return stream->scheduled;
329 }
330
331 static apr_status_t h2_stream_input_flush(h2_stream *stream)
332 {
333     apr_status_t status = APR_SUCCESS;
334     if (stream->bbin && !APR_BRIGADE_EMPTY(stream->bbin)) {
335
336         status = h2_mplx_in_write(stream->session->mplx, stream->id, stream->bbin);
337         if (status != APR_SUCCESS) {
338             ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, stream->session->mplx->c,
339                           "h2_stream(%ld-%d): flushing input data",
340                           stream->session->id, stream->id);
341         }
342     }
343     return status;
344 }
345
346 static apr_status_t input_flush(apr_bucket_brigade *bb, void *ctx) 
347 {
348     (void)bb;
349     return h2_stream_input_flush(ctx);
350 }
351
352 static apr_status_t input_add_data(h2_stream *stream,
353                                    const char *data, size_t len)
354 {
355     return apr_brigade_write(stream->bbin, input_flush, stream, data, len);
356 }
357
358 apr_status_t h2_stream_close_input(h2_stream *stream)
359 {
360     apr_status_t status = APR_SUCCESS;
361     
362     AP_DEBUG_ASSERT(stream);
363     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
364                   "h2_stream(%ld-%d): closing input",
365                   stream->session->id, stream->id);
366                   
367     if (stream->rst_error) {
368         return APR_ECONNRESET;
369     }
370     
371     H2_STREAM_IN(APLOG_TRACE2, stream, "close_pre");
372     if (close_input(stream) && stream->bbin) {
373         status = h2_stream_input_flush(stream);
374         if (status == APR_SUCCESS) {
375             status = h2_mplx_in_close(stream->session->mplx, stream->id);
376         }
377     }
378     H2_STREAM_IN(APLOG_TRACE2, stream, "close_post");
379     return status;
380 }
381
382 apr_status_t h2_stream_write_data(h2_stream *stream,
383                                   const char *data, size_t len)
384 {
385     apr_status_t status = APR_SUCCESS;
386     
387     AP_DEBUG_ASSERT(stream);
388     if (input_closed(stream) || !stream->request->eoh || !stream->bbin) {
389         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
390                       "h2_stream(%ld-%d): writing denied, closed=%d, eoh=%d, bbin=%d", 
391                       stream->session->id, stream->id, input_closed(stream),
392                       stream->request->eoh, !!stream->bbin);
393         return APR_EINVAL;
394     }
395
396     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
397                   "h2_stream(%ld-%d): add %ld input bytes", 
398                   stream->session->id, stream->id, (long)len);
399
400     H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_pre");
401     if (!stream->request->chunked) {
402         stream->input_remaining -= len;
403         if (stream->input_remaining < 0) {
404             ap_log_cerror(APLOG_MARK, APLOG_WARNING, 0, stream->session->c,
405                           APLOGNO(02961) 
406                           "h2_stream(%ld-%d): got %ld more content bytes than announced "
407                           "in content-length header: %ld", 
408                           stream->session->id, stream->id,
409                           (long)stream->request->content_length, 
410                           -(long)stream->input_remaining);
411             h2_stream_rst(stream, H2_ERR_PROTOCOL_ERROR);
412             return APR_ECONNABORTED;
413         }
414     }
415     
416     status = input_add_data(stream, data, len);
417     if (status == APR_SUCCESS) {
418         status = h2_stream_input_flush(stream);
419     }
420     H2_STREAM_IN(APLOG_TRACE2, stream, "write_data_post");
421     return status;
422 }
423
424 void h2_stream_set_suspended(h2_stream *stream, int suspended)
425 {
426     AP_DEBUG_ASSERT(stream);
427     stream->suspended = !!suspended;
428     ap_log_cerror(APLOG_MARK, APLOG_TRACE2, 0, stream->session->c,
429                   "h2_stream(%ld-%d): suspended=%d",
430                   stream->session->id, stream->id, stream->suspended);
431 }
432
433 int h2_stream_is_suspended(const h2_stream *stream)
434 {
435     AP_DEBUG_ASSERT(stream);
436     return stream->suspended;
437 }
438
439 apr_status_t h2_stream_prep_read(h2_stream *stream, 
440                                  apr_off_t *plen, int *peos)
441 {
442     if (stream->rst_error) {
443         return APR_ECONNRESET;
444     }
445
446     if (!stream->sos) {
447         return APR_EGENERAL;
448     }
449     return stream->sos->prep_read(stream->sos, plen, peos);
450 }
451
452 apr_status_t h2_stream_readx(h2_stream *stream, 
453                              h2_io_data_cb *cb, void *ctx,
454                              apr_off_t *plen, int *peos)
455 {
456     if (stream->rst_error) {
457         return APR_ECONNRESET;
458     }
459     if (!stream->sos) {
460         return APR_EGENERAL;
461     }
462     return stream->sos->readx(stream->sos, cb, ctx, plen, peos);
463 }
464
465 apr_status_t h2_stream_read_to(h2_stream *stream, apr_bucket_brigade *bb, 
466                                apr_off_t *plen, int *peos)
467 {
468     if (stream->rst_error) {
469         return APR_ECONNRESET;
470     }
471     if (!stream->sos) {
472         return APR_EGENERAL;
473     }
474     return stream->sos->read_to(stream->sos, bb, plen, peos);
475 }
476
477 int h2_stream_input_is_open(const h2_stream *stream) 
478 {
479     return input_open(stream);
480 }
481
482 int h2_stream_needs_submit(const h2_stream *stream)
483 {
484     switch (stream->state) {
485         case H2_STREAM_ST_OPEN:
486         case H2_STREAM_ST_CLOSED_INPUT:
487         case H2_STREAM_ST_CLOSED_OUTPUT:
488         case H2_STREAM_ST_CLOSED:
489             return !stream->submitted;
490         default:
491             return 0;
492     }
493 }
494
495 apr_status_t h2_stream_submit_pushes(h2_stream *stream)
496 {
497     apr_status_t status = APR_SUCCESS;
498     apr_array_header_t *pushes;
499     int i;
500     
501     pushes = h2_push_collect_update(stream, stream->request, 
502                                     h2_stream_get_response(stream));
503     if (pushes && !apr_is_empty_array(pushes)) {
504         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, stream->session->c,
505                       "h2_stream(%ld-%d): found %d push candidates",
506                       stream->session->id, stream->id, pushes->nelts);
507         for (i = 0; i < pushes->nelts; ++i) {
508             h2_push *push = APR_ARRAY_IDX(pushes, i, h2_push*);
509             h2_stream *s = h2_session_push(stream->session, stream, push);
510             if (!s) {
511                 status = APR_ECONNRESET;
512                 break;
513             }
514         }
515     }
516     return status;
517 }
518
519 apr_table_t *h2_stream_get_trailers(h2_stream *stream)
520 {
521     return stream->sos? stream->sos->get_trailers(stream->sos) : NULL;
522 }
523
524 const h2_priority *h2_stream_get_priority(h2_stream *stream)
525 {
526     h2_response *response = h2_stream_get_response(stream);
527     
528     if (stream->initiated_on && response) {
529         const char *ctype = apr_table_get(response->headers, "content-type");
530         if (ctype) {
531             /* FIXME: Not good enough, config needs to come from request->server */
532             return h2_config_get_priority(stream->session->config, ctype);
533         }
534     }
535     return NULL;
536 }
537
538 /*******************************************************************************
539  * h2_sos_mplx
540  ******************************************************************************/
541
542 typedef struct h2_sos_mplx {
543     h2_mplx *m;
544     apr_bucket_brigade *bb;
545     apr_table_t *trailers;
546 } h2_sos_mplx;
547
548 #define H2_SOS_MPLX_OUT(lvl,msos,msg) \
549     do { \
550         if (APLOG_C_IS_LEVEL((msos)->m->c,lvl)) \
551         h2_util_bb_log((msos)->m->c,(msos)->m->id,lvl,msg,(msos)->bb); \
552     } while(0)
553     
554
555 static apr_status_t h2_sos_mplx_read_to(h2_sos *sos, apr_bucket_brigade *bb, 
556                                         apr_off_t *plen, int *peos)
557 {
558     h2_sos_mplx *msos = sos->ctx;
559     apr_status_t status = APR_SUCCESS;
560     apr_table_t *trailers = NULL;
561
562     H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx read_to_pre");
563     
564     if (APR_BRIGADE_EMPTY(msos->bb)) {
565         apr_off_t tlen = *plen;
566         int eos;
567         status = h2_mplx_out_read_to(msos->m, sos->stream->id, 
568                                      msos->bb, &tlen, &eos, &trailers);
569     }
570     
571     if (status == APR_SUCCESS && !APR_BRIGADE_EMPTY(msos->bb)) {
572         status = h2_transfer_brigade(bb, msos->bb, sos->stream->pool, 
573                                      plen, peos);
574     }
575     else {
576         *plen = 0;
577         *peos = 0;
578     }
579
580     if (trailers) {
581         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
582                       "h2_stream(%ld-%d): read_to, saving trailers",
583                       msos->m->id, sos->stream->id);
584         msos->trailers = trailers;
585     }
586     
587     if (status == APR_SUCCESS && !*peos && !*plen) {
588         status = APR_EAGAIN;
589     }
590     H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx read_to_post");
591     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
592                   "h2_stream(%ld-%d): read_to, len=%ld eos=%d",
593                   msos->m->id, sos->stream->id, (long)*plen, *peos);
594     return status;
595 }
596
597 static apr_status_t h2_sos_mplx_prep_read(h2_sos *sos, apr_off_t *plen, int *peos)
598 {
599     h2_sos_mplx *msos = sos->ctx;
600     apr_status_t status = APR_SUCCESS;
601     const char *src;
602     apr_table_t *trailers = NULL;
603     int test_read = (*plen == 0);
604     
605     H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx prep_read_pre");
606     if (!APR_BRIGADE_EMPTY(msos->bb)) {
607         src = "stream";
608         status = h2_util_bb_avail(msos->bb, plen, peos);
609         if (!test_read && status == APR_SUCCESS && !*peos && !*plen) {
610             apr_brigade_cleanup(msos->bb);
611             return h2_sos_mplx_prep_read(sos, plen, peos);
612         }
613     }
614     else {
615         src = "mplx";
616         status = h2_mplx_out_readx(msos->m, sos->stream->id, 
617                                    NULL, NULL, plen, peos, &trailers);
618         if (trailers) {
619             msos->trailers = trailers;
620         }    
621     }
622     
623     if (!test_read && status == APR_SUCCESS && !*peos && !*plen) {
624         status = APR_EAGAIN;
625     }
626     
627     H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx prep_read_post");
628     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
629                   "h2_stream(%ld-%d): prep_read %s, len=%ld eos=%d, trailers=%s",
630                   msos->m->id, sos->stream->id, src, (long)*plen, *peos,
631                   msos->trailers? "yes" : "no");
632     return status;
633 }
634
635 static apr_status_t h2_sos_mplx_readx(h2_sos *sos, h2_io_data_cb *cb, void *ctx,
636                                       apr_off_t *plen, int *peos)
637 {
638     h2_sos_mplx *msos = sos->ctx;
639     apr_status_t status = APR_SUCCESS;
640     apr_table_t *trailers = NULL;
641     const char *src;
642     
643     H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx readx_pre");
644     *peos = 0;
645     if (!APR_BRIGADE_EMPTY(msos->bb)) {
646         apr_off_t origlen = *plen;
647         
648         src = "stream";
649         status = h2_util_bb_readx(msos->bb, cb, ctx, plen, peos);
650         if (status == APR_SUCCESS && !*peos && !*plen) {
651             apr_brigade_cleanup(msos->bb);
652             *plen = origlen;
653             return h2_sos_mplx_readx(sos, cb, ctx, plen, peos);
654         }
655     }
656     else {
657         src = "mplx";
658         status = h2_mplx_out_readx(msos->m, sos->stream->id, 
659                                    cb, ctx, plen, peos, &trailers);
660     }
661     
662     if (trailers) {
663         ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
664                       "h2_stream(%ld-%d): readx, saving trailers",
665                       msos->m->id, sos->stream->id);
666         msos->trailers = trailers;
667     }
668     
669     if (status == APR_SUCCESS && !*peos && !*plen) {
670         status = APR_EAGAIN;
671     }
672     
673     H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_stream readx_post");
674     ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, msos->m->c,
675                   "h2_stream(%ld-%d): readx %s, len=%ld eos=%d",
676                   msos->m->id, sos->stream->id, src, (long)*plen, *peos);
677     
678     return status;
679 }
680
681 static apr_table_t *h2_sos_mplx_get_trailers(h2_sos *sos)
682 {
683     h2_sos_mplx *msos = sos->ctx;
684
685     return msos->trailers;
686 }
687
688 static apr_status_t h2_sos_mplx_buffer(h2_sos *sos, apr_bucket_brigade *bb) 
689 {
690     h2_sos_mplx *msos = sos->ctx;
691     apr_status_t status = APR_SUCCESS;
692
693     if (bb && !APR_BRIGADE_EMPTY(bb)) {
694         apr_size_t move_all = INT_MAX;
695         /* we can move file handles from h2_mplx into this h2_stream as many
696          * as we want, since the lifetimes are the same and we are not freeing
697          * the ones in h2_mplx->io before this stream is done. */
698         H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx set_response_pre");
699         status = h2_util_move(msos->bb, bb, 16 * 1024, &move_all,  
700                               "h2_stream_set_response");
701         H2_SOS_MPLX_OUT(APLOG_TRACE2, msos, "h2_sos_mplx set_response_post");
702     }
703     return status;
704 }
705
706 static h2_sos *h2_sos_mplx_create(h2_stream *stream, h2_response *response)
707 {
708     h2_sos *sos;
709     h2_sos_mplx *msos;
710     
711     msos = apr_pcalloc(stream->pool, sizeof(*msos));
712     msos->m = stream->session->mplx;
713     msos->bb = apr_brigade_create(stream->pool, msos->m->c->bucket_alloc);
714
715     sos = apr_pcalloc(stream->pool, sizeof(*sos));
716     sos->stream = stream;
717     sos->response = response;
718     
719     sos->ctx = msos;
720     sos->buffer = h2_sos_mplx_buffer;
721     sos->prep_read = h2_sos_mplx_prep_read;
722     sos->readx = h2_sos_mplx_readx;
723     sos->read_to = h2_sos_mplx_read_to;
724     sos->get_trailers = h2_sos_mplx_get_trailers;
725     
726     sos->response = response;
727
728     return sos;
729 }
730