]> granicus.if.org Git - icinga2/blob - third-party/hiredis/hiredis.c
Use hiredis 0.13.3
[icinga2] / third-party / hiredis / hiredis.c
1 /*
2  * Copyright (c) 2009-2011, Salvatore Sanfilippo <antirez at gmail dot com>
3  * Copyright (c) 2010-2014, Pieter Noordhuis <pcnoordhuis at gmail dot com>
4  * Copyright (c) 2015, Matt Stancliff <matt at genges dot com>,
5  *                     Jan-Erik Rediger <janerik at fnordig dot com>
6  *
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions are met:
11  *
12  *   * Redistributions of source code must retain the above copyright notice,
13  *     this list of conditions and the following disclaimer.
14  *   * Redistributions in binary form must reproduce the above copyright
15  *     notice, this list of conditions and the following disclaimer in the
16  *     documentation and/or other materials provided with the distribution.
17  *   * Neither the name of Redis nor the names of its contributors may be used
18  *     to endorse or promote products derived from this software without
19  *     specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
22  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
24  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
25  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
26  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
27  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
28  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
29  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31  * POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include "fmacros.h"
35 #include <string.h>
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <assert.h>
39 #include <errno.h>
40 #include <ctype.h>
41
42 #include "hiredis.h"
43 #include "net.h"
44 #include "sds.h"
45
46 static redisReply *createReplyObject(int type);
47 static void *createStringObject(const redisReadTask *task, char *str, size_t len);
48 static void *createArrayObject(const redisReadTask *task, int elements);
49 static void *createIntegerObject(const redisReadTask *task, long long value);
50 static void *createNilObject(const redisReadTask *task);
51
52 /* Default set of functions to build the reply. Keep in mind that such a
53  * function returning NULL is interpreted as OOM. */
54 static redisReplyObjectFunctions defaultFunctions = {
55     createStringObject,
56     createArrayObject,
57     createIntegerObject,
58     createNilObject,
59     freeReplyObject
60 };
61
62 /* Create a reply object */
63 static redisReply *createReplyObject(int type) {
64     redisReply *r = calloc(1,sizeof(*r));
65
66     if (r == NULL)
67         return NULL;
68
69     r->type = type;
70     return r;
71 }
72
73 /* Free a reply object */
74 void freeReplyObject(void *reply) {
75     redisReply *r = reply;
76     size_t j;
77
78     if (r == NULL)
79         return;
80
81     switch(r->type) {
82     case REDIS_REPLY_INTEGER:
83         break; /* Nothing to free */
84     case REDIS_REPLY_ARRAY:
85         if (r->element != NULL) {
86             for (j = 0; j < r->elements; j++)
87                 if (r->element[j] != NULL)
88                     freeReplyObject(r->element[j]);
89             free(r->element);
90         }
91         break;
92     case REDIS_REPLY_ERROR:
93     case REDIS_REPLY_STATUS:
94     case REDIS_REPLY_STRING:
95         if (r->str != NULL)
96             free(r->str);
97         break;
98     }
99     free(r);
100 }
101
102 static void *createStringObject(const redisReadTask *task, char *str, size_t len) {
103     redisReply *r, *parent;
104     char *buf;
105
106     r = createReplyObject(task->type);
107     if (r == NULL)
108         return NULL;
109
110     buf = malloc(len+1);
111     if (buf == NULL) {
112         freeReplyObject(r);
113         return NULL;
114     }
115
116     assert(task->type == REDIS_REPLY_ERROR  ||
117            task->type == REDIS_REPLY_STATUS ||
118            task->type == REDIS_REPLY_STRING);
119
120     /* Copy string value */
121     memcpy(buf,str,len);
122     buf[len] = '\0';
123     r->str = buf;
124     r->len = len;
125
126     if (task->parent) {
127         parent = task->parent->obj;
128         assert(parent->type == REDIS_REPLY_ARRAY);
129         parent->element[task->idx] = r;
130     }
131     return r;
132 }
133
134 static void *createArrayObject(const redisReadTask *task, int elements) {
135     redisReply *r, *parent;
136
137     r = createReplyObject(REDIS_REPLY_ARRAY);
138     if (r == NULL)
139         return NULL;
140
141     if (elements > 0) {
142         r->element = calloc(elements,sizeof(redisReply*));
143         if (r->element == NULL) {
144             freeReplyObject(r);
145             return NULL;
146         }
147     }
148
149     r->elements = elements;
150
151     if (task->parent) {
152         parent = task->parent->obj;
153         assert(parent->type == REDIS_REPLY_ARRAY);
154         parent->element[task->idx] = r;
155     }
156     return r;
157 }
158
159 static void *createIntegerObject(const redisReadTask *task, long long value) {
160     redisReply *r, *parent;
161
162     r = createReplyObject(REDIS_REPLY_INTEGER);
163     if (r == NULL)
164         return NULL;
165
166     r->integer = value;
167
168     if (task->parent) {
169         parent = task->parent->obj;
170         assert(parent->type == REDIS_REPLY_ARRAY);
171         parent->element[task->idx] = r;
172     }
173     return r;
174 }
175
176 static void *createNilObject(const redisReadTask *task) {
177     redisReply *r, *parent;
178
179     r = createReplyObject(REDIS_REPLY_NIL);
180     if (r == NULL)
181         return NULL;
182
183     if (task->parent) {
184         parent = task->parent->obj;
185         assert(parent->type == REDIS_REPLY_ARRAY);
186         parent->element[task->idx] = r;
187     }
188     return r;
189 }
190
191 /* Return the number of digits of 'v' when converted to string in radix 10.
192  * Implementation borrowed from link in redis/src/util.c:string2ll(). */
193 static uint32_t countDigits(uint64_t v) {
194   uint32_t result = 1;
195   for (;;) {
196     if (v < 10) return result;
197     if (v < 100) return result + 1;
198     if (v < 1000) return result + 2;
199     if (v < 10000) return result + 3;
200     v /= 10000U;
201     result += 4;
202   }
203 }
204
205 /* Helper that calculates the bulk length given a certain string length. */
206 static size_t bulklen(size_t len) {
207     return 1+countDigits(len)+2+len+2;
208 }
209
210 int redisvFormatCommand(char **target, const char *format, va_list ap) {
211     const char *c = format;
212     char *cmd = NULL; /* final command */
213     int pos; /* position in final command */
214     sds curarg, newarg; /* current argument */
215     int touched = 0; /* was the current argument touched? */
216     char **curargv = NULL, **newargv = NULL;
217     int argc = 0;
218     int totlen = 0;
219     int error_type = 0; /* 0 = no error; -1 = memory error; -2 = format error */
220     int j;
221
222     /* Abort if there is not target to set */
223     if (target == NULL)
224         return -1;
225
226     /* Build the command string accordingly to protocol */
227     curarg = sdsempty();
228     if (curarg == NULL)
229         return -1;
230
231     while(*c != '\0') {
232         if (*c != '%' || c[1] == '\0') {
233             if (*c == ' ') {
234                 if (touched) {
235                     newargv = realloc(curargv,sizeof(char*)*(argc+1));
236                     if (newargv == NULL) goto memory_err;
237                     curargv = newargv;
238                     curargv[argc++] = curarg;
239                     totlen += bulklen(sdslen(curarg));
240
241                     /* curarg is put in argv so it can be overwritten. */
242                     curarg = sdsempty();
243                     if (curarg == NULL) goto memory_err;
244                     touched = 0;
245                 }
246             } else {
247                 newarg = sdscatlen(curarg,c,1);
248                 if (newarg == NULL) goto memory_err;
249                 curarg = newarg;
250                 touched = 1;
251             }
252         } else {
253             char *arg;
254             size_t size;
255
256             /* Set newarg so it can be checked even if it is not touched. */
257             newarg = curarg;
258
259             switch(c[1]) {
260             case 's':
261                 arg = va_arg(ap,char*);
262                 size = strlen(arg);
263                 if (size > 0)
264                     newarg = sdscatlen(curarg,arg,size);
265                 break;
266             case 'b':
267                 arg = va_arg(ap,char*);
268                 size = va_arg(ap,size_t);
269                 if (size > 0)
270                     newarg = sdscatlen(curarg,arg,size);
271                 break;
272             case '%':
273                 newarg = sdscat(curarg,"%");
274                 break;
275             default:
276                 /* Try to detect printf format */
277                 {
278                     static const char intfmts[] = "diouxX";
279                     static const char flags[] = "#0-+ ";
280                     char _format[16];
281                     const char *_p = c+1;
282                     size_t _l = 0;
283                     va_list _cpy;
284
285                     /* Flags */
286                     while (*_p != '\0' && strchr(flags,*_p) != NULL) _p++;
287
288                     /* Field width */
289                     while (*_p != '\0' && isdigit(*_p)) _p++;
290
291                     /* Precision */
292                     if (*_p == '.') {
293                         _p++;
294                         while (*_p != '\0' && isdigit(*_p)) _p++;
295                     }
296
297                     /* Copy va_list before consuming with va_arg */
298                     va_copy(_cpy,ap);
299
300                     /* Integer conversion (without modifiers) */
301                     if (strchr(intfmts,*_p) != NULL) {
302                         va_arg(ap,int);
303                         goto fmt_valid;
304                     }
305
306                     /* Double conversion (without modifiers) */
307                     if (strchr("eEfFgGaA",*_p) != NULL) {
308                         va_arg(ap,double);
309                         goto fmt_valid;
310                     }
311
312                     /* Size: char */
313                     if (_p[0] == 'h' && _p[1] == 'h') {
314                         _p += 2;
315                         if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
316                             va_arg(ap,int); /* char gets promoted to int */
317                             goto fmt_valid;
318                         }
319                         goto fmt_invalid;
320                     }
321
322                     /* Size: short */
323                     if (_p[0] == 'h') {
324                         _p += 1;
325                         if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
326                             va_arg(ap,int); /* short gets promoted to int */
327                             goto fmt_valid;
328                         }
329                         goto fmt_invalid;
330                     }
331
332                     /* Size: long long */
333                     if (_p[0] == 'l' && _p[1] == 'l') {
334                         _p += 2;
335                         if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
336                             va_arg(ap,long long);
337                             goto fmt_valid;
338                         }
339                         goto fmt_invalid;
340                     }
341
342                     /* Size: long */
343                     if (_p[0] == 'l') {
344                         _p += 1;
345                         if (*_p != '\0' && strchr(intfmts,*_p) != NULL) {
346                             va_arg(ap,long);
347                             goto fmt_valid;
348                         }
349                         goto fmt_invalid;
350                     }
351
352                 fmt_invalid:
353                     va_end(_cpy);
354                     goto format_err;
355
356                 fmt_valid:
357                     _l = (_p+1)-c;
358                     if (_l < sizeof(_format)-2) {
359                         memcpy(_format,c,_l);
360                         _format[_l] = '\0';
361                         newarg = sdscatvprintf(curarg,_format,_cpy);
362
363                         /* Update current position (note: outer blocks
364                          * increment c twice so compensate here) */
365                         c = _p-1;
366                     }
367
368                     va_end(_cpy);
369                     break;
370                 }
371             }
372
373             if (newarg == NULL) goto memory_err;
374             curarg = newarg;
375
376             touched = 1;
377             c++;
378         }
379         c++;
380     }
381
382     /* Add the last argument if needed */
383     if (touched) {
384         newargv = realloc(curargv,sizeof(char*)*(argc+1));
385         if (newargv == NULL) goto memory_err;
386         curargv = newargv;
387         curargv[argc++] = curarg;
388         totlen += bulklen(sdslen(curarg));
389     } else {
390         sdsfree(curarg);
391     }
392
393     /* Clear curarg because it was put in curargv or was free'd. */
394     curarg = NULL;
395
396     /* Add bytes needed to hold multi bulk count */
397     totlen += 1+countDigits(argc)+2;
398
399     /* Build the command at protocol level */
400     cmd = malloc(totlen+1);
401     if (cmd == NULL) goto memory_err;
402
403     pos = sprintf(cmd,"*%d\r\n",argc);
404     for (j = 0; j < argc; j++) {
405         pos += sprintf(cmd+pos,"$%zu\r\n",sdslen(curargv[j]));
406         memcpy(cmd+pos,curargv[j],sdslen(curargv[j]));
407         pos += sdslen(curargv[j]);
408         sdsfree(curargv[j]);
409         cmd[pos++] = '\r';
410         cmd[pos++] = '\n';
411     }
412     assert(pos == totlen);
413     cmd[pos] = '\0';
414
415     free(curargv);
416     *target = cmd;
417     return totlen;
418
419 format_err:
420     error_type = -2;
421     goto cleanup;
422
423 memory_err:
424     error_type = -1;
425     goto cleanup;
426
427 cleanup:
428     if (curargv) {
429         while(argc--)
430             sdsfree(curargv[argc]);
431         free(curargv);
432     }
433
434     sdsfree(curarg);
435
436     /* No need to check cmd since it is the last statement that can fail,
437      * but do it anyway to be as defensive as possible. */
438     if (cmd != NULL)
439         free(cmd);
440
441     return error_type;
442 }
443
444 /* Format a command according to the Redis protocol. This function
445  * takes a format similar to printf:
446  *
447  * %s represents a C null terminated string you want to interpolate
448  * %b represents a binary safe string
449  *
450  * When using %b you need to provide both the pointer to the string
451  * and the length in bytes as a size_t. Examples:
452  *
453  * len = redisFormatCommand(target, "GET %s", mykey);
454  * len = redisFormatCommand(target, "SET %s %b", mykey, myval, myvallen);
455  */
456 int redisFormatCommand(char **target, const char *format, ...) {
457     va_list ap;
458     int len;
459     va_start(ap,format);
460     len = redisvFormatCommand(target,format,ap);
461     va_end(ap);
462
463     /* The API says "-1" means bad result, but we now also return "-2" in some
464      * cases.  Force the return value to always be -1. */
465     if (len < 0)
466         len = -1;
467
468     return len;
469 }
470
471 /* Format a command according to the Redis protocol using an sds string and
472  * sdscatfmt for the processing of arguments. This function takes the
473  * number of arguments, an array with arguments and an array with their
474  * lengths. If the latter is set to NULL, strlen will be used to compute the
475  * argument lengths.
476  */
477 int redisFormatSdsCommandArgv(sds *target, int argc, const char **argv,
478                               const size_t *argvlen)
479 {
480     sds cmd;
481     unsigned long long totlen;
482     int j;
483     size_t len;
484
485     /* Abort on a NULL target */
486     if (target == NULL)
487         return -1;
488
489     /* Calculate our total size */
490     totlen = 1+countDigits(argc)+2;
491     for (j = 0; j < argc; j++) {
492         len = argvlen ? argvlen[j] : strlen(argv[j]);
493         totlen += bulklen(len);
494     }
495
496     /* Use an SDS string for command construction */
497     cmd = sdsempty();
498     if (cmd == NULL)
499         return -1;
500
501     /* We already know how much storage we need */
502     cmd = sdsMakeRoomFor(cmd, totlen);
503     if (cmd == NULL)
504         return -1;
505
506     /* Construct command */
507     cmd = sdscatfmt(cmd, "*%i\r\n", argc);
508     for (j=0; j < argc; j++) {
509         len = argvlen ? argvlen[j] : strlen(argv[j]);
510         cmd = sdscatfmt(cmd, "$%T\r\n", len);
511         cmd = sdscatlen(cmd, argv[j], len);
512         cmd = sdscatlen(cmd, "\r\n", sizeof("\r\n")-1);
513     }
514
515     assert(sdslen(cmd)==totlen);
516
517     *target = cmd;
518     return totlen;
519 }
520
521 void redisFreeSdsCommand(sds cmd) {
522     sdsfree(cmd);
523 }
524
525 /* Format a command according to the Redis protocol. This function takes the
526  * number of arguments, an array with arguments and an array with their
527  * lengths. If the latter is set to NULL, strlen will be used to compute the
528  * argument lengths.
529  */
530 int redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen) {
531     char *cmd = NULL; /* final command */
532     int pos; /* position in final command */
533     size_t len;
534     int totlen, j;
535
536     /* Abort on a NULL target */
537     if (target == NULL)
538         return -1;
539
540     /* Calculate number of bytes needed for the command */
541     totlen = 1+countDigits(argc)+2;
542     for (j = 0; j < argc; j++) {
543         len = argvlen ? argvlen[j] : strlen(argv[j]);
544         totlen += bulklen(len);
545     }
546
547     /* Build the command at protocol level */
548     cmd = malloc(totlen+1);
549     if (cmd == NULL)
550         return -1;
551
552     pos = sprintf(cmd,"*%d\r\n",argc);
553     for (j = 0; j < argc; j++) {
554         len = argvlen ? argvlen[j] : strlen(argv[j]);
555         pos += sprintf(cmd+pos,"$%zu\r\n",len);
556         memcpy(cmd+pos,argv[j],len);
557         pos += len;
558         cmd[pos++] = '\r';
559         cmd[pos++] = '\n';
560     }
561     assert(pos == totlen);
562     cmd[pos] = '\0';
563
564     *target = cmd;
565     return totlen;
566 }
567
568 void redisFreeCommand(char *cmd) {
569     free(cmd);
570 }
571
572 void __redisSetError(redisContext *c, int type, const char *str) {
573     size_t len;
574
575     c->err = type;
576     if (str != NULL) {
577         len = strlen(str);
578         len = len < (sizeof(c->errstr)-1) ? len : (sizeof(c->errstr)-1);
579         memcpy(c->errstr,str,len);
580         c->errstr[len] = '\0';
581     } else {
582         /* Only REDIS_ERR_IO may lack a description! */
583         assert(type == REDIS_ERR_IO);
584         __redis_strerror_r(errno, c->errstr, sizeof(c->errstr));
585     }
586 }
587
588 redisReader *redisReaderCreate(void) {
589     return redisReaderCreateWithFunctions(&defaultFunctions);
590 }
591
592 static redisContext *redisContextInit(void) {
593     redisContext *c;
594
595     c = calloc(1,sizeof(redisContext));
596     if (c == NULL)
597         return NULL;
598
599     c->err = 0;
600     c->errstr[0] = '\0';
601     c->obuf = sdsempty();
602     c->reader = redisReaderCreate();
603     c->tcp.host = NULL;
604     c->tcp.source_addr = NULL;
605     c->unix_sock.path = NULL;
606     c->timeout = NULL;
607
608     if (c->obuf == NULL || c->reader == NULL) {
609         redisFree(c);
610         return NULL;
611     }
612
613     return c;
614 }
615
616 void redisFree(redisContext *c) {
617     if (c == NULL)
618         return;
619     if (c->fd > 0)
620         close(c->fd);
621     if (c->obuf != NULL)
622         sdsfree(c->obuf);
623     if (c->reader != NULL)
624         redisReaderFree(c->reader);
625     if (c->tcp.host)
626         free(c->tcp.host);
627     if (c->tcp.source_addr)
628         free(c->tcp.source_addr);
629     if (c->unix_sock.path)
630         free(c->unix_sock.path);
631     if (c->timeout)
632         free(c->timeout);
633     free(c);
634 }
635
636 int redisFreeKeepFd(redisContext *c) {
637     int fd = c->fd;
638     c->fd = -1;
639     redisFree(c);
640     return fd;
641 }
642
643 int redisReconnect(redisContext *c) {
644     c->err = 0;
645     memset(c->errstr, '\0', strlen(c->errstr));
646
647     if (c->fd > 0) {
648         close(c->fd);
649     }
650
651     sdsfree(c->obuf);
652     redisReaderFree(c->reader);
653
654     c->obuf = sdsempty();
655     c->reader = redisReaderCreate();
656
657     if (c->connection_type == REDIS_CONN_TCP) {
658         return redisContextConnectBindTcp(c, c->tcp.host, c->tcp.port,
659                 c->timeout, c->tcp.source_addr);
660     } else if (c->connection_type == REDIS_CONN_UNIX) {
661         return redisContextConnectUnix(c, c->unix_sock.path, c->timeout);
662     } else {
663         /* Something bad happened here and shouldn't have. There isn't
664            enough information in the context to reconnect. */
665         __redisSetError(c,REDIS_ERR_OTHER,"Not enough information to reconnect");
666     }
667
668     return REDIS_ERR;
669 }
670
671 /* Connect to a Redis instance. On error the field error in the returned
672  * context will be set to the return value of the error function.
673  * When no set of reply functions is given, the default set will be used. */
674 redisContext *redisConnect(const char *ip, int port) {
675     redisContext *c;
676
677     c = redisContextInit();
678     if (c == NULL)
679         return NULL;
680
681     c->flags |= REDIS_BLOCK;
682     redisContextConnectTcp(c,ip,port,NULL);
683     return c;
684 }
685
686 redisContext *redisConnectWithTimeout(const char *ip, int port, const struct timeval tv) {
687     redisContext *c;
688
689     c = redisContextInit();
690     if (c == NULL)
691         return NULL;
692
693     c->flags |= REDIS_BLOCK;
694     redisContextConnectTcp(c,ip,port,&tv);
695     return c;
696 }
697
698 redisContext *redisConnectNonBlock(const char *ip, int port) {
699     redisContext *c;
700
701     c = redisContextInit();
702     if (c == NULL)
703         return NULL;
704
705     c->flags &= ~REDIS_BLOCK;
706     redisContextConnectTcp(c,ip,port,NULL);
707     return c;
708 }
709
710 redisContext *redisConnectBindNonBlock(const char *ip, int port,
711                                        const char *source_addr) {
712     redisContext *c = redisContextInit();
713     c->flags &= ~REDIS_BLOCK;
714     redisContextConnectBindTcp(c,ip,port,NULL,source_addr);
715     return c;
716 }
717
718 redisContext *redisConnectBindNonBlockWithReuse(const char *ip, int port,
719                                                 const char *source_addr) {
720     redisContext *c = redisContextInit();
721     c->flags &= ~REDIS_BLOCK;
722     c->flags |= REDIS_REUSEADDR;
723     redisContextConnectBindTcp(c,ip,port,NULL,source_addr);
724     return c;
725 }
726
727 redisContext *redisConnectUnix(const char *path) {
728     redisContext *c;
729
730     c = redisContextInit();
731     if (c == NULL)
732         return NULL;
733
734     c->flags |= REDIS_BLOCK;
735     redisContextConnectUnix(c,path,NULL);
736     return c;
737 }
738
739 redisContext *redisConnectUnixWithTimeout(const char *path, const struct timeval tv) {
740     redisContext *c;
741
742     c = redisContextInit();
743     if (c == NULL)
744         return NULL;
745
746     c->flags |= REDIS_BLOCK;
747     redisContextConnectUnix(c,path,&tv);
748     return c;
749 }
750
751 redisContext *redisConnectUnixNonBlock(const char *path) {
752     redisContext *c;
753
754     c = redisContextInit();
755     if (c == NULL)
756         return NULL;
757
758     c->flags &= ~REDIS_BLOCK;
759     redisContextConnectUnix(c,path,NULL);
760     return c;
761 }
762
763 redisContext *redisConnectFd(int fd) {
764     redisContext *c;
765
766     c = redisContextInit();
767     if (c == NULL)
768         return NULL;
769
770     c->fd = fd;
771     c->flags |= REDIS_BLOCK | REDIS_CONNECTED;
772     return c;
773 }
774
775 /* Set read/write timeout on a blocking socket. */
776 int redisSetTimeout(redisContext *c, const struct timeval tv) {
777     if (c->flags & REDIS_BLOCK)
778         return redisContextSetTimeout(c,tv);
779     return REDIS_ERR;
780 }
781
782 /* Enable connection KeepAlive. */
783 int redisEnableKeepAlive(redisContext *c) {
784     if (redisKeepAlive(c, REDIS_KEEPALIVE_INTERVAL) != REDIS_OK)
785         return REDIS_ERR;
786     return REDIS_OK;
787 }
788
789 /* Use this function to handle a read event on the descriptor. It will try
790  * and read some bytes from the socket and feed them to the reply parser.
791  *
792  * After this function is called, you may use redisContextReadReply to
793  * see if there is a reply available. */
794 int redisBufferRead(redisContext *c) {
795     char buf[1024*16];
796     int nread;
797
798     /* Return early when the context has seen an error. */
799     if (c->err)
800         return REDIS_ERR;
801
802     nread = read(c->fd,buf,sizeof(buf));
803     if (nread == -1) {
804         if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
805             /* Try again later */
806         } else {
807             __redisSetError(c,REDIS_ERR_IO,NULL);
808             return REDIS_ERR;
809         }
810     } else if (nread == 0) {
811         __redisSetError(c,REDIS_ERR_EOF,"Server closed the connection");
812         return REDIS_ERR;
813     } else {
814         if (redisReaderFeed(c->reader,buf,nread) != REDIS_OK) {
815             __redisSetError(c,c->reader->err,c->reader->errstr);
816             return REDIS_ERR;
817         }
818     }
819     return REDIS_OK;
820 }
821
822 /* Write the output buffer to the socket.
823  *
824  * Returns REDIS_OK when the buffer is empty, or (a part of) the buffer was
825  * succesfully written to the socket. When the buffer is empty after the
826  * write operation, "done" is set to 1 (if given).
827  *
828  * Returns REDIS_ERR if an error occured trying to write and sets
829  * c->errstr to hold the appropriate error string.
830  */
831 int redisBufferWrite(redisContext *c, int *done) {
832     int nwritten;
833
834     /* Return early when the context has seen an error. */
835     if (c->err)
836         return REDIS_ERR;
837
838     if (sdslen(c->obuf) > 0) {
839         nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
840         if (nwritten == -1) {
841             if ((errno == EAGAIN && !(c->flags & REDIS_BLOCK)) || (errno == EINTR)) {
842                 /* Try again later */
843             } else {
844                 __redisSetError(c,REDIS_ERR_IO,NULL);
845                 return REDIS_ERR;
846             }
847         } else if (nwritten > 0) {
848             if (nwritten == (signed)sdslen(c->obuf)) {
849                 sdsfree(c->obuf);
850                 c->obuf = sdsempty();
851             } else {
852                 sdsrange(c->obuf,nwritten,-1);
853             }
854         }
855     }
856     if (done != NULL) *done = (sdslen(c->obuf) == 0);
857     return REDIS_OK;
858 }
859
860 /* Internal helper function to try and get a reply from the reader,
861  * or set an error in the context otherwise. */
862 int redisGetReplyFromReader(redisContext *c, void **reply) {
863     if (redisReaderGetReply(c->reader,reply) == REDIS_ERR) {
864         __redisSetError(c,c->reader->err,c->reader->errstr);
865         return REDIS_ERR;
866     }
867     return REDIS_OK;
868 }
869
870 int redisGetReply(redisContext *c, void **reply) {
871     int wdone = 0;
872     void *aux = NULL;
873
874     /* Try to read pending replies */
875     if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
876         return REDIS_ERR;
877
878     /* For the blocking context, flush output buffer and read reply */
879     if (aux == NULL && c->flags & REDIS_BLOCK) {
880         /* Write until done */
881         do {
882             if (redisBufferWrite(c,&wdone) == REDIS_ERR)
883                 return REDIS_ERR;
884         } while (!wdone);
885
886         /* Read until there is a reply */
887         do {
888             if (redisBufferRead(c) == REDIS_ERR)
889                 return REDIS_ERR;
890             if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
891                 return REDIS_ERR;
892         } while (aux == NULL);
893     }
894
895     /* Set reply object */
896     if (reply != NULL) *reply = aux;
897     return REDIS_OK;
898 }
899
900
901 /* Helper function for the redisAppendCommand* family of functions.
902  *
903  * Write a formatted command to the output buffer. When this family
904  * is used, you need to call redisGetReply yourself to retrieve
905  * the reply (or replies in pub/sub).
906  */
907 int __redisAppendCommand(redisContext *c, const char *cmd, size_t len) {
908     sds newbuf;
909
910     newbuf = sdscatlen(c->obuf,cmd,len);
911     if (newbuf == NULL) {
912         __redisSetError(c,REDIS_ERR_OOM,"Out of memory");
913         return REDIS_ERR;
914     }
915
916     c->obuf = newbuf;
917     return REDIS_OK;
918 }
919
920 int redisAppendFormattedCommand(redisContext *c, const char *cmd, size_t len) {
921
922     if (__redisAppendCommand(c, cmd, len) != REDIS_OK) {
923         return REDIS_ERR;
924     }
925
926     return REDIS_OK;
927 }
928
929 int redisvAppendCommand(redisContext *c, const char *format, va_list ap) {
930     char *cmd;
931     int len;
932
933     len = redisvFormatCommand(&cmd,format,ap);
934     if (len == -1) {
935         __redisSetError(c,REDIS_ERR_OOM,"Out of memory");
936         return REDIS_ERR;
937     } else if (len == -2) {
938         __redisSetError(c,REDIS_ERR_OTHER,"Invalid format string");
939         return REDIS_ERR;
940     }
941
942     if (__redisAppendCommand(c,cmd,len) != REDIS_OK) {
943         free(cmd);
944         return REDIS_ERR;
945     }
946
947     free(cmd);
948     return REDIS_OK;
949 }
950
951 int redisAppendCommand(redisContext *c, const char *format, ...) {
952     va_list ap;
953     int ret;
954
955     va_start(ap,format);
956     ret = redisvAppendCommand(c,format,ap);
957     va_end(ap);
958     return ret;
959 }
960
961 int redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
962     sds cmd;
963     int len;
964
965     len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
966     if (len == -1) {
967         __redisSetError(c,REDIS_ERR_OOM,"Out of memory");
968         return REDIS_ERR;
969     }
970
971     if (__redisAppendCommand(c,cmd,len) != REDIS_OK) {
972         sdsfree(cmd);
973         return REDIS_ERR;
974     }
975
976     sdsfree(cmd);
977     return REDIS_OK;
978 }
979
980 /* Helper function for the redisCommand* family of functions.
981  *
982  * Write a formatted command to the output buffer. If the given context is
983  * blocking, immediately read the reply into the "reply" pointer. When the
984  * context is non-blocking, the "reply" pointer will not be used and the
985  * command is simply appended to the write buffer.
986  *
987  * Returns the reply when a reply was succesfully retrieved. Returns NULL
988  * otherwise. When NULL is returned in a blocking context, the error field
989  * in the context will be set.
990  */
991 static void *__redisBlockForReply(redisContext *c) {
992     void *reply;
993
994     if (c->flags & REDIS_BLOCK) {
995         if (redisGetReply(c,&reply) != REDIS_OK)
996             return NULL;
997         return reply;
998     }
999     return NULL;
1000 }
1001
1002 void *redisvCommand(redisContext *c, const char *format, va_list ap) {
1003     if (redisvAppendCommand(c,format,ap) != REDIS_OK)
1004         return NULL;
1005     return __redisBlockForReply(c);
1006 }
1007
1008 void *redisCommand(redisContext *c, const char *format, ...) {
1009     va_list ap;
1010     void *reply = NULL;
1011     va_start(ap,format);
1012     reply = redisvCommand(c,format,ap);
1013     va_end(ap);
1014     return reply;
1015 }
1016
1017 void *redisCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
1018     if (redisAppendCommandArgv(c,argc,argv,argvlen) != REDIS_OK)
1019         return NULL;
1020     return __redisBlockForReply(c);
1021 }