From: Wez Furlong Date: Tue, 18 Feb 2003 01:22:21 +0000 (+0000) Subject: Implement new filter API, stage 1. X-Git-Tag: RELEASE_0_5~927 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=32165a5546fc54ba31f25550fa105365c9e454c4;p=php Implement new filter API, stage 1. This breaks user-space filters (for the time being), and those weird convert.* filters in ext/standard/filters.c The filters stack has been separated into one chain for read and one chain for write. The user-space stream_filter_append() type functions currently only operate on the read chain. They need extending to work with the write chain too. --- diff --git a/ext/standard/file.c b/ext/standard/file.c index d218d74ce6..57ac4e0f0d 100644 --- a/ext/standard/file.c +++ b/ext/standard/file.c @@ -627,6 +627,7 @@ PHP_FUNCTION(stream_get_meta_data) } add_assoc_string(return_value, "stream_type", (char *)stream->ops->label, 1); +#if 0 /* TODO: needs updating for new filter API */ if (stream->filterhead) { php_stream_filter *filter; @@ -639,6 +640,7 @@ PHP_FUNCTION(stream_get_meta_data) add_assoc_zval(return_value, "filters", newval); } +#endif add_assoc_long(return_value, "unread_bytes", stream->writepos - stream->readpos); @@ -1266,9 +1268,9 @@ static void apply_filter_to_stream(int append, INTERNAL_FUNCTION_PARAMETERS) } if (append) { - php_stream_filter_append(stream, filter); + php_stream_filter_append(&stream->readfilters, filter); } else { - php_stream_filter_prepend(stream, filter); + php_stream_filter_prepend(&stream->readfilters, filter); } RETURN_TRUE; diff --git a/ext/standard/filters.c b/ext/standard/filters.c index 6dd0d1aa77..38d3abcb40 100644 --- a/ext/standard/filters.c +++ b/ext/standard/filters.c @@ -25,61 +25,40 @@ #include "ext/standard/file.h" #include "ext/standard/php_string.h" -/* {{{ common "no-opperation" methods */ -static int commonfilter_nop_flush(php_stream *stream, php_stream_filter *thisfilter, int closing TSRMLS_DC) -{ - return php_stream_filter_flush_next(stream, thisfilter, closing); -} - -static int commonfilter_nop_eof(php_stream *stream, php_stream_filter *thisfilter TSRMLS_DC) -{ - return php_stream_filter_eof_next(stream, thisfilter); -} -/* }}} */ - /* {{{ rot13 stream filter implementation */ static char rot13_from[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; static char rot13_to[] = "nopqrstuvwxyzabcdefghijklmNOPQRSTUVWXYZABCDEFGHIJKLM"; -static size_t strfilter_rot13_write(php_stream *stream, php_stream_filter *thisfilter, - const char *buf, size_t count TSRMLS_DC) +static php_stream_filter_status_t strfilter_rot13_filter( + php_stream *stream, + php_stream_filter *thisfilter, + php_stream_bucket_brigade *buckets_in, + php_stream_bucket_brigade *buckets_out, + size_t *bytes_consumed, + int flags + TSRMLS_DC) { - char rotbuf[1024]; - size_t chunk; - size_t wrote = 0; - - while (count > 0) { - chunk = count; - if (chunk > sizeof(rotbuf)) - chunk = sizeof(rotbuf); + php_stream_bucket *bucket; + size_t consumed = 0; - PHP_STRLCPY(rotbuf, buf, sizeof(rotbuf), chunk); - buf += chunk; - count -= chunk; - - php_strtr(rotbuf, chunk, rot13_from, rot13_to, 52); - wrote += php_stream_filter_write_next(stream, thisfilter, rotbuf, chunk); + while (buckets_in->head) { + bucket = php_stream_bucket_make_writeable(buckets_in->head TSRMLS_CC); + + php_strtr(bucket->buf, bucket->buflen, rot13_from, rot13_to, 52); + consumed += bucket->buflen; + + php_stream_bucket_append(buckets_out, bucket TSRMLS_CC); } - return wrote; -} - -static size_t strfilter_rot13_read(php_stream *stream, php_stream_filter *thisfilter, - char *buf, size_t count TSRMLS_DC) -{ - size_t read; - - read = php_stream_filter_read_next(stream, thisfilter, buf, count); - php_strtr(buf, read, rot13_from, rot13_to, 52); - - return read; + if (bytes_consumed) { + *bytes_consumed = consumed; + } + + return PSFS_PASS_ON; } static php_stream_filter_ops strfilter_rot13_ops = { - strfilter_rot13_write, - strfilter_rot13_read, - commonfilter_nop_flush, - commonfilter_nop_eof, + strfilter_rot13_filter, NULL, "string.rot13" }; @@ -99,88 +78,70 @@ static php_stream_filter_factory strfilter_rot13_factory = { static char lowercase[] = "abcdefghijklmnopqrstuvwxyz"; static char uppercase[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; -static size_t strfilter_toupper_write(php_stream *stream, php_stream_filter *thisfilter, - const char *buf, size_t count TSRMLS_DC) +static php_stream_filter_status_t strfilter_toupper_filter( + php_stream *stream, + php_stream_filter *thisfilter, + php_stream_bucket_brigade *buckets_in, + php_stream_bucket_brigade *buckets_out, + size_t *bytes_consumed, + int flags + TSRMLS_DC) { - char tmpbuf[1024]; - size_t chunk; - size_t wrote = 0; - - while (count > 0) { - chunk = count; - if (chunk > sizeof(tmpbuf)) - chunk = sizeof(tmpbuf); - - PHP_STRLCPY(tmpbuf, buf, sizeof(tmpbuf), chunk); - buf += chunk; - count -= chunk; + php_stream_bucket *bucket; + size_t consumed = 0; - php_strtr(tmpbuf, chunk, lowercase, uppercase, 26); - wrote += php_stream_filter_write_next(stream, thisfilter, tmpbuf, chunk); + while (buckets_in->head) { + bucket = php_stream_bucket_make_writeable(buckets_in->head TSRMLS_CC); + + php_strtr(bucket->buf, bucket->buflen, lowercase, uppercase, 26); + consumed += bucket->buflen; + + php_stream_bucket_append(buckets_out, bucket TSRMLS_CC); } - return wrote; -} - -static size_t strfilter_tolower_write(php_stream *stream, php_stream_filter *thisfilter, - const char *buf, size_t count TSRMLS_DC) -{ - char tmpbuf[1024]; - size_t chunk; - size_t wrote = 0; - - while (count > 0) { - chunk = count; - if (chunk > sizeof(tmpbuf)) - chunk = sizeof(tmpbuf); - - PHP_STRLCPY(tmpbuf, buf, sizeof(tmpbuf), chunk); - buf += chunk; - count -= chunk; - - php_strtr(tmpbuf, chunk, uppercase, lowercase, 26); - wrote += php_stream_filter_write_next(stream, thisfilter, tmpbuf, chunk); + if (bytes_consumed) { + *bytes_consumed = consumed; } - - return wrote; -} - -static size_t strfilter_toupper_read(php_stream *stream, php_stream_filter *thisfilter, - char *buf, size_t count TSRMLS_DC) -{ - size_t read; - - read = php_stream_filter_read_next(stream, thisfilter, buf, count); - php_strtr(buf, read, lowercase, uppercase, 26); - - return read; + + return PSFS_PASS_ON; } -static size_t strfilter_tolower_read(php_stream *stream, php_stream_filter *thisfilter, - char *buf, size_t count TSRMLS_DC) +static php_stream_filter_status_t strfilter_tolower_filter( + php_stream *stream, + php_stream_filter *thisfilter, + php_stream_bucket_brigade *buckets_in, + php_stream_bucket_brigade *buckets_out, + size_t *bytes_consumed, + int flags + TSRMLS_DC) { - size_t read; + php_stream_bucket *bucket; + size_t consumed = 0; - read = php_stream_filter_read_next(stream, thisfilter, buf, count); - php_strtr(buf, read, uppercase, lowercase, 26); + while (buckets_in->head) { + bucket = php_stream_bucket_make_writeable(buckets_in->head TSRMLS_CC); + + php_strtr(bucket->buf, bucket->buflen, uppercase, lowercase, 26); + consumed += bucket->buflen; + + php_stream_bucket_append(buckets_out, bucket TSRMLS_CC); + } - return read; + if (bytes_consumed) { + *bytes_consumed = consumed; + } + + return PSFS_PASS_ON; } static php_stream_filter_ops strfilter_toupper_ops = { - strfilter_toupper_write, - strfilter_toupper_read, - commonfilter_nop_flush, - commonfilter_nop_eof, + strfilter_toupper_filter, NULL, "string.toupper" }; static php_stream_filter_ops strfilter_tolower_ops = { - strfilter_tolower_write, - strfilter_tolower_read, - commonfilter_nop_flush, - commonfilter_nop_eof, + strfilter_tolower_filter, NULL, "string.tolower" }; @@ -1400,6 +1361,54 @@ static void php_convert_filter_dtor(php_convert_filter *inst) } } +static php_stream_filter_status_t strfilter_convert_filter( + php_stream *stream, + php_stream_filter *thisfilter, + php_stream_bucket_brigade *buckets_in, + php_stream_bucket_brigade *buckets_out, + size_t *bytes_consumed, + int flags + TSRMLS_DC) +{ + php_stream_bucket *bucket; + size_t consumed = 0; + php_conv_err_t err; + php_convert_filter *inst = (php_convert_filter *)thisfilter->abstract; + + while (thisfilter->buffer.head || buckets_in->head) { + /* take head off buffer first, then input brigade */ + bucket = thisfilter->buffer.head ? thisfilter->buffer.head : buckets_in->head; + php_stream_bucket_unlink(bucket TSRMLS_CC); + +#if 0 + err = php_conv_convert(inst->write_cd, ... ) + + /* update consumed by the number of bytes just used */ + + /* give output bucket to next in chain */ + php_stream_bucket_append(buckets_out, bucket TSRMLS_CC); + + if (only used part of buffer) { + bucket *left, *right; + php_stream_buffer_split(bucket, &left, &right, used_len TSRMLS_CC); + php_stream_buffer_delref(left); /* delete the part we consumed */ + php_stream_buffer_append(&filter->buffer, right TSRMLS_CC); + break; + } +#endif + } + + if (bytes_consumed) { + *bytes_consumed = consumed; + } + + return PSFS_PASS_ON; +} + + + + +#if 0 static size_t strfilter_convert_write(php_stream *stream, php_stream_filter *thisfilter, const char *buf, size_t count TSRMLS_DC) { @@ -1537,11 +1546,7 @@ static int strfilter_convert_flush(php_stream *stream, php_stream_filter *thisfi } return php_stream_filter_flush_next(stream, thisfilter, closing); } - -static int strfilter_convert_eof(php_stream *stream, php_stream_filter *thisfilter TSRMLS_DC) -{ - return php_stream_filter_eof_next(stream, thisfilter); -} +#endif static void strfilter_convert_dtor(php_stream_filter *thisfilter TSRMLS_DC) { @@ -1552,10 +1557,7 @@ static void strfilter_convert_dtor(php_stream_filter *thisfilter TSRMLS_DC) } static php_stream_filter_ops strfilter_convert_ops = { - strfilter_convert_write, - strfilter_convert_read, - strfilter_convert_flush, - strfilter_convert_eof, + strfilter_convert_filter, strfilter_convert_dtor, "convert.*" }; diff --git a/ext/standard/user_filters.c b/ext/standard/user_filters.c index f26c126b64..206f1ea691 100644 --- a/ext/standard/user_filters.c +++ b/ext/standard/user_filters.c @@ -19,6 +19,27 @@ /* $Id$ */ +/* + * TODO: Rewrite for buckets. + * Concept: + * The user defined filter class should implement a method named + * "filter" with the following proto: + * long filter(object brigade_in, object brigade_out, long &consumed, long flags); + * + * brigade_in and brigade_out are overloaded objects that wrap around + * the php_stream_bucket_brigades passed to the underlying filter method. + * The brigades have methods for retrieving the head of the brigade as + * an overloaded bucket object, a method for appending a + * bucket object to the end of the brigade, and a method for creating a new + * bucket at the end of the brigade. + * + * The bucket object has methods to unlink it from it's containing brigade, + * split into two buckets, and retrieve the buffer from a bucket. + * + * This approach means that there doesn't need to be very much magic between + * userspace and the real C interface. + */ + #include "php.h" #include "php_globals.h" #include "ext/standard/basic_functions.h" @@ -44,77 +65,14 @@ static int le_userfilters; /* define the base filter class */ -/* Descendants call this function to actually send the data on to the next - * filter (or the stream itself). - * The intention is to invoke it as parent::write($data) - * */ -PHP_FUNCTION(user_filter_write) -{ - char *data; - int data_len; - size_t wrote = 0; - php_stream_filter *filter; - - GET_FILTER_FROM_OBJ(); - - if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &data, &data_len) == FAILURE) { - RETURN_FALSE; - } - - wrote = php_stream_filter_write_next(filter->stream, filter, data, data_len); - - RETURN_LONG(wrote); -} - -PHP_FUNCTION(user_filter_read) -{ - long data_to_read; - char *data; - size_t didread = 0; - php_stream_filter *filter; - - RETVAL_FALSE; - - GET_FILTER_FROM_OBJ(); - - if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", &data_to_read) == FAILURE) { - RETURN_FALSE; - } - - data = emalloc(data_to_read + 1); - didread = php_stream_filter_read_next(filter->stream, filter, data, data_to_read); - - if (didread > 0) { - data = erealloc(data, didread + 1); - RETURN_STRINGL(data, didread, 0); - } else { - efree(data); - RETURN_FALSE; - } -} - -PHP_FUNCTION(user_filter_flush) -{ - zend_bool closing; - php_stream_filter *filter; - - GET_FILTER_FROM_OBJ(); - - if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "b", &closing) == FAILURE) { - RETURN_FALSE; - } - - RETURN_LONG(php_stream_filter_flush_next(filter->stream, filter, closing)); -} - PHP_FUNCTION(user_filter_nop) { } static zend_function_entry user_filter_class_funcs[] = { - PHP_NAMED_FE(write, PHP_FN(user_filter_write), NULL) - PHP_NAMED_FE(read, PHP_FN(user_filter_read), NULL) - PHP_NAMED_FE(flush, PHP_FN(user_filter_flush), NULL) + PHP_NAMED_FE(write, PHP_FN(user_filter_nop), NULL) + PHP_NAMED_FE(read, PHP_FN(user_filter_nop), NULL) + PHP_NAMED_FE(flush, PHP_FN(user_filter_nop), NULL) PHP_NAMED_FE(oncreate, PHP_FN(user_filter_nop), NULL) PHP_NAMED_FE(onclose, PHP_FN(user_filter_nop), NULL) { NULL, NULL, NULL } @@ -140,106 +98,42 @@ PHP_MINIT_FUNCTION(user_filters) return SUCCESS; } -static size_t userfilter_write(php_stream *stream, php_stream_filter *thisfilter, - const char *buf, size_t count TSRMLS_DC) +static void userfilter_dtor(php_stream_filter *thisfilter TSRMLS_DC) { - size_t wrote = 0; zval *obj = (zval*)thisfilter->abstract; zval func_name; zval *retval = NULL; - zval **args[1]; - zval *zbuf; - int call_result; - - ZVAL_STRINGL(&func_name, "write", sizeof("write")-1, 0); - - ALLOC_INIT_ZVAL(zbuf); - ZVAL_STRINGL(zbuf, (char*)buf, count, 1); + zval **tmp; - args[0] = &zbuf; + ZVAL_STRINGL(&func_name, "onclose", sizeof("onclose")-1, 0); - call_result = call_user_function_ex(NULL, + call_user_function_ex(NULL, &obj, &func_name, &retval, - 1, args, + 0, NULL, 0, NULL TSRMLS_CC); - if (call_result == SUCCESS && retval != NULL) { - convert_to_long(retval); - wrote = Z_LVAL_P(retval); - } else if (call_result == FAILURE) { - php_error_docref(NULL TSRMLS_CC, E_WARNING, "failed to call user-filter write function!?"); - } - - /* beware of buffer overruns */ - if (wrote > count) { - php_error_docref(NULL TSRMLS_CC, E_WARNING, - "wrote %d bytes more data than requested (%d written, %d max)", - wrote - count, - wrote, - count); - wrote = count; - } - if (retval) zval_ptr_dtor(&retval); - if (zbuf) - zval_ptr_dtor(&zbuf); - - return wrote; -} - -static size_t userfilter_read(php_stream *stream, php_stream_filter *thisfilter, - char *buf, size_t count TSRMLS_DC) -{ - size_t didread = 0; - zval *obj = (zval*)thisfilter->abstract; - zval func_name; - zval *retval = NULL; - zval **args[1]; - zval *zcount; - int call_result; - - ZVAL_STRINGL(&func_name, "read", sizeof("read")-1, 0); - - ALLOC_INIT_ZVAL(zcount); - ZVAL_LONG(zcount, count); - args[0] = &zcount; - - call_result = call_user_function_ex(NULL, - &obj, - &func_name, - &retval, - 1, args, - 0, NULL TSRMLS_CC); - - if (call_result == SUCCESS && retval != NULL) { - convert_to_string(retval); - didread = Z_STRLEN_P(retval); - - if (didread > count) { - php_error_docref(NULL TSRMLS_CC, E_WARNING, - "read %d bytes more data than requested (%d read, %d max) - excess data will be lost", - didread - count, didread, count); - didread = count; - } - if (didread > 0) - memcpy(buf, Z_STRVAL_P(retval), didread); - } else if (call_result == FAILURE) { - php_error_docref(NULL TSRMLS_CC, E_WARNING, "failed to call read function!"); - } - - if (retval) - zval_ptr_dtor(&retval); + if (SUCCESS == zend_hash_index_find(Z_OBJPROP_P(obj), 0, (void**)&tmp)) { + zend_list_delete(Z_LVAL_PP(tmp)); + FREE_ZVAL(*tmp); + } - zval_ptr_dtor(&zcount); - - return didread; + /* kill the object */ + zval_ptr_dtor(&obj); } -static int userfilter_flush(php_stream *stream, php_stream_filter *thisfilter, int closing TSRMLS_DC) +php_stream_filter_status_t userfilter_filter( + php_stream *stream, + php_stream_filter *thisfilter, + php_stream_bucket_brigade *buckets_in, + php_stream_bucket_brigade *buckets_out, + size_t *bytes_consumed, + int flags + TSRMLS_DC) { int ret = EOF; zval *obj = (zval*)thisfilter->abstract; @@ -249,10 +143,10 @@ static int userfilter_flush(php_stream *stream, php_stream_filter *thisfilter, i zval *zcount; int call_result; - ZVAL_STRINGL(&func_name, "flush", sizeof("flush")-1, 0); + ZVAL_STRINGL(&func_name, "filter", sizeof("filter")-1, 0); ALLOC_INIT_ZVAL(zcount); - ZVAL_BOOL(zcount, closing); + ZVAL_BOOL(zcount, flags & PSFS_FLAG_FLUSH_CLOSE); args[0] = &zcount; call_result = call_user_function_ex(NULL, @@ -266,55 +160,18 @@ static int userfilter_flush(php_stream *stream, php_stream_filter *thisfilter, i convert_to_long(retval); ret = Z_LVAL_P(retval); } else if (call_result == FAILURE) { - php_error_docref(NULL TSRMLS_CC, E_WARNING, "failed to call flush function"); + php_error_docref(NULL TSRMLS_CC, E_WARNING, "failed to call filter function"); } if (retval) zval_ptr_dtor(&retval); zval_ptr_dtor(&zcount); - return ret; -} - -static int userfilter_eof(php_stream *stream, php_stream_filter *thisfilter TSRMLS_DC) -{ - /* TODO: does this actually ever get called!? */ - return php_stream_filter_eof_next(stream, thisfilter); -} - -static void userfilter_dtor(php_stream_filter *thisfilter TSRMLS_DC) -{ - zval *obj = (zval*)thisfilter->abstract; - zval func_name; - zval *retval = NULL; - zval **tmp; - - ZVAL_STRINGL(&func_name, "onclose", sizeof("onclose")-1, 0); - - call_user_function_ex(NULL, - &obj, - &func_name, - &retval, - 0, NULL, - 0, NULL TSRMLS_CC); - - if (retval) - zval_ptr_dtor(&retval); - - if (SUCCESS == zend_hash_index_find(Z_OBJPROP_P(obj), 0, (void**)&tmp)) { - zend_list_delete(Z_LVAL_PP(tmp)); - FREE_ZVAL(*tmp); - } - - /* kill the object */ - zval_ptr_dtor(&obj); + return PSFS_ERR_FATAL; } static php_stream_filter_ops userfilter_ops = { - userfilter_write, - userfilter_read, - userfilter_flush, - userfilter_eof, + userfilter_filter, userfilter_dtor, "user-filter" }; @@ -356,9 +213,6 @@ static php_stream_filter *user_filter_factory_create(const char *filtername, fdat->ce = *(zend_class_entry**)fdat->ce; #endif - /* the class *must* be a descendant of the user-space filter - * base class, otherwise it will never work */ - /* TODO: make this sanity check */ } filter = php_stream_filter_alloc(&userfilter_ops, NULL, 0); diff --git a/main/php_streams.h b/main/php_streams.h index 89ba00ba27..5e22792682 100755 --- a/main/php_streams.h +++ b/main/php_streams.h @@ -97,8 +97,8 @@ typedef struct _php_stream_wrapper php_stream_wrapper; typedef struct _php_stream_context php_stream_context; typedef struct _php_stream_filter php_stream_filter; -#include "streams/context.h" -#include "streams/filter_api.h" +#include "streams/php_stream_context.h" +#include "streams/php_stream_filter_api.h" typedef struct _php_stream_statbuf { #if defined(NETWARE) && defined(CLIB_STAT_PATCH) @@ -174,9 +174,8 @@ struct _php_stream { php_stream_ops *ops; void *abstract; /* convenience pointer for abstraction */ - php_stream_filter *filterhead; - php_stream_filter *filtertail; - + php_stream_filter_chain readfilters, writefilters; + php_stream_wrapper *wrapper; /* which wrapper was used to open the stream */ void *wrapperthis; /* convenience pointer for a instance of a wrapper */ zval *wrapperdata; /* fgetwrapperdata retrieves this */ @@ -287,6 +286,7 @@ PHPAPI char *_php_stream_get_line(php_stream *stream, char *buf, size_t maxlen, #define php_stream_gets(stream, buf, maxlen) _php_stream_get_line((stream), (buf), (maxlen), NULL TSRMLS_CC) #define php_stream_get_line(stream, buf, maxlen, retlen) _php_stream_get_line((stream), (buf), (maxlen), (retlen) TSRMLS_CC) +PHPAPI char *php_stream_get_record(php_stream *stream, size_t maxlen, size_t *returned_len, char *delim, size_t delim_len TSRMLS_DC); /* CAREFUL! this is equivalent to puts NOT fputs! */ PHPAPI int _php_stream_puts(php_stream *stream, char *buf TSRMLS_DC); @@ -347,8 +347,8 @@ PHPAPI size_t _php_stream_copy_to_mem(php_stream *src, char **buf, size_t maxlen PHPAPI size_t _php_stream_passthru(php_stream * src STREAMS_DC TSRMLS_DC); #define php_stream_passthru(stream) _php_stream_passthru((stream) STREAMS_CC TSRMLS_CC) -#include "streams/plain_wrapper.h" -#include "streams/userspace.h" +#include "streams/php_stream_plain_wrapper.h" +#include "streams/php_stream_userspace.h" /* coerce the stream into some other form */ /* cast as a stdio FILE * */ diff --git a/main/streams/cast.c b/main/streams/cast.c index d940a1d0ad..95606667b2 100644 --- a/main/streams/cast.c +++ b/main/streams/cast.c @@ -173,7 +173,7 @@ PHPAPI int _php_stream_cast(php_stream *stream, int castas, void **ret, int show * first, to avoid doubling up the layers of stdio with an fopencookie */ if (php_stream_is(stream, PHP_STREAM_IS_STDIO) && stream->ops->cast && - stream->filterhead == NULL && + !php_stream_is_filtered(stream) && stream->ops->cast(stream, castas, ret TSRMLS_CC) == SUCCESS) { goto exit_success; @@ -235,7 +235,7 @@ PHPAPI int _php_stream_cast(php_stream *stream, int castas, void **ret, int show } } - if (stream->filterhead) { + if (php_stream_is_filtered(stream)) { php_error_docref(NULL TSRMLS_CC, E_WARNING, "cannot cast a filtered stream on this system"); return FAILURE; } else if (stream->ops->cast && stream->ops->cast(stream, castas, ret TSRMLS_CC) == SUCCESS) { diff --git a/main/streams/filter.c b/main/streams/filter.c index 5739f0f314..529270571f 100644 --- a/main/streams/filter.c +++ b/main/streams/filter.c @@ -46,6 +46,177 @@ PHPAPI int php_stream_filter_unregister_factory(const char *filterpattern TSRMLS return zend_hash_del(&stream_filters_hash, (char*)filterpattern, strlen(filterpattern)); } +/* Buckets */ + +PHPAPI php_stream_bucket *php_stream_bucket_new(php_stream *stream, char *buf, size_t buflen, int own_buf, int buf_persistent TSRMLS_DC) +{ + int is_persistent = php_stream_is_persistent(stream); + php_stream_bucket *bucket; + + bucket = (php_stream_bucket*)pemalloc(sizeof(php_stream_bucket), is_persistent); + + if (bucket == NULL) { + return NULL; + } + + bucket->next = bucket->prev = NULL; + + if (is_persistent && !buf_persistent) { + /* all data in a persistent bucket must also be persistent */ + bucket->buf = pemalloc(buflen, 1); + + if (bucket->buf == NULL) { + pefree(bucket, 1); + return NULL; + } + + memcpy(bucket->buf, buf, buflen); + bucket->buflen = buflen; + bucket->own_buf = 1; + } else { + bucket->buf = buf; + bucket->buflen = buflen; + bucket->own_buf = own_buf; + } + bucket->is_persistent = is_persistent; + bucket->refcount = 1; + + return bucket; +} + +/* Given a bucket, returns a version of that bucket with a writeable buffer. + * If the original bucket has a refcount of 1 and owns its buffer, then it + * is returned unchanged. + * Otherwise, a copy of the buffer is made. + * In both cases, the original bucket is unlinked from its brigade. + * If a copy is made, the original bucket is delref'd. + * */ +PHPAPI php_stream_bucket *php_stream_bucket_make_writeable(php_stream_bucket *bucket TSRMLS_DC) +{ + php_stream_bucket *retval; + + php_stream_bucket_unlink(bucket TSRMLS_CC); + + if (bucket->refcount == 1 && bucket->own_buf) { + return bucket; + } + + retval = (php_stream_bucket*)pemalloc(sizeof(php_stream_bucket), bucket->is_persistent); + memcpy(retval, bucket, sizeof(*retval)); + + retval->buf = pemalloc(retval->buflen, retval->is_persistent); + memcpy(retval->buf, bucket->buf, retval->buflen); + + retval->refcount = 1; + retval->own_buf = 1; + + php_stream_bucket_delref(bucket); + + return retval; +} + +PHPAPI int php_stream_bucket_split(php_stream_bucket *in, php_stream_bucket **left, php_stream_bucket **right, size_t length TSRMLS_DC) +{ + *left = (php_stream_bucket*)pecalloc(1, sizeof(php_stream_bucket), in->is_persistent); + *right = (php_stream_bucket*)pecalloc(1, sizeof(php_stream_bucket), in->is_persistent); + + if (*left == NULL || *right == NULL) { + goto exit_fail; + } + + (*left)->buf = pemalloc(length, in->is_persistent); + (*left)->buflen = length; + memcpy((*left)->buf, in->buf, length); + (*left)->refcount = 1; + (*left)->own_buf = 1; + (*left)->is_persistent = in->is_persistent; + + (*right)->buflen = in->buflen - length; + (*right)->buf = pemalloc((*right)->buflen, in->is_persistent); + memcpy((*right)->buf, in->buf + length, (*right)->buflen); + (*right)->refcount = 1; + (*right)->own_buf = 1; + (*right)->is_persistent = in->is_persistent; + + return SUCCESS; + +exit_fail: + if (*right) { + if ((*right)->buf) { + pefree((*right)->buf, in->is_persistent); + } + pefree(*right, in->is_persistent); + } + if (*left) { + if ((*left)->buf) { + pefree((*left)->buf, in->is_persistent); + } + pefree(*left, in->is_persistent); + } + return FAILURE; +} + +PHPAPI void php_stream_bucket_delref(php_stream_bucket *bucket TSRMLS_DC) +{ + if (--bucket->refcount == 0) { + if (bucket->own_buf) { + pefree(bucket->buf, bucket->is_persistent); + } + pefree(bucket, bucket->is_persistent); + } +} + +PHPAPI void php_stream_bucket_prepend(php_stream_bucket_brigade *brigade, php_stream_bucket *bucket TSRMLS_DC) +{ + bucket->next = brigade->head; + bucket->prev = NULL; + + if (brigade->head) { + brigade->head->prev = bucket; + } else { + brigade->tail = bucket; + } + brigade->head = bucket; + bucket->brigade = brigade; +} + +PHPAPI void php_stream_bucket_append(php_stream_bucket_brigade *brigade, php_stream_bucket *bucket TSRMLS_DC) +{ + bucket->prev = brigade->tail; + bucket->next = NULL; + + if (brigade->tail) { + brigade->tail->next = bucket; + } else { + brigade->head = bucket; + } + brigade->tail = bucket; + bucket->brigade = brigade; +} + +PHPAPI void php_stream_bucket_unlink(php_stream_bucket *bucket TSRMLS_DC) +{ + if (bucket->prev) { + bucket->prev->next = bucket->next; + } else { + bucket->brigade->head = bucket->next; + } + if (bucket->next) { + bucket->next->prev = bucket->prev; + } else { + bucket->brigade->tail = bucket->prev; + } + bucket->brigade = NULL; + bucket->next = bucket->prev = NULL; +} + + + + + + + + /* We allow very simple pattern matching for filter factories: * if "charset.utf-8/sjis" is requested, we search first for an exact * match. If that fails, we try "charset.*". @@ -106,46 +277,44 @@ PHPAPI void php_stream_filter_free(php_stream_filter *filter TSRMLS_DC) pefree(filter, filter->is_persistent); } -PHPAPI void php_stream_filter_prepend(php_stream *stream, php_stream_filter *filter) +PHPAPI void php_stream_filter_prepend(php_stream_filter_chain *chain, php_stream_filter *filter) { - filter->next = stream->filterhead; + filter->next = chain->head; filter->prev = NULL; - if (stream->filterhead) { - stream->filterhead->prev = filter; + if (chain->head) { + chain->head->prev = filter; } else { - stream->filtertail = filter; + chain->tail = filter; } - stream->filterhead = filter; - filter->stream = stream; + chain->head = filter; + filter->chain = chain; } -PHPAPI void php_stream_filter_append(php_stream *stream, php_stream_filter *filter) +PHPAPI void php_stream_filter_append(php_stream_filter_chain *chain, php_stream_filter *filter) { - filter->prev = stream->filtertail; + filter->prev = chain->tail; filter->next = NULL; - if (stream->filtertail) { - stream->filtertail->next = filter; + if (chain->tail) { + chain->tail->next = filter; } else { - stream->filterhead = filter; + chain->head = filter; } - stream->filtertail = filter; - filter->stream = stream; + chain->tail = filter; + filter->chain = chain; } -PHPAPI php_stream_filter *php_stream_filter_remove(php_stream *stream, php_stream_filter *filter, int call_dtor TSRMLS_DC) +PHPAPI php_stream_filter *php_stream_filter_remove(php_stream_filter *filter, int call_dtor TSRMLS_DC) { - assert(stream == filter->stream); - if (filter->prev) { filter->prev->next = filter->next; } else { - stream->filterhead = filter->next; + filter->chain->head = filter->next; } if (filter->next) { filter->next->prev = filter->prev; } else { - stream->filtertail = filter->prev; + filter->chain->tail = filter->prev; } if (call_dtor) { php_stream_filter_free(filter TSRMLS_CC); diff --git a/main/streams/filter_api.h b/main/streams/filter_api.h deleted file mode 100644 index 535820427b..0000000000 --- a/main/streams/filter_api.h +++ /dev/null @@ -1,85 +0,0 @@ -/* - +----------------------------------------------------------------------+ - | PHP Version 4 | - +----------------------------------------------------------------------+ - | Copyright (c) 1997-2003 The PHP Group | - +----------------------------------------------------------------------+ - | This source file is subject to version 2.02 of the PHP license, | - | that is bundled with this package in the file LICENSE, and is | - | available at through the world-wide-web at | - | http://www.php.net/license/2_02.txt. | - | If you did not receive a copy of the PHP license and are unable to | - | obtain it through the world-wide-web, please send a note to | - | license@php.net so we can mail you a copy immediately. | - +----------------------------------------------------------------------+ - | Author: Wez Furlong (wez@thebrainroom.com) | - +----------------------------------------------------------------------+ - */ - -/* $Id$ */ - -typedef struct _php_stream_filter_ops { - size_t (*write)(php_stream *stream, php_stream_filter *thisfilter, - const char *buf, size_t count TSRMLS_DC); - size_t (*read)(php_stream *stream, php_stream_filter *thisfilter, - char *buf, size_t count TSRMLS_DC); - int (*flush)(php_stream *stream, php_stream_filter *thisfilter, int closing TSRMLS_DC); - int (*eof)(php_stream *stream, php_stream_filter *thisfilter TSRMLS_DC); - void (*dtor)(php_stream_filter *thisfilter TSRMLS_DC); - const char *label; -} php_stream_filter_ops; - -struct _php_stream_filter { - php_stream_filter_ops *fops; - void *abstract; /* for use by filter implementation */ - php_stream_filter *next; - php_stream_filter *prev; - int is_persistent; - php_stream *stream; -}; - -#define php_stream_filter_write_next(stream, thisfilter, buf, size) \ - (thisfilter)->next ? (thisfilter)->next->fops->write((stream), (thisfilter)->next, (buf), (size) TSRMLS_CC) \ - : (stream)->ops->write((stream), (buf), (size) TSRMLS_CC) - -#define php_stream_filter_read_next(stream, thisfilter, buf, size) \ - (thisfilter)->next ? (thisfilter)->next->fops->read((stream), (thisfilter)->next, (buf), (size) TSRMLS_CC) \ - : (stream)->ops->read((stream), (buf), (size) TSRMLS_CC) - -#define php_stream_filter_flush_next(stream, thisfilter, closing) \ - (thisfilter)->next ? (thisfilter)->next->fops->flush((stream), (thisfilter)->next, (closing) TSRMLS_CC) \ - : (stream)->ops->flush((stream) TSRMLS_CC) - -#define php_stream_filter_eof_next(stream, thisfilter) \ - (thisfilter)->next ? (thisfilter)->next->fops->eof((stream), (thisfilter)->next TSRMLS_CC) \ - : (stream)->ops->read((stream), NULL, 0 TSRMLS_CC) == EOF ? 1 : 0 - -/* stack filter onto a stream */ -PHPAPI void php_stream_filter_prepend(php_stream *stream, php_stream_filter *filter); -PHPAPI void php_stream_filter_append(php_stream *stream, php_stream_filter *filter); -PHPAPI php_stream_filter *php_stream_filter_remove(php_stream *stream, php_stream_filter *filter, int call_dtor TSRMLS_DC); -PHPAPI void php_stream_filter_free(php_stream_filter *filter TSRMLS_DC); -PHPAPI php_stream_filter *_php_stream_filter_alloc(php_stream_filter_ops *fops, void *abstract, int persistent STREAMS_DC TSRMLS_DC); -PHPAPI char *php_stream_get_record(php_stream *stream, size_t maxlen, size_t *returned_len, char *delim, size_t delim_len TSRMLS_DC); -#define php_stream_filter_alloc(fops, thisptr, persistent) _php_stream_filter_alloc((fops), (thisptr), (persistent) STREAMS_CC TSRMLS_CC) -#define php_stream_filter_alloc_rel(fops, thisptr, persistent) _php_stream_filter_alloc((fops), (thisptr), (persistent) STREAMS_REL_CC TSRMLS_CC) - -#define php_stream_filter_remove_head(stream, call_dtor) php_stream_filter_remove((stream), (stream)->filterhead, (call_dtor) TSRMLS_CC) -#define php_stream_filter_remove_tail(stream, call_dtor) php_stream_filter_remove((stream), (stream)->filtertail, (call_dtor) TSRMLS_CC) - -typedef struct _php_stream_filter_factory { - php_stream_filter *(*create_filter)(const char *filtername, const char *filterparams, int filterparamslen, int persistent TSRMLS_DC); -} php_stream_filter_factory; - -PHPAPI int php_stream_filter_register_factory(const char *filterpattern, php_stream_filter_factory *factory TSRMLS_DC); -PHPAPI int php_stream_filter_unregister_factory(const char *filterpattern TSRMLS_DC); -PHPAPI php_stream_filter *php_stream_filter_create(const char *filtername, const char *filterparams, int filterparamslen, int persistent TSRMLS_DC); - -/* - * Local variables: - * tab-width: 4 - * c-basic-offset: 4 - * End: - * vim600: sw=4 ts=4 fdm=marker - * vim<600: sw=4 ts=4 - */ diff --git a/main/streams/context.h b/main/streams/php_stream_context.h similarity index 100% rename from main/streams/context.h rename to main/streams/php_stream_context.h diff --git a/main/streams/php_stream_filter_api.h b/main/streams/php_stream_filter_api.h new file mode 100644 index 0000000000..d8130d9f1d --- /dev/null +++ b/main/streams/php_stream_filter_api.h @@ -0,0 +1,139 @@ +/* + +----------------------------------------------------------------------+ + | PHP Version 4 | + +----------------------------------------------------------------------+ + | Copyright (c) 1997-2003 The PHP Group | + +----------------------------------------------------------------------+ + | This source file is subject to version 2.02 of the PHP license, | + | that is bundled with this package in the file LICENSE, and is | + | available at through the world-wide-web at | + | http://www.php.net/license/2_02.txt. | + | If you did not receive a copy of the PHP license and are unable to | + | obtain it through the world-wide-web, please send a note to | + | license@php.net so we can mail you a copy immediately. | + +----------------------------------------------------------------------+ + | Author: Wez Furlong (wez@thebrainroom.com) | + | With suggestions from: | + | Moriyoshi Koizumi | + | Sara Golemon | + +----------------------------------------------------------------------+ + */ + +/* $Id$ */ + +/* The filter API works on the principle of "Bucket-Brigades". This is + * partially inspired by the Apache 2 method of doing things, although + * it is intentially a light-weight implementation. + * + * Each stream can have a chain of filters for reading and another for writing. + * + * When data is written to the stream, is is placed into a bucket and placed at + * the start of the input brigade. + * + * The first filter in the chain is invoked on the brigade and (depending on + * it's return value), the next filter is invoked and so on. + * */ + +typedef struct _php_stream_bucket php_stream_bucket; +typedef struct _php_stream_bucket_brigade php_stream_bucket_brigade; + +struct _php_stream_bucket { + php_stream_bucket *next, *prev; + php_stream_bucket_brigade *brigade; + + char *buf; + size_t buflen; + /* if non-zero, buf should be pefreed when the bucket is destroyed */ + int own_buf; + int is_persistent; + + /* destroy this struct when refcount falls to zero */ + int refcount; +}; + +struct _php_stream_bucket_brigade { + php_stream_bucket *head, *tail; +}; + +typedef enum { + PSFS_ERR_FATAL, /* error in data stream */ + PSFS_FEED_ME, /* filter needs more data; stop processing chain until more is available */ + PSFS_PASS_ON, /* filter generated output buckets; pass them on to next in chain */ +} php_stream_filter_status_t; + +/* Buckets API. */ +PHPAPI php_stream_bucket *php_stream_bucket_new(php_stream *stream, char *buf, size_t buflen, int own_buf, int buf_persistent TSRMLS_DC); +PHPAPI int php_stream_bucket_split(php_stream_bucket *in, php_stream_bucket **left, php_stream_bucket **right, size_t length TSRMLS_DC); +PHPAPI void php_stream_bucket_delref(php_stream_bucket *bucket TSRMLS_DC); +#define php_stream_bucket_addref(bucket) (bucket)->refcount++ +PHPAPI void php_stream_bucket_prepend(php_stream_bucket_brigade *brigade, php_stream_bucket *bucket TSRMLS_DC); +PHPAPI void php_stream_bucket_append(php_stream_bucket_brigade *brigade, php_stream_bucket *bucket TSRMLS_DC); +PHPAPI void php_stream_bucket_unlink(php_stream_bucket *bucket TSRMLS_DC); +PHPAPI php_stream_bucket *php_stream_bucket_make_writeable(php_stream_bucket *bucket TSRMLS_DC); + +#define PSFS_FLAG_NORMAL 0 /* regular read/write */ +#define PSFS_FLAG_FLUSH_INC 1 /* an incremental flush */ +#define PSFS_FLAG_FLUSH_CLOSE 2 /* final flush prior to closing */ + +typedef struct _php_stream_filter_ops { + + php_stream_filter_status_t (*filter)( + php_stream *stream, + php_stream_filter *thisfilter, + php_stream_bucket_brigade *buckets_in, + php_stream_bucket_brigade *buckets_out, + size_t *bytes_consumed, + int flags + TSRMLS_DC); + + void (*dtor)(php_stream_filter *thisfilter TSRMLS_DC); + + const char *label; + +} php_stream_filter_ops; + +typedef struct _php_stream_filter_chain { + php_stream_filter *head, *tail; +} php_stream_filter_chain; + +struct _php_stream_filter { + php_stream_filter_ops *fops; + void *abstract; /* for use by filter implementation */ + php_stream_filter *next; + php_stream_filter *prev; + int is_persistent; + + /* link into stream and chain */ + php_stream_filter_chain *chain; + + /* buffered buckets */ + php_stream_bucket_brigade buffer; +}; + +/* stack filter onto a stream */ +PHPAPI void php_stream_filter_prepend(php_stream_filter_chain *chain, php_stream_filter *filter); +PHPAPI void php_stream_filter_append(php_stream_filter_chain *chain, php_stream_filter *filter); +PHPAPI php_stream_filter *php_stream_filter_remove(php_stream_filter *filter, int call_dtor TSRMLS_DC); +PHPAPI void php_stream_filter_free(php_stream_filter *filter TSRMLS_DC); +PHPAPI php_stream_filter *_php_stream_filter_alloc(php_stream_filter_ops *fops, void *abstract, int persistent STREAMS_DC TSRMLS_DC); +#define php_stream_filter_alloc(fops, thisptr, persistent) _php_stream_filter_alloc((fops), (thisptr), (persistent) STREAMS_CC TSRMLS_CC) +#define php_stream_filter_alloc_rel(fops, thisptr, persistent) _php_stream_filter_alloc((fops), (thisptr), (persistent) STREAMS_REL_CC TSRMLS_CC) + +#define php_stream_is_filtered(stream) ((stream)->readfilters.head || (stream)->writefilters.head) + +typedef struct _php_stream_filter_factory { + php_stream_filter *(*create_filter)(const char *filtername, const char *filterparams, int filterparamslen, int persistent TSRMLS_DC); +} php_stream_filter_factory; + +PHPAPI int php_stream_filter_register_factory(const char *filterpattern, php_stream_filter_factory *factory TSRMLS_DC); +PHPAPI int php_stream_filter_unregister_factory(const char *filterpattern TSRMLS_DC); +PHPAPI php_stream_filter *php_stream_filter_create(const char *filtername, const char *filterparams, int filterparamslen, int persistent TSRMLS_DC); + +/* + * Local variables: + * tab-width: 4 + * c-basic-offset: 4 + * End: + * vim600: sw=4 ts=4 fdm=marker + * vim<600: sw=4 ts=4 + */ diff --git a/main/streams/plain_wrapper.h b/main/streams/php_stream_plain_wrapper.h similarity index 100% rename from main/streams/plain_wrapper.h rename to main/streams/php_stream_plain_wrapper.h diff --git a/main/streams/userspace.h b/main/streams/php_stream_userspace.h similarity index 100% rename from main/streams/userspace.h rename to main/streams/php_stream_userspace.h diff --git a/main/streams/streams.c b/main/streams/streams.c index 6184d03829..ee33cdefed 100755 --- a/main/streams/streams.c +++ b/main/streams/streams.c @@ -310,8 +310,11 @@ fprintf(stderr, "stream_free: %s:%p[%s] preserve_handle=%d release_cast=%d remov if (close_options & PHP_STREAM_FREE_RELEASE_STREAM) { - while (stream->filterhead) { - php_stream_filter_remove_head(stream, 1); + while (stream->readfilters.head) { + php_stream_filter_remove(stream->readfilters.head, 1); + } + while (stream->writefilters.head) { + php_stream_filter_remove(stream->writefilters.head, 1); } if (stream->wrapper && stream->wrapper->wops && stream->wrapper->wops->stream_closer) { @@ -363,44 +366,129 @@ static void php_stream_fill_read_buffer(php_stream *stream, size_t size TSRMLS_D { /* allocate/fill the buffer */ - /* is there enough data in the buffer ? */ - if (stream->writepos - stream->readpos < (off_t)size) { - size_t justread = 0; - - /* ignore eof here; the underlying state might have changed */ - - /* no; so lets fetch more data */ - - /* reduce buffer memory consumption if possible, to avoid a realloc */ - if (stream->readbuf && stream->readbuflen - stream->writepos < stream->chunk_size) { - memmove(stream->readbuf, stream->readbuf + stream->readpos, stream->readbuflen - stream->readpos); - stream->writepos -= stream->readpos; - stream->readpos = 0; - } + if (stream->readfilters.head) { + char *chunk_buf; + int err_flag = 0; + php_stream_bucket_brigade brig_in = { NULL, NULL }, brig_out = { NULL, NULL }; + php_stream_bucket_brigade *brig_inp = &brig_in, *brig_outp = &brig_out, *brig_swap; + + /* allocate a buffer for reading chunks */ + chunk_buf = emalloc(stream->chunk_size); + + while (!err_flag && (stream->writepos - stream->readpos < (off_t)size)) { + size_t justread = 0; + int flags; + php_stream_bucket *bucket; + php_stream_filter_status_t status; + php_stream_filter *filter; + + /* read a chunk into a bucket */ + justread = stream->ops->read(stream, chunk_buf, stream->chunk_size TSRMLS_CC); + if (justread > 0) { + bucket = php_stream_bucket_new(stream, chunk_buf, justread, 0, 0 TSRMLS_CC); + + /* after this call, bucket is owned by the brigade */ + php_stream_bucket_append(brig_inp, bucket); + + flags = PSFS_FLAG_NORMAL; + } else { + flags = stream->eof ? PSFS_FLAG_FLUSH_CLOSE : PSFS_FLAG_FLUSH_INC; + } - /* grow the buffer if required */ - if (stream->readbuflen - stream->writepos < stream->chunk_size) { - stream->readbuflen += stream->chunk_size; - stream->readbuf = perealloc(stream->readbuf, stream->readbuflen, - stream->is_persistent); + /* wind the handle... */ + for (filter = stream->readfilters.head; filter; filter = filter->next) { + status = filter->fops->filter(stream, filter, brig_inp, brig_outp, NULL, flags TSRMLS_CC); + + if (status != PSFS_PASS_ON) { + break; + } + + /* brig_out becomes brig_in. + * brig_in will always be empty here, as the filter MUST attach any un-consumed buckets + * to its own brigade */ + brig_swap = brig_inp; + brig_inp = brig_outp; + brig_outp = brig_swap; + memset(brig_outp, 0, sizeof(*brig_outp)); + } + + switch (status) { + case PSFS_PASS_ON: + /* we get here when the last filter in the chain has data to pass on. + * in this situation, we are passing the brig_in brigade into the + * stream read buffer */ + while (brig_inp->head) { + bucket = brig_inp->head; + /* grow buffer to hold this bucket + * TODO: this can fail for persistent streams */ + if (stream->readbuflen - stream->writepos < bucket->buflen) { + stream->readbuflen += bucket->buflen; + stream->readbuf = perealloc(stream->readbuf, stream->readbuflen, + stream->is_persistent); + } + memcpy(stream->readbuf + stream->writepos, bucket->buf, bucket->buflen); + stream->writepos += bucket->buflen; + + php_stream_bucket_unlink(bucket TSRMLS_CC); + php_stream_bucket_delref(bucket TSRMLS_CC); + } + + break; + + case PSFS_FEED_ME: + /* when a filter needs feeding, there is no brig_out to deal with. + * we simply continue the loop; if the caller needs more data, + * we will read again, otherwise out job is done here */ + if (justread == 0) { + /* there is no data */ + err_flag = 1; + break; + } + continue; + + case PSFS_ERR_FATAL: + /* some fatal error. Theoretically, the stream is borked, so all + * further reads should fail. */ + err_flag = 1; + break; + } + + if (justread == 0) { + break; + } } - - if (stream->filterhead) { - justread = stream->filterhead->fops->read(stream, stream->filterhead, - stream->readbuf + stream->writepos, - stream->readbuflen - stream->writepos - TSRMLS_CC); - } else { + + efree(chunk_buf); + + } else { + /* is there enough data in the buffer ? */ + if (stream->writepos - stream->readpos < (off_t)size) { + size_t justread = 0; + + /* reduce buffer memory consumption if possible, to avoid a realloc */ + if (stream->readbuf && stream->readbuflen - stream->writepos < stream->chunk_size) { + memmove(stream->readbuf, stream->readbuf + stream->readpos, stream->readbuflen - stream->readpos); + stream->writepos -= stream->readpos; + stream->readpos = 0; + } + + /* grow the buffer if required + * TODO: this can fail for persistent streams */ + if (stream->readbuflen - stream->writepos < stream->chunk_size) { + stream->readbuflen += stream->chunk_size; + stream->readbuf = perealloc(stream->readbuf, stream->readbuflen, + stream->is_persistent); + } + justread = stream->ops->read(stream, stream->readbuf + stream->writepos, stream->readbuflen - stream->writepos TSRMLS_CC); - } - if (justread != (size_t)-1) { - stream->writepos += justread; + if (justread != (size_t)-1) { + stream->writepos += justread; + } } } - } PHPAPI size_t _php_stream_read(php_stream *stream, char *buf, size_t size TSRMLS_DC) @@ -431,14 +519,8 @@ PHPAPI size_t _php_stream_read(php_stream *stream, char *buf, size_t size TSRMLS break; } - if (stream->flags & PHP_STREAM_FLAG_NO_BUFFER || stream->chunk_size == 1) { - if (stream->filterhead) { - toread = stream->filterhead->fops->read(stream, stream->filterhead, - buf, size - TSRMLS_CC); - } else { - toread = stream->ops->read(stream, buf, size TSRMLS_CC); - } + if (!stream->readfilters.head && (stream->flags & PHP_STREAM_FLAG_NO_BUFFER || stream->chunk_size == 1)) { + toread = stream->ops->read(stream, buf, size TSRMLS_CC); } else { php_stream_fill_read_buffer(stream, size TSRMLS_CC); @@ -715,38 +797,18 @@ PHPAPI char *php_stream_get_record(php_stream *stream, size_t maxlen, size_t *re } } -PHPAPI int _php_stream_flush(php_stream *stream, int closing TSRMLS_DC) -{ - int ret = 0; - - if (stream->filterhead) - stream->filterhead->fops->flush(stream, stream->filterhead, closing TSRMLS_CC); - - if (stream->ops->flush) { - ret = stream->ops->flush(stream TSRMLS_CC); - } - - return ret; -} - -PHPAPI size_t _php_stream_write(php_stream *stream, const char *buf, size_t count TSRMLS_DC) +/* Writes a buffer directly to a stream, using multiple of the chunk size */ +static size_t _php_stream_write_buffer(php_stream *stream, const char *buf, size_t count TSRMLS_DC) { size_t didwrite = 0, towrite, justwrote; - - assert(stream); - if (buf == NULL || count == 0 || stream->ops->write == NULL) - return 0; while (count > 0) { towrite = count; if (towrite > stream->chunk_size) towrite = stream->chunk_size; - if (stream->filterhead) { - justwrote = stream->filterhead->fops->write(stream, stream->filterhead, buf, towrite TSRMLS_CC); - } else { - justwrote = stream->ops->write(stream, buf, towrite TSRMLS_CC); - } + justwrote = stream->ops->write(stream, buf, towrite TSRMLS_CC); + if (justwrote > 0) { buf += justwrote; count -= justwrote; @@ -754,7 +816,7 @@ PHPAPI size_t _php_stream_write(php_stream *stream, const char *buf, size_t coun /* Only screw with the buffer if we can seek, otherwise we lose data * buffered from fifos and sockets */ - if (stream->ops->seek && (stream->flags & PHP_STREAM_FLAG_NO_SEEK) == 0) { + if (stream->ops->seek && (stream->flags & PHP_STREAM_FLAG_NO_SEEK) == 0 && !php_stream_is_filtered(stream)) { stream->position += justwrote; stream->writepos = 0; stream->readpos = 0; @@ -764,6 +826,100 @@ PHPAPI size_t _php_stream_write(php_stream *stream, const char *buf, size_t coun } } return didwrite; + +} + +/* push some data through the write filter chain. + * buf may be NULL, if flags are set to indicate a flush. + * This may trigger a real write to the stream. + * Returns the number of bytes consumed from buf by the first filter in the chain. + * */ +static size_t _php_stream_write_filtered(php_stream *stream, const char *buf, size_t count, int flags TSRMLS_DC) +{ + size_t consumed = 0; + php_stream_bucket *bucket; + php_stream_bucket_brigade brig_in = { NULL, NULL }, brig_out = { NULL, NULL }; + php_stream_bucket_brigade *brig_inp = &brig_in, *brig_outp = &brig_out, *brig_swap; + php_stream_filter_status_t status; + php_stream_filter *filter; + + if (buf) { + bucket = php_stream_bucket_new(stream, (char *)buf, count, 0, 0 TSRMLS_CC); + php_stream_bucket_append(&brig_in, bucket); + } + + for (filter = stream->writefilters.head; filter; filter = filter->next) { + /* for our return value, we are interested in the number of bytes consumed from + * the first filter in the chain */ + status = filter->fops->filter(stream, filter, brig_inp, brig_outp, + filter == stream->writefilters.head ? &consumed : NULL, flags TSRMLS_CC); + + if (status != PSFS_PASS_ON) { + break; + } + /* brig_out becomes brig_in. + * brig_in will always be empty here, as the filter MUST attach any un-consumed buckets + * to its own brigade */ + brig_swap = brig_inp; + brig_inp = brig_outp; + brig_outp = brig_swap; + memset(brig_outp, 0, sizeof(*brig_outp)); + } + + switch (status) { + case PSFS_PASS_ON: + /* filter chain generated some output; push it through to the + * underlying stream */ + while (brig_inp->head) { + bucket = brig_inp->head; + _php_stream_write_buffer(stream, bucket->buf, bucket->buflen TSRMLS_CC); + /* Potential error situation - eg: no space on device. Perhaps we should keep this brigade + * hanging around and try to write it later. + * At the moment, we just drop it on the floor + * */ + + php_stream_bucket_unlink(bucket TSRMLS_CC); + php_stream_bucket_delref(bucket TSRMLS_CC); + } + break; + case PSFS_FEED_ME: + /* need more data before we can push data through to the stream */ + break; + + case PSFS_ERR_FATAL: + /* some fatal error. Theoretically, the stream is borked, so all + * further writes should fail. */ + break; + } + + return consumed; +} + +PHPAPI int _php_stream_flush(php_stream *stream, int closing TSRMLS_DC) +{ + int ret = 0; + + if (stream->writefilters.head) { + _php_stream_write_filtered(stream, NULL, 0, closing ? PSFS_FLAG_FLUSH_CLOSE : PSFS_FLAG_FLUSH_INC TSRMLS_CC); + } + + if (stream->ops->flush) { + ret = stream->ops->flush(stream TSRMLS_CC); + } + + return ret; +} + +PHPAPI size_t _php_stream_write(php_stream *stream, const char *buf, size_t count TSRMLS_DC) +{ + if (buf == NULL || count == 0 || stream->ops->write == NULL) + return 0; + + if (stream->writefilters.head) { + return _php_stream_write_filtered(stream, buf, count, PSFS_FLAG_NORMAL TSRMLS_CC); + } else { + return _php_stream_write_buffer(stream, buf, count TSRMLS_CC); + } } PHPAPI size_t _php_stream_printf(php_stream *stream TSRMLS_DC, const char *fmt, ...) @@ -825,8 +981,9 @@ PHPAPI int _php_stream_seek(php_stream *stream, off_t offset, int whence TSRMLS_ if (stream->ops->seek && (stream->flags & PHP_STREAM_FLAG_NO_SEEK) == 0) { int ret; - if (stream->filterhead) - stream->filterhead->fops->flush(stream, stream->filterhead, 0 TSRMLS_CC); + if (stream->writefilters.head) { + _php_stream_flush(stream, 0 TSRMLS_CC); + } switch(whence) { case SEEK_CUR: @@ -910,7 +1067,7 @@ PHPAPI size_t _php_stream_passthru(php_stream * stream STREAMS_DC TSRMLS_DC) #ifdef HAVE_MMAP if (!php_stream_is(stream, PHP_STREAM_IS_SOCKET) - && stream->filterhead == NULL + && !php_stream_is_filtered(stream) && php_stream_tell(stream) == 0 && SUCCESS == php_stream_cast(stream, PHP_STREAM_AS_FD, (void*)&fd, 0)) { @@ -975,7 +1132,7 @@ PHPAPI size_t _php_stream_copy_to_mem(php_stream *src, char **buf, size_t maxlen * buffering layer. * */ if ( php_stream_is(src, PHP_STREAM_IS_STDIO) && - src->filterhead == NULL && + !php_stream_is_filtered(src) && php_stream_tell(src) == 0 && SUCCESS == php_stream_cast(src, PHP_STREAM_AS_FD, (void**)&srcfd, 0)) { @@ -1057,7 +1214,7 @@ PHPAPI size_t _php_stream_copy_to_stream(php_stream *src, php_stream *dest, size * buffering layer. * */ if ( php_stream_is(src, PHP_STREAM_IS_STDIO) && - src->filterhead == NULL && + !php_stream_is_filtered(src) && php_stream_tell(src) == 0 && SUCCESS == php_stream_cast(src, PHP_STREAM_AS_FD, (void**)&srcfd, 0)) {