]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/arrayfuncs.c
- merging in Dr. George's tree with ours
[postgresql] / src / backend / utils / adt / arrayfuncs.c
1 /*-------------------------------------------------------------------------
2  *
3  * arrayfuncs.c--
4  *    Special functions for arrays.
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *    $Header: /cvsroot/pgsql/src/backend/utils/adt/arrayfuncs.c,v 1.3 1996/07/22 21:56:00 scrappy Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 #include <ctype.h>
15 #include <stdio.h>
16
17 #include "postgres.h"
18
19 #include "catalog/catalog.h"
20 #include "catalog/pg_type.h"
21
22 #include "utils/syscache.h"
23 #include "utils/palloc.h"
24 #include "utils/memutils.h"
25 #include "storage/fd.h"         /* for SEEK_ */
26 #include "fmgr.h"
27 #include "utils/elog.h"
28 #include "utils/array.h"
29
30 #include "libpq/libpq-fs.h"
31 #include "libpq/be-fsstubs.h"
32
33 #define ASSGN    "="
34
35 /* An array has the following internal structure:
36  *    <nbytes>            - total number of bytes 
37  *     <ndim>                - number of dimensions of the array 
38  *    <flags>                - bit mask of flags
39  *    <dim>                - size of each array axis 
40  *    <dim_lower>            - lower boundary of each dimension 
41  *    <actual data>        - whatever is the stored data
42 */
43
44 /*-=-=--=-=-=-=-=-=-=-=--=-=-=-=-=-=-=-=--=-=-=-=-=-=-=-=--=-=-=-=-=-=-=-*/
45 static int _ArrayCount(char *str, int dim[], int typdelim);
46 static char *_ReadArrayStr(char *arrayStr, int nitems, int ndim, int dim[],
47                            func_ptr inputproc, Oid typelem, char typdelim,
48                            int typlen, bool typbyval, char typalign,
49                            int *nbytes);
50 static char *_ReadLOArray(char *str, int *nbytes, int *fd, bool *chunkFlag,
51                           int ndim, int dim[], int baseSize);
52 static void _CopyArrayEls(char **values, char *p, int nitems, int typlen,
53                           char typalign, bool typbyval);
54 static void system_cache_lookup(Oid element_type, bool input, int *typlen,
55                 bool *typbyval, char *typdelim, Oid *typelem, Oid *proc,
56                 char *typalign);
57 static Datum _ArrayCast(char *value, bool byval, int len);
58 static char *_AdvanceBy1word(char *str, char **word);
59 static void _ArrayRange(int st[], int endp[], int bsize, char *destPtr,
60                         ArrayType *array, int from);
61 static int _ArrayClipCount(int stI[], int endpI[], ArrayType *array);
62 static void _LOArrayRange(int st[], int endp[], int bsize, int srcfd,
63           int destfd, ArrayType *array, int isSrcLO, bool *isNull);
64 static void _ReadArray (int st[], int endp[], int bsize, int srcfd, int destfd,
65                        ArrayType *array, int isDestLO, bool *isNull);
66 static char *_array_set(ArrayType *array, struct varlena *indx_str,
67                         struct varlena *dataPtr);
68 static ArrayCastAndSet(char *src, bool typbyval, int typlen, char *dest);
69
70
71 /*---------------------------------------------------------------------
72  * array_in : 
73  *        converts an array from the external format in "string" to
74  *        it internal format.
75  * return value :
76  *        the internal representation of the input array
77  *--------------------------------------------------------------------
78  */
79 char *
80 array_in(char *string,          /* input array in external form */
81          Oid element_type)      /* type OID of an array element */
82 {
83     int typlen;
84     bool typbyval, done;
85     char typdelim;
86     Oid typinput;
87     Oid typelem;
88     char *string_save, *p, *q, *r;
89     func_ptr inputproc;
90     int i, nitems, dummy;
91     int32 nbytes;
92     char *dataPtr;
93     ArrayType *retval;
94     int ndim, dim[MAXDIM], lBound[MAXDIM];
95     char typalign;
96     
97     system_cache_lookup(element_type, true, &typlen, &typbyval, &typdelim, 
98                         &typelem, &typinput, &typalign);
99     
100     fmgr_info(typinput, &inputproc, &dummy);
101     
102     string_save = (char *) palloc(strlen(string) + 3);
103     strcpy(string_save, string);
104     
105     /* --- read array dimensions  ---------- */
106     p = q = string_save; done = false;
107     for ( ndim = 0; !done; ) {
108         while (isspace(*p)) p++;
109         if (*p == '[' ) {
110             p++;
111             if ((r = (char *)strchr(p, ':')) == (char *)NULL)
112                 lBound[ndim] = 1;
113             else { 
114                 *r = '\0'; 
115                 lBound[ndim] = atoi(p); 
116                 p = r + 1;
117             }
118             for (q = p; isdigit(*q); q++);
119             if (*q != ']')
120                 elog(WARN, "array_in: missing ']' in array declaration");
121             *q = '\0';
122             dim[ndim] = atoi(p);
123             if ((dim[ndim] < 0) || (lBound[ndim] < 0))
124                 elog(WARN,"array_in: array dimensions need to be positive");
125             dim[ndim] = dim[ndim] - lBound[ndim] + 1;
126             if (dim[ndim] < 0)
127                 elog(WARN, "array_in: upper_bound cannot be < lower_bound");
128             p = q + 1; ndim++;
129         } else {
130             done = true;
131         }
132     }
133     
134     if (ndim == 0) {
135         if (*p == '{') {
136             ndim = _ArrayCount(p, dim, typdelim);
137             for (i = 0; i < ndim; lBound[i++] = 1);
138         } else { 
139             elog(WARN,"array_in: Need to specify dimension");
140         }
141     } else {
142         while (isspace(*p)) p++;
143         if (strncmp(p, ASSGN, strlen(ASSGN)))
144             elog(WARN, "array_in: missing assignment operator");
145         p+= strlen(ASSGN);
146         while (isspace(*p)) p++;
147     }        
148     
149     nitems = getNitems( ndim, dim);
150     if (nitems == 0) {
151         char *emptyArray = palloc(sizeof(ArrayType));
152         memset(emptyArray, 0, sizeof(ArrayType));
153         * (int32 *) emptyArray = sizeof(ArrayType);
154         return emptyArray;
155     }
156     
157     if (*p == '{') {
158         /* array not a large object */
159         dataPtr =
160             (char *) _ReadArrayStr(p, nitems,  ndim, dim, inputproc, typelem,
161                                    typdelim, typlen, typbyval, typalign,
162                                    &nbytes );
163         nbytes += ARR_OVERHEAD(ndim);
164         retval = (ArrayType *) palloc(nbytes);
165         memset(retval,0, nbytes);
166         memmove(retval, (char *)&nbytes, sizeof(int)); 
167         memmove((char*)ARR_NDIM_PTR(retval), (char *)&ndim, sizeof(int)); 
168         SET_LO_FLAG (false, retval);
169         memmove((char *)ARR_DIMS(retval), (char *)dim, ndim*sizeof(int)); 
170         memmove((char *)ARR_LBOUND(retval), (char *)lBound, 
171                 ndim*sizeof(int));
172         /* dataPtr is an array of arbitraystuff even though its type is char*
173            cast to char** to pass to _CopyArrayEls for now  - jolly */
174         _CopyArrayEls((char**)dataPtr,
175                       ARR_DATA_PTR(retval), nitems,
176                       typlen, typalign, typbyval);
177     } else  {
178 #ifdef LOARRAY
179         int dummy, bytes;
180         bool chunked = false;
181         
182         dataPtr = _ReadLOArray(p, &bytes, &dummy, &chunked, ndim, 
183                                dim, typlen );
184         nbytes = bytes + ARR_OVERHEAD(ndim);
185         retval = (ArrayType *) palloc(nbytes);
186         memset(retval, 0,nbytes); 
187         memmove(retval, (char *)&nbytes, sizeof(int));
188         memmove((char *)ARR_NDIM_PTR(retval), (char *)&ndim, sizeof(int));
189         SET_LO_FLAG (true, retval);
190         SET_CHUNK_FLAG (chunked, retval);
191         memmove((char *)ARR_DIMS(retval), (char *)dim, ndim*sizeof(int));
192         memmove((char *)ARR_LBOUND(retval),(char *)lBound, ndim*sizeof(int));
193         memmove(ARR_DATA_PTR(retval), dataPtr, bytes);
194 #endif
195         elog(WARN, "large object arrays not supported");
196     }
197     pfree(string_save);
198     return((char *)retval);
199 }
200
201 /*-----------------------------------------------------------------------------
202  * _ArrayCount --
203  *   Counts the number of dimensions and the dim[] array for an array string. 
204  *       The syntax for array input is C-like nested curly braces 
205  *-----------------------------------------------------------------------------
206  */
207 static int
208 _ArrayCount(char *str, int dim[], int typdelim)
209 {
210     int nest_level = 0, i; 
211     int ndim = 0, temp[MAXDIM];
212     bool scanning_string = false;
213     bool eoArray = false;
214     char *q;
215     
216     for (i = 0; i < MAXDIM; ++i) {
217         temp[i] = dim[i] = 0;
218     }
219     
220     if (strncmp (str, "{}", 2) == 0) return(0); 
221     
222     q = str;
223     while (eoArray != true) {
224         bool done = false;
225         while (!done) {
226             switch (*q) {
227 #ifdef ESCAPE_PATCH
228             case '\\':
229                 /* skip escaped characters (\ and ") inside strings */
230                 if (scanning_string && *(q+1)) {
231                     q++;
232                 }
233                 break;
234 #endif
235             case '\"':
236                 scanning_string = ! scanning_string;
237                 break;
238             case '{':
239                 if (!scanning_string)  { 
240                     temp[nest_level] = 0;
241                     nest_level++;
242                 }
243                 break;
244             case '}':
245                 if (!scanning_string) {
246                     if (!ndim) ndim = nest_level;
247                     nest_level--;
248                     if (nest_level) temp[nest_level-1]++;
249                     if (nest_level == 0) eoArray = done = true;
250                 }
251                 break;
252             default:
253                 if (!ndim) ndim = nest_level;
254                 if (*q == typdelim && !scanning_string )
255                     done = true;
256                 break;
257             }
258             if (!done) q++;
259         }
260         temp[ndim-1]++;
261         q++;
262         if (!eoArray) 
263             while (isspace(*q)) q++;
264     }
265     for (i = 0; i < ndim; ++i) {
266         dim[i] = temp[i];
267     }
268     
269     return(ndim);
270 }
271
272 /*---------------------------------------------------------------------------
273  * _ReadArrayStr :
274  *   parses the array string pointed by "arrayStr" and converts it in the 
275  *   internal format. The external format expected is like C array
276  *   declaration. Unspecified elements are initialized to zero for fixed length
277  *   base types and to empty varlena structures for variable length base
278  *   types.
279  * result :
280  *   returns the internal representation of the array elements
281  *   nbytes is set to the size of the array in its internal representation.
282  *---------------------------------------------------------------------------
283  */
284 static char *
285 _ReadArrayStr(char *arrayStr,
286               int nitems,
287               int ndim,
288               int dim[],
289               func_ptr inputproc, /* function used for the conversion */
290               Oid typelem,
291               char typdelim,
292               int typlen,
293               bool typbyval,
294               char typalign,
295               int *nbytes)
296 {
297     int i, nest_level = 0;
298     char *p, *q, *r, **values;
299     bool scanning_string = false;
300     int indx[MAXDIM], prod[MAXDIM];
301     bool eoArray = false;
302     
303     mda_get_prod(ndim, dim, prod);
304     for (i = 0; i < ndim; indx[i++] = 0);
305     /* read array enclosed within {} */    
306     values = (char **) palloc(nitems * sizeof(char *));
307     memset(values, 0, nitems * sizeof(char *));
308     q = p = arrayStr;
309     
310     while ( ! eoArray ) {
311         bool done = false;
312         int i = -1;
313         
314         while (!done) {
315             switch (*q) {
316             case '\\':
317                 /* Crunch the string on top of the backslash. */
318                 for (r = q; *r != '\0'; r++) *r = *(r+1);
319                 break;
320             case '\"':
321                 if (!scanning_string ) {
322                     while (p != q) p++;
323                     p++; /* get p past first doublequote */
324                 } else 
325                     *q = '\0';
326                 scanning_string = ! scanning_string;
327                 break;
328             case '{':
329                 if (!scanning_string) { 
330                     p++; 
331                     nest_level++;
332                     if (nest_level > ndim)
333                         elog(WARN, "array_in: illformed array constant");
334                     indx[nest_level - 1] = 0;
335                     indx[ndim - 1] = 0;
336                 }
337                 break;
338             case '}':
339                 if (!scanning_string) {
340                     if (i == -1)
341                         i = tuple2linear(ndim, indx, prod); 
342                     nest_level--;
343                     if (nest_level == 0)
344                         eoArray = done = true;
345                     else { 
346                         *q = '\0';
347                         indx[nest_level - 1]++;
348                     }
349                 }
350                 break;
351             default:
352                 if (*q == typdelim && !scanning_string ) {
353                     if (i == -1) 
354                         i = tuple2linear(ndim, indx, prod); 
355                     done = true;
356                     indx[ndim - 1]++;
357                 }
358                 break;
359             }
360             if (!done) 
361                 q++;
362         }
363         *q = '\0';                    
364         if (i >= nitems)
365             elog(WARN, "array_in: illformed array constant");
366         values[i] = (*inputproc) (p, typelem);
367         p = ++q;
368         if (!eoArray)    
369             /* 
370              * if not at the end of the array skip white space 
371              */
372             while (isspace(*q)) {
373                 p++;
374                 q++;
375             }
376     }
377     if (typlen > 0) {
378         *nbytes = nitems * typlen;
379         if (!typbyval)
380             for (i = 0; i < nitems; i++)
381                 if (!values[i]) {
382                     values[i] = palloc(typlen);
383                     memset(values[i], 0, typlen);
384                 }
385     } else {
386         for (i = 0, *nbytes = 0; i < nitems; i++) {
387             if (values[i]) {
388                 if (typalign=='d') {
389                     *nbytes += DOUBLEALIGN(* (int32 *) values[i]);
390                 } else {
391                     *nbytes += INTALIGN(* (int32 *) values[i]);
392                 }
393             } else {
394                 *nbytes += sizeof(int32);
395                 values[i] = palloc(sizeof(int32));
396                 *(int32 *)values[i] = sizeof(int32); 
397             }
398         }
399     }
400     return((char *)values);
401 }
402
403
404 /*----------------------------------------------------------------------------
405  * Read data about an array to be stored as a large object                    
406  *----------------------------------------------------------------------------
407  */
408 static char *
409 _ReadLOArray(char *str,
410              int *nbytes,
411              int *fd,
412              bool *chunkFlag,
413              int ndim,
414              int dim[],
415              int baseSize)
416 {
417     char *inputfile, *accessfile = NULL, *chunkfile = NULL;
418     char *retStr, *_AdvanceBy1word();
419     Oid lobjId;
420     
421     str = _AdvanceBy1word(str, &inputfile); 
422     
423     while (str != NULL) {
424         char *word;
425         
426         str = _AdvanceBy1word(str, &word); 
427         
428         if (!strcmp (word, "-chunk")) {
429             if (str == NULL)
430                 elog(WARN, "array_in: access pattern file required");
431             str = _AdvanceBy1word(str, &accessfile);
432         }
433         else if (!strcmp (word, "-noreorg"))  {
434             if (str == NULL)
435                 elog(WARN, "array_in: chunk file required");
436             str = _AdvanceBy1word(str, &chunkfile);
437         } else {
438             elog(WARN, "usage: <input file> -chunk DEFAULT/<access pattern file> -invert/-native [-noreorg <chunk file>]");
439         }
440     }
441     
442     if (inputfile == NULL) 
443         elog(WARN, "array_in: missing file name");
444     lobjId = lo_creat(0);
445     *fd = lo_open(lobjId, INV_READ);
446     if ( *fd < 0 )
447         elog(WARN, "Large object create failed");
448     retStr = inputfile;
449     *nbytes = strlen(retStr) + 2;
450     
451     if ( accessfile ) {
452         FILE *afd;
453         if ((afd = fopen (accessfile, "r")) == NULL)
454             elog(WARN, "unable to open access pattern file");
455         *chunkFlag = true;
456         retStr = _ChunkArray(*fd, afd, ndim, dim, baseSize, nbytes,
457                              chunkfile);
458     }    
459     return(retStr);
460 }
461
462 static void
463 _CopyArrayEls(char **values, 
464               char *p, 
465               int nitems, 
466               int typlen,
467               char typalign,
468               bool typbyval)
469 {
470     int i;
471     
472     for (i = 0; i < nitems; i++) {
473         int inc;
474         inc = ArrayCastAndSet(values[i], typbyval, typlen, p);
475         p += inc;
476         if (!typbyval) 
477             pfree(values[i]);
478     }
479     pfree(values);
480 }
481
482 /*-------------------------------------------------------------------------
483  * array_out : 
484  *         takes the internal representation of an array and returns a string
485  *        containing the array in its external format.
486  *-------------------------------------------------------------------------
487  */
488 char *
489 array_out(ArrayType *v, Oid element_type)
490 {
491     int typlen;
492     bool typbyval;
493     char typdelim;
494     Oid typoutput, typelem;
495     func_ptr outputproc;
496     char typalign;
497     
498     char *p, *retval, **values, delim[2];
499     int nitems, overall_length, i, j, k, indx[MAXDIM];
500     bool dummy_bool;
501     int dummy_int;
502     int ndim, *dim;
503     
504     if (v == (ArrayType *) NULL)
505         return ((char *) NULL);
506     
507     if (ARR_IS_LO(v) == true)  {
508         char *p, *save_p;
509         int nbytes;
510         
511         /* get a wide string to print to */
512         p = array_dims(v, &dummy_bool);
513         nbytes = strlen(ARR_DATA_PTR(v)) + 4 + *(int *)p;
514         
515         save_p = (char *) palloc(nbytes);
516         
517         strcpy(save_p, p + sizeof(int));
518         strcat(save_p, ASSGN);
519         strcat(save_p, ARR_DATA_PTR(v));
520         pfree(p);
521         return (save_p);
522     }
523     
524     system_cache_lookup(element_type, false, &typlen, &typbyval,
525                         &typdelim, &typelem, &typoutput, &typalign);
526     fmgr_info(typoutput, & outputproc, &dummy_int);
527     sprintf(delim, "%c", typdelim);
528     ndim = ARR_NDIM(v);
529     dim = ARR_DIMS(v);
530     nitems = getNitems(ndim, dim);
531     
532     if (nitems == 0) {
533         char *emptyArray = palloc(3);
534         emptyArray[0] = '{';
535         emptyArray[1] = '}';
536         emptyArray[2] = '\0';
537         return emptyArray;
538     }
539     
540     p = ARR_DATA_PTR(v);
541     overall_length = 1; /* [TRH] don't forget to count \0 at end. */
542     values = (char **) palloc(nitems * sizeof (char *));
543     for (i = 0; i < nitems; i++) {
544         if (typbyval) {
545             switch(typlen) {
546             case 1:
547                 values[i] = (*outputproc) (*p, typelem);
548                 break;
549             case 2:
550                 values[i] = (*outputproc) (* (int16 *) p, typelem);
551                 break;
552             case 3:
553             case 4:
554                 values[i] = (*outputproc) (* (int32 *) p, typelem);
555                 break;
556             }
557             p += typlen;
558         } else {
559             values[i] = (*outputproc) (p, typelem);
560             if (typlen > 0)
561                 p += typlen;
562             else
563                 p += INTALIGN(* (int32 *) p);
564             /*
565              * For the pair of double quotes
566              */
567             overall_length += 2;
568         }
569         overall_length += (strlen(values[i]) + 1);
570     }
571     
572     /* 
573      * count total number of curly braces in output string 
574      */
575     for (i = j = 0, k = 1; i < ndim; k *= dim[i++], j += k);
576     
577     p = (char *) palloc(overall_length + 2*j);        
578     retval = p;
579     
580     strcpy(p, "{");
581     for (i = 0; i < ndim; indx[i++] = 0);
582     j = 0; k = 0;
583     do {
584         for (i = j; i < ndim - 1; i++)
585             strcat(p, "{");
586         /*
587          * Surround anything that is not passed by value in double quotes.
588          * See above for more details.
589          */
590         if (!typbyval) {
591             strcat(p, "\"");
592             strcat(p, values[k]);
593             strcat(p, "\"");
594         } else
595             strcat(p, values[k]);
596         pfree(values[k++]); 
597         
598         for (i = ndim - 1; i >= 0; i--) {
599             indx[i] = (indx[i] + 1)%dim[i];
600             if (indx[i]) {
601                 strcat (p, delim);
602                 break;
603             } else  
604                 strcat (p, "}");
605         }
606         j = i;
607     } while (j  != -1);
608     
609     pfree(values);
610     return(retval);
611 }
612
613 /*-----------------------------------------------------------------------------
614  * array_dims :
615  *        returns the dimension of the array pointed to by "v"
616  *---------------------------------------------------------------------------- 
617  */
618 char *
619 array_dims(ArrayType *v, bool *isNull)
620 {
621     char *p, *save_p;
622     int nbytes, i;
623     int *dimv, *lb;
624     
625     if (v == (ArrayType *) NULL) RETURN_NULL;
626     nbytes = ARR_NDIM(v)*33;    
627     /* 
628      * 33 since we assume 15 digits per number + ':' +'[]' 
629      */         
630     save_p = p =  (char *) palloc(nbytes + 4);
631     memset(save_p, 0, nbytes + 4);
632     dimv = ARR_DIMS(v); lb = ARR_LBOUND(v);
633     p += 4;
634     for (i = 0; i < ARR_NDIM(v); i++) {
635         sprintf(p, "[%d:%d]", lb[i], dimv[i]+lb[i]-1);
636         p += strlen(p);
637     }
638     nbytes = strlen(save_p + 4) + 4;
639     memmove(save_p, &nbytes,4); 
640     return (save_p);
641
642
643 /*---------------------------------------------------------------------------
644  * array_ref :
645  *    This routing takes an array pointer and an index array and returns
646  *    a pointer to the referred element if element is passed by 
647  *    reference otherwise returns the value of the referred element.
648  *---------------------------------------------------------------------------
649  */
650 Datum
651 array_ref(ArrayType *array,
652           int n,
653           int indx[],
654           int reftype,
655           int elmlen,
656           int arraylen,
657           bool *isNull)
658 {
659     int i, ndim, *dim, *lb, offset, nbytes;
660     struct varlena *v;
661     char *retval;
662     
663     if (array == (ArrayType *) NULL) RETURN_NULL;
664     if (arraylen > 0) {
665         /*
666          * fixed length arrays -- these are assumed to be 1-d
667          */
668         if (indx[0]*elmlen > arraylen) 
669             elog(WARN, "array_ref: array bound exceeded");
670         retval = (char *)array + indx[0]*elmlen;
671         return _ArrayCast(retval, reftype, elmlen);
672     }
673     dim = ARR_DIMS(array);
674     lb = ARR_LBOUND(array);
675     ndim = ARR_NDIM(array);
676     nbytes = (* (int32 *) array) - ARR_OVERHEAD(ndim);
677     
678     if (!SanityCheckInput(ndim, n,  dim, lb, indx))
679         RETURN_NULL;
680     
681     offset = GetOffset(n, dim, lb, indx);
682     
683     if (ARR_IS_LO(array)) {
684         char * lo_name;
685         int fd;
686         
687         /* We are assuming fixed element lengths here */
688         offset *= elmlen;
689         lo_name = (char *)ARR_DATA_PTR(array);
690 #ifdef LOARRAY
691         if ((fd = LOopen(lo_name, ARR_IS_INV(array)?INV_READ:O_RDONLY)) < 0)
692             RETURN_NULL;
693 #endif  
694         if (ARR_IS_CHUNKED(array))
695             v = _ReadChunkArray1El(indx, elmlen, fd, array, isNull);
696         else {
697             if (lo_lseek(fd, offset, SEEK_SET) < 0)
698                 RETURN_NULL;
699 #ifdef LOARRAY
700             v = (struct varlena *) LOread(fd, elmlen);
701 #endif
702         }
703         if (*isNull) RETURN_NULL;
704         if (VARSIZE(v) - 4 < elmlen)
705             RETURN_NULL;
706         (void) lo_close(fd);
707         retval  = (char *)_ArrayCast((char *)VARDATA(v), reftype, elmlen);
708         if ( reftype == 0) { /* not by value */
709             char * tempdata = palloc (elmlen);
710             memmove(tempdata, retval, elmlen);
711             retval = tempdata;
712         }
713         pfree(v);
714         return (Datum) retval;
715     }
716     
717     if (elmlen >  0) {
718         offset = offset * elmlen;
719         /*  off the end of the array */
720         if (nbytes - offset < 1) RETURN_NULL;
721         retval = ARR_DATA_PTR (array) + offset;
722         return _ArrayCast(retval, reftype, elmlen);
723     } else {
724         bool done = false;
725         char *temp;
726         int bytes = nbytes;
727         temp = ARR_DATA_PTR (array);
728         i = 0;
729         while (bytes > 0 && !done) {
730             if (i == offset) {
731                 retval = temp;
732                 done = true;
733             }
734             bytes -= INTALIGN(* (int32 *) temp);
735             temp += INTALIGN(* (int32 *) temp);
736             i++;
737         }
738         if (! done) 
739             RETURN_NULL;
740         return (Datum) retval;
741     }
742 }
743
744 /*-----------------------------------------------------------------------------
745  * array_clip :
746  *        This routine takes an array and a range of indices (upperIndex and 
747  *         lowerIndx), creates a new array structure for the referred elements 
748  *         and returns a pointer to it.
749  *-----------------------------------------------------------------------------
750  */
751 Datum
752 array_clip(ArrayType *array,
753            int n,
754            int upperIndx[],
755            int lowerIndx[],
756            int reftype,
757            int len,
758            bool *isNull)
759 {
760     int i, ndim, *dim, *lb, nbytes;
761     ArrayType *newArr; 
762     int bytes, span[MAXDIM];
763     
764     /* timer_start(); */
765     if (array == (ArrayType *) NULL) 
766         RETURN_NULL;
767     dim = ARR_DIMS(array);
768     lb = ARR_LBOUND(array);
769     ndim = ARR_NDIM(array);
770     nbytes = (* (int32 *) array) - ARR_OVERHEAD(ndim);
771     
772     if (!SanityCheckInput(ndim, n,  dim, lb, upperIndx))
773         RETURN_NULL;
774     
775     if (!SanityCheckInput(ndim, n, dim, lb, lowerIndx))
776         RETURN_NULL;
777     
778     for (i = 0; i < n; i++)
779         if (lowerIndx[i] > upperIndx[i])
780             elog(WARN, "lowerIndex cannot be larger than upperIndx"); 
781     mda_get_range(n, span, lowerIndx, upperIndx);
782     
783     if (ARR_IS_LO(array)) {
784         char * lo_name, * newname;
785         int fd, newfd, isDestLO = true, rsize;
786         
787         if (len < 0) 
788             elog(WARN, "array_clip: array of variable length objects not supported");  
789 #ifdef LOARRAY
790         lo_name = (char *)ARR_DATA_PTR(array);
791         if ((fd = LOopen(lo_name, ARR_IS_INV(array)?INV_READ:O_RDONLY)) < 0)
792             RETURN_NULL;
793         newname = _array_newLO( &newfd, Unix );
794 #endif
795         bytes = strlen(newname) + 1 + ARR_OVERHEAD(n);
796         newArr = (ArrayType *) palloc(bytes);
797         memmove(newArr, array, sizeof(ArrayType));
798         memmove(newArr, &bytes, sizeof(int));
799         memmove(ARR_DIMS(newArr), span, n*sizeof(int));
800         memmove(ARR_LBOUND(newArr), lowerIndx, n*sizeof(int));
801         strcpy(ARR_DATA_PTR(newArr), newname);
802         
803         rsize = compute_size (lowerIndx, upperIndx, n, len);
804         if (rsize < MAX_BUFF_SIZE) { 
805             char *buff;
806             rsize += 4;
807             buff = palloc(rsize);
808             if ( buff ) 
809                 isDestLO = false;
810             if (ARR_IS_CHUNKED(array)) {
811                 _ReadChunkArray(lowerIndx, upperIndx, len, fd, &(buff[4]), 
812                                 array,0,isNull);
813             } else { 
814                 _ReadArray(lowerIndx, upperIndx, len, fd, (int)&(buff[4]),
815                            array, 
816                            0,isNull);
817             }
818             memmove(buff, &rsize, 4);
819 #ifdef LOARRAY
820             if (! *isNull)
821                 bytes = LOwrite(newfd, (struct varlena *)buff);
822 #endif
823             pfree (buff);
824         }
825         if (isDestLO)   
826             if (ARR_IS_CHUNKED(array)) {
827                 _ReadChunkArray(lowerIndx, upperIndx, len, fd, (char*)newfd, array,
828                                 1,isNull);
829             } else {
830                 _ReadArray(lowerIndx, upperIndx, len, fd, newfd, array, 1,isNull);
831             }
832 #ifdef LOARRAY
833         (void) LOclose(fd);
834         (void) LOclose(newfd);
835 #endif
836         if (*isNull) { 
837             pfree(newArr); 
838             newArr = NULL;
839         }
840         /* timer_end(); */
841         return ((Datum) newArr);
842     }
843     
844     if (len >  0) {
845         bytes = getNitems(n, span);
846         bytes = bytes*len + ARR_OVERHEAD(n);
847     } else {
848         bytes = _ArrayClipCount(lowerIndx, upperIndx, array);
849         bytes += ARR_OVERHEAD(n);
850     }
851     newArr = (ArrayType *) palloc(bytes);
852     memmove(newArr, array, sizeof(ArrayType));
853     memmove(newArr, &bytes, sizeof(int));
854     memmove(ARR_DIMS(newArr), span, n*sizeof(int));
855     memmove(ARR_LBOUND(newArr), lowerIndx, n*sizeof(int));
856     _ArrayRange(lowerIndx, upperIndx, len, ARR_DATA_PTR(newArr), array, 1);
857     return (Datum) newArr;
858 }
859
860 /*-----------------------------------------------------------------------------
861  * array_set :
862  *        This routine sets the value of an array location (specified by an index array)
863  *        to a new value specified by "dataPtr".
864  * result :
865  *        returns a pointer to the modified array.
866  *-----------------------------------------------------------------------------
867  */
868 char *
869 array_set(ArrayType *array,
870           int n,
871           int indx[],
872           char *dataPtr,
873           int reftype,
874           int elmlen,
875           int arraylen,
876           bool *isNull)
877 {
878     int ndim, *dim, *lb, offset, nbytes;
879     char *pos;
880     
881     if (array == (ArrayType *) NULL) 
882         RETURN_NULL;
883     if (arraylen > 0) {
884         /*
885          * fixed length arrays -- these are assumed to be 1-d
886          */
887         if (indx[0]*elmlen > arraylen) 
888 #ifdef ARRAY_PATCH
889             elog(WARN, "array_ref: array bound exceeded");
890 #else
891             elog(WARN, "array_set: array bound exceeded");
892 #endif
893         pos = (char *)array + indx[0]*elmlen;
894         ArrayCastAndSet(dataPtr, (bool) reftype, elmlen, pos);
895         return((char *)array);
896     }
897     dim = ARR_DIMS(array);
898     lb = ARR_LBOUND(array);
899     ndim = ARR_NDIM(array);
900     nbytes = (* (int32 *) array) - ARR_OVERHEAD(ndim);
901     
902     if (!SanityCheckInput(ndim, n,  dim, lb, indx)) 
903 #ifdef ARRAY_PATCH
904     {
905         elog(WARN, "array_set: array bound exceeded");
906         return((char *)array);
907     }
908 #else
909         return((char *)array);
910 #endif
911     offset = GetOffset( n, dim, lb, indx);
912     
913     if (ARR_IS_LO(array)) {
914         int fd;
915         char * lo_name;
916         struct varlena *v;
917         
918         /* We are assuming fixed element lengths here */
919         offset *= elmlen;
920 #ifdef LOARRAY
921         lo_name = ARR_DATA_PTR(array);
922         if ((fd = LOopen(lo_name, ARR_IS_INV(array)?INV_WRITE:O_WRONLY)) < 0)
923             return((char *)array);
924 #endif  
925         if (lo_lseek(fd, offset, SEEK_SET) < 0)
926             return((char *)array);
927         v = (struct varlena *) palloc(elmlen + 4);
928         VARSIZE (v) = elmlen + 4;
929         ArrayCastAndSet(dataPtr, (bool) reftype, elmlen, VARDATA(v));
930 #ifdef LOARRAY
931         n =  LOwrite(fd, v);
932 #endif
933         /* if (n < VARSIZE(v) - 4) 
934            RETURN_NULL;
935            */
936         pfree(v);
937         (void) lo_close(fd);
938         return((char *)array);
939     }
940     if (elmlen >  0) {
941         offset = offset * elmlen;
942         /*  off the end of the array */
943         if (nbytes - offset < 1) return((char *)array);
944         pos = ARR_DATA_PTR (array) + offset;
945     } else {
946 #ifdef ARRAY_PATCH
947         ArrayType *newarray;
948         char *elt_ptr;
949         int oldsize, newsize, oldlen, newlen, lth0, lth1, lth2;
950
951         elt_ptr = array_seek(ARR_DATA_PTR(array), -1, offset);
952         oldlen = INTALIGN(*(int32 *)elt_ptr);
953         newlen = INTALIGN(*(int32 *)dataPtr);
954
955         if (oldlen == newlen) {
956             /* new element with same size, overwrite old data */
957             ArrayCastAndSet(dataPtr, (bool)reftype, elmlen, elt_ptr);
958             return((char *)array);
959         }
960
961         /* new element with different size, reallocate the array */
962         oldsize = array->size;
963         lth0 = ARR_OVERHEAD(n);
964         lth1 = (int)(elt_ptr - ARR_DATA_PTR(array));
965         lth2 = (int)(oldsize - lth0 - lth1 - oldlen);
966         newsize = lth0 + lth1 + newlen + lth2;
967
968         newarray = (ArrayType *)palloc(newsize);
969         memmove((char *)newarray, (char *)array, lth0+lth1);
970         newarray->size = newsize;
971         newlen = ArrayCastAndSet(dataPtr, (bool)reftype, elmlen,
972                                  (char *)newarray+lth0+lth1);
973         memmove((char *)newarray+lth0+lth1+newlen,
974                 (char *)array+lth0+lth1+oldlen, lth2);
975
976         /* ??? who should free this storage ??? */
977         return((char *)newarray);
978 #else
979         elog(WARN, "array_set: update of variable length fields not supported");
980 #endif
981     } 
982     ArrayCastAndSet(dataPtr, (bool) reftype, elmlen, pos);
983     return((char *)array);
984 }
985
986 /*----------------------------------------------------------------------------
987  * array_assgn :
988  *        This routine sets the value of a range of array locations (specified
989  *        by upper and lower index values ) to new values passed as 
990  *        another array
991  * result :
992  *        returns a pointer to the modified array.
993  *----------------------------------------------------------------------------
994  */
995 char *
996 array_assgn(ArrayType *array,
997             int n,
998             int upperIndx[],
999             int lowerIndx[],
1000             ArrayType *newArr,
1001             int reftype,
1002             int len,
1003             bool *isNull)
1004 {
1005     int i, ndim, *dim, *lb;
1006     
1007     if (array == (ArrayType *) NULL) 
1008         RETURN_NULL;
1009     if (len < 0) 
1010         elog(WARN,"array_assgn:updates on arrays of variable length elements not allowed");
1011     
1012     dim = ARR_DIMS(array);
1013     lb = ARR_LBOUND(array);
1014     ndim = ARR_NDIM(array);
1015     
1016     if (!SanityCheckInput(ndim, n,  dim, lb, upperIndx) || 
1017         !SanityCheckInput(ndim, n, dim, lb, lowerIndx)) {
1018         return((char *)array);
1019     }
1020     
1021     for (i = 0; i < n; i++)
1022         if (lowerIndx[i] > upperIndx[i])
1023             elog(WARN, "lowerIndex larger than upperIndx"); 
1024     
1025     if (ARR_IS_LO(array)) {
1026         char * lo_name;
1027         int fd, newfd;
1028         
1029 #ifdef LOARRAY
1030         lo_name = (char *)ARR_DATA_PTR(array);
1031         if ((fd = LOopen(lo_name,  ARR_IS_INV(array)?INV_WRITE:O_WRONLY)) < 0)
1032             return((char *)array);
1033 #endif
1034         if (ARR_IS_LO(newArr)) {
1035 #ifdef LOARRAY
1036             lo_name = (char *)ARR_DATA_PTR(newArr);
1037             if ((newfd = LOopen(lo_name, ARR_IS_INV(newArr)?INV_READ:O_RDONLY)) < 0)
1038                 return((char *)array);
1039 #endif
1040             _LOArrayRange(lowerIndx, upperIndx, len, fd, newfd, array, 1, isNull);
1041             (void) lo_close(newfd);
1042         } else {
1043             _LOArrayRange(lowerIndx, upperIndx, len, fd, (int)ARR_DATA_PTR(newArr), 
1044                           array, 0, isNull);
1045         }
1046         (void) lo_close(fd);
1047         return ((char *) array);
1048     }
1049     _ArrayRange(lowerIndx, upperIndx, len, ARR_DATA_PTR(newArr), array, 0);
1050     return (char *) array;
1051 }
1052
1053 /*-----------------------------------------------------------------------------
1054  * array_eq :
1055  *        compares two arrays for equality    
1056  * result :
1057  *        returns 1 if the arrays are equal, 0 otherwise.
1058  *-----------------------------------------------------------------------------
1059  */
1060 int
1061 array_eq (ArrayType *array1, ArrayType *array2)
1062 {
1063     if ((array1 == NULL) || (array2 == NULL))
1064         return(0);
1065     if (*(int *)array1 != *(int *)array2)
1066         return (0);
1067     if (memcmp(array1, array2, *(int *)array1))
1068         return(0);
1069     return(1);
1070 }
1071
1072 /***************************************************************************/
1073 /******************|          Support  Routines           |*****************/
1074 /***************************************************************************/
1075 static void
1076 system_cache_lookup(Oid element_type,
1077                     bool input,
1078                     int *typlen,
1079                     bool *typbyval,
1080                     char *typdelim,
1081                     Oid *typelem,
1082                     Oid *proc,
1083                     char *typalign)
1084 {
1085     HeapTuple typeTuple;
1086     TypeTupleForm typeStruct;
1087     
1088     typeTuple = SearchSysCacheTuple(TYPOID, ObjectIdGetDatum(element_type),
1089                                     0,0,0);
1090     
1091     if (!HeapTupleIsValid(typeTuple)) {
1092         elog(WARN, "array_out: Cache lookup failed for type %d\n",
1093              element_type);
1094         return;
1095     }
1096     typeStruct = (TypeTupleForm) GETSTRUCT(typeTuple);
1097     *typlen    = typeStruct->typlen;
1098     *typbyval  = typeStruct->typbyval;
1099     *typdelim  = typeStruct->typdelim;
1100     *typelem   = typeStruct->typelem;
1101     *typalign  = typeStruct->typalign;
1102     if (input) {
1103         *proc = typeStruct->typinput;
1104     } else {
1105         *proc = typeStruct->typoutput;
1106     }
1107 }
1108
1109 static Datum
1110 _ArrayCast(char *value, bool byval, int len)
1111 {
1112     if (byval) {
1113         switch (len) {
1114         case 1:
1115             return((Datum) * value);
1116         case 2:
1117             return((Datum) * (int16 *) value);
1118         case 3:
1119         case 4:
1120             return((Datum) * (int32 *) value);
1121         default:
1122             elog(WARN, "array_ref: byval and elt len > 4!");
1123             break;
1124         }
1125     } else {
1126         return (Datum) value;
1127     }
1128  return 0;
1129 }
1130
1131
1132 static int
1133 ArrayCastAndSet(char *src,
1134                 bool typbyval,
1135                 int typlen,
1136                 char *dest)
1137 {
1138     int inc;
1139     
1140     if (typlen > 0) {
1141         if (typbyval) {
1142             switch(typlen) {
1143             case 1: 
1144                 *dest = DatumGetChar(src);
1145                 break;
1146             case 2: 
1147                 * (int16 *) dest = DatumGetInt16(src);
1148                 break;
1149             case 4:
1150                 * (int32 *) dest = (int32)src;
1151                 break;
1152             }
1153         } else {
1154             memmove(dest, src, typlen);
1155         }
1156         inc = typlen;
1157     } else {
1158         memmove(dest, src, *(int32 *)src);
1159         inc = (INTALIGN(* (int32 *) src));
1160     }
1161     return(inc);
1162
1163
1164 static char *
1165 _AdvanceBy1word(char *str, char **word)
1166 {
1167     char *retstr, *space;
1168     
1169     *word = NULL;
1170     if (str == NULL) return str;
1171     while (isspace(*str)) str++;
1172     *word = str;
1173     if ((space = (char *)strchr(str, ' ')) != (char *) NULL) {
1174         retstr = space + 1;
1175         *space = '\0';
1176     }
1177     else 
1178         retstr = NULL;
1179     return retstr;
1180 }
1181
1182 int
1183 SanityCheckInput(int ndim, int n, int dim[], int lb[], int indx[])
1184 {
1185     int i;
1186     /* Do Sanity check on input */
1187     if (n != ndim) return 0;
1188     for (i = 0; i < ndim; i++)
1189         if ((lb[i] > indx[i]) || (indx[i] >= (dim[i] + lb[i])))
1190             return 0;
1191     return 1;
1192 }
1193
1194 static void
1195 _ArrayRange(int st[],
1196             int endp[],
1197             int bsize,
1198             char *destPtr,
1199             ArrayType *array,
1200             int from)
1201 {
1202     int n, *dim, *lb, st_pos, prod[MAXDIM];
1203     int span[MAXDIM], dist[MAXDIM], indx[MAXDIM];
1204     int i, j, inc;
1205     char *srcPtr, *array_seek();
1206     
1207     n = ARR_NDIM(array); dim = ARR_DIMS(array);
1208     lb = ARR_LBOUND(array); srcPtr = ARR_DATA_PTR(array);
1209     for (i = 0; i < n; st[i] -= lb[i], endp[i] -= lb[i], i++); 
1210     mda_get_prod(n, dim, prod);
1211     st_pos = tuple2linear(n, st, prod);
1212     srcPtr = array_seek(srcPtr, bsize, st_pos);
1213     mda_get_range(n, span, st, endp);
1214     mda_get_offset_values(n, dist, prod, span);
1215     for (i=0; i < n; indx[i++]=0);
1216     i = j = n-1; inc = bsize;
1217     do {
1218         srcPtr = array_seek(srcPtr, bsize,  dist[j]); 
1219         if (from) 
1220             inc = array_read(destPtr, bsize, 1, srcPtr);
1221         else 
1222             inc = array_read(srcPtr, bsize, 1, destPtr);
1223         destPtr += inc; srcPtr += inc;
1224     } while ((j = next_tuple(i+1, indx, span)) != -1);
1225 }
1226
1227 static int
1228 _ArrayClipCount(int stI[], int endpI[], ArrayType *array)
1229 {
1230     int n, *dim, *lb, st_pos, prod[MAXDIM];
1231     int span[MAXDIM], dist[MAXDIM], indx[MAXDIM];
1232     int i, j, inc, st[MAXDIM], endp[MAXDIM];
1233     int count = 0;
1234     char *ptr, *array_seek();
1235     
1236     n = ARR_NDIM(array); dim = ARR_DIMS(array);
1237     lb = ARR_LBOUND(array); ptr = ARR_DATA_PTR(array);
1238     for (i = 0; i < n; st[i] = stI[i]-lb[i], endp[i]=endpI[i]-lb[i], i++);
1239     mda_get_prod(n, dim, prod);
1240     st_pos = tuple2linear(n, st, prod);
1241     ptr = array_seek(ptr, -1, st_pos);
1242     mda_get_range(n, span, st, endp);
1243     mda_get_offset_values(n, dist, prod, span);
1244     for (i=0; i < n; indx[i++]=0);
1245     i = j = n-1;
1246     do {
1247         ptr = array_seek(ptr, -1,  dist[j]);
1248         inc =  INTALIGN(* (int32 *) ptr);
1249         ptr += inc; count += inc;
1250     } while ((j = next_tuple(i+1, indx, span)) != -1);
1251     return count;
1252 }
1253
1254 char *
1255 array_seek(char *ptr, int eltsize, int nitems)
1256 {
1257     int i;
1258     
1259     if (eltsize > 0) 
1260         return(ptr + eltsize*nitems);
1261     for (i = 0; i < nitems; i++) 
1262         ptr += INTALIGN(* (int32 *) ptr);
1263     return(ptr);
1264 }
1265
1266 int
1267 array_read(char *destptr, int eltsize, int nitems, char *srcptr)
1268 {
1269     int i, inc, tmp;
1270     
1271     if (eltsize > 0)  {
1272         memmove(destptr, srcptr, eltsize*nitems);
1273         return(eltsize*nitems);
1274     }
1275     for (i = inc = 0; i < nitems; i++) {
1276         tmp = (INTALIGN(* (int32 *) srcptr));
1277         memmove(destptr, srcptr, tmp);
1278         srcptr += tmp;
1279         destptr += tmp;
1280         inc += tmp;
1281     }
1282     return(inc);
1283 }
1284
1285 static void
1286 _LOArrayRange(int st[],
1287               int endp[],
1288               int bsize,
1289               int srcfd,
1290               int destfd,
1291               ArrayType *array,
1292               int isSrcLO,
1293               bool *isNull)
1294 {
1295     int n, *dim, st_pos, prod[MAXDIM];
1296     int span[MAXDIM], dist[MAXDIM], indx[MAXDIM];
1297     int i, j, inc, tmp, *lb, offset;
1298     
1299     n = ARR_NDIM(array); dim = ARR_DIMS(array);
1300     lb = ARR_LBOUND(array);
1301     for (i = 0; i < n; st[i] -= lb[i], endp[i] -= lb[i], i++);
1302     
1303     mda_get_prod(n, dim, prod);
1304     st_pos = tuple2linear(n, st, prod);
1305     offset = st_pos*bsize;
1306     if (lo_lseek(srcfd, offset, SEEK_SET) < 0) 
1307         return;
1308     mda_get_range(n, span, st, endp);
1309     mda_get_offset_values(n, dist, prod, span);
1310     for (i=0; i < n; indx[i++]=0);
1311     for (i = n-1, inc = bsize; i >= 0; inc *= span[i--])
1312         if (dist[i]) 
1313             break;
1314     j = n-1;
1315     do {
1316         offset += (dist[j]*bsize);
1317         if (lo_lseek(srcfd,  offset, SEEK_SET) < 0) 
1318             return;
1319         tmp = _LOtransfer((char**)&srcfd, inc, 1, (char**)&destfd, isSrcLO, 1);
1320         if ( tmp < inc )
1321             return;
1322         offset += inc;
1323     } while ((j = next_tuple(i+1, indx, span)) != -1);
1324 }
1325
1326
1327 static void
1328 _ReadArray (int st[],
1329             int endp[],
1330             int bsize,
1331             int srcfd,
1332             int destfd,
1333             ArrayType *array,
1334             int isDestLO,
1335             bool *isNull)
1336 {
1337     int n, *dim, st_pos, prod[MAXDIM];
1338     int span[MAXDIM], dist[MAXDIM], indx[MAXDIM];
1339     int i, j, inc, tmp, *lb, offset;
1340     
1341     n = ARR_NDIM(array); dim = ARR_DIMS(array);
1342     lb = ARR_LBOUND(array);
1343     for (i = 0; i < n; st[i] -= lb[i], endp[i] -= lb[i], i++);
1344     
1345     mda_get_prod(n, dim, prod);
1346     st_pos = tuple2linear(n, st, prod);
1347     offset = st_pos*bsize;
1348     if (lo_lseek(srcfd, offset, SEEK_SET) < 0)
1349         return;
1350     mda_get_range(n, span, st, endp);
1351     mda_get_offset_values(n, dist, prod, span);
1352     for (i=0; i < n; indx[i++]=0);
1353     for (i = n-1, inc = bsize; i >= 0; inc *= span[i--])
1354         if (dist[i]) 
1355             break;
1356     j = n-1;
1357     do {
1358         offset += (dist[j]*bsize);
1359         if (lo_lseek(srcfd,  offset, SEEK_SET) < 0) 
1360             return;
1361         tmp = _LOtransfer((char**)&destfd, inc, 1, (char**)&srcfd, 1, isDestLO);
1362         if ( tmp < inc ) 
1363             return;
1364         offset += inc;
1365     } while ((j = next_tuple(i+1, indx, span)) != -1);
1366 }
1367
1368
1369 int
1370 _LOtransfer(char **destfd,
1371             int size,
1372             int nitems,
1373             char **srcfd,
1374             int isSrcLO,
1375             int isDestLO)
1376 {
1377 #define MAX_READ (512 * 1024)
1378 #define min(a, b) (a < b ? a : b)
1379     struct varlena *v;
1380     int tmp, inc, resid;
1381     
1382     inc = nitems*size; 
1383     if (isSrcLO && isDestLO && inc > 0)
1384         for (tmp = 0, resid = inc;
1385              resid > 0 && (inc = min(resid, MAX_READ)) > 0; resid -= inc) { 
1386 #ifdef LOARRAY
1387             v = (struct varlena *) LOread((int) *srcfd, inc);
1388             if (VARSIZE(v) - 4 < inc) 
1389                 {pfree(v); return(-1);}
1390             tmp += LOwrite((int) *destfd, v);    
1391 #endif
1392             pfree(v);
1393             
1394         } 
1395     else if (!isSrcLO && isDestLO) {
1396         tmp = lo_write((int) *destfd, *srcfd, inc);
1397         *srcfd = *srcfd + tmp;
1398     } 
1399     else if (isSrcLO && !isDestLO) {
1400         tmp = lo_read((int) *srcfd, *destfd, inc);
1401         *destfd = *destfd + tmp;
1402     } 
1403     else {
1404         memmove(*destfd, *srcfd, inc);
1405         tmp = inc;
1406         *srcfd += inc;
1407         *destfd += inc;
1408     }
1409     return(tmp);
1410 #undef MAX_READ
1411 }
1412
1413 char *
1414 _array_newLO(int *fd, int flag)
1415 {
1416     char *p;
1417     char saveName[NAME_LEN];
1418     
1419     p = (char *) palloc(NAME_LEN);
1420     sprintf(p, "/Arry.%d", newoid());
1421     strcpy (saveName, p);
1422 #ifdef LOARRAY
1423     if ( (*fd = LOcreat (saveName, 0600, flag)) < 0)
1424         elog(WARN, "Large object create failed");
1425 #endif
1426     return (p);
1427 }
1428