]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/arrayfuncs.c
There's a patch attached to fix gcc 2.8.x warnings, except for the
[postgresql] / src / backend / utils / adt / arrayfuncs.c
1 /*-------------------------------------------------------------------------
2  *
3  * arrayfuncs.c--
4  *        Special functions for arrays.
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/utils/adt/arrayfuncs.c,v 1.29 1998/03/30 16:47:23 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include <ctype.h>
16 #include <stdio.h>
17 #include <string.h>
18
19 #include "postgres.h"
20
21 #include "catalog/catalog.h"
22 #include "catalog/pg_type.h"
23
24 #include "utils/syscache.h"
25 #include "utils/memutils.h"
26 #include "storage/fd.h"
27 #include "fmgr.h"
28 #include "utils/array.h"
29
30 #include "libpq/libpq-fs.h"
31 #include "libpq/be-fsstubs.h"
32
33 #define ASSGN    "="
34
35 /* An array has the following internal structure:
36  *        <nbytes>              - total number of bytes
37  *        <ndim>                - number of dimensions of the array
38  *        <flags>               - bit mask of flags
39  *        <dim>                 - size of each array axis
40  *        <dim_lower>   - lower boundary of each dimension
41  *        <actual data> - whatever is the stored data
42  */
43
44 /*-=-=--=-=-=-=-=-=-=-=--=-=-=-=-=-=-=-=--=-=-=-=-=-=-=-=--=-=-=-=-=-=-=-*/
45 static int      _ArrayCount(char *str, int dim[], int typdelim);
46 static char *
47 _ReadArrayStr(char *arrayStr, int nitems, int ndim, int dim[],
48                           FmgrInfo *inputproc, Oid typelem, int16 typmod,
49                           char typdelim, int typlen, bool typbyval,
50                           char typalign, int *nbytes);
51
52 #ifdef LOARRAY
53 static char *
54 _ReadLOArray(char *str, int *nbytes, int *fd, bool *chunkFlag,
55                          int ndim, int dim[], int baseSize);
56
57 #endif
58 static void
59 _CopyArrayEls(char **values, char *p, int nitems, int typlen,
60                           char typalign, bool typbyval);
61 static void
62 system_cache_lookup(Oid element_type, bool input, int *typlen,
63                                  bool *typbyval, char *typdelim, Oid *typelem, Oid *proc,
64                                         char *typalign);
65 static Datum _ArrayCast(char *value, bool byval, int len);
66
67 #ifdef LOARRAY
68 static char *_AdvanceBy1word(char *str, char **word);
69
70 #endif
71 static void
72 _ArrayRange(int st[], int endp[], int bsize, char *destPtr,
73                         ArrayType *array, int from);
74 static int      _ArrayClipCount(int stI[], int endpI[], ArrayType *array);
75 static void
76 _LOArrayRange(int st[], int endp[], int bsize, int srcfd,
77                           int destfd, ArrayType *array, int isSrcLO, bool *isNull);
78 static void
79 _ReadArray(int st[], int endp[], int bsize, int srcfd, int destfd,
80                    ArrayType *array, int isDestLO, bool *isNull);
81 static int  ArrayCastAndSet(char *src, bool typbyval, int typlen, char *dest);
82 static int  SanityCheckInput(int ndim, int n, int dim[], int lb[], int indx[]);
83 static int      array_read(char *destptr, int eltsize, int nitems, char *srcptr);
84 static char *array_seek(char *ptr, int eltsize, int nitems);
85
86 /*---------------------------------------------------------------------
87  * array_in :
88  *                converts an array from the external format in "string" to
89  *                it internal format.
90  * return value :
91  *                the internal representation of the input array
92  *--------------------------------------------------------------------
93  */
94 char *
95 array_in(char *string,                  /* input array in external form */
96                  Oid element_type,              /* type OID of an array element */
97                  int16 typmod)
98 {
99         int                     typlen;
100         bool            typbyval,
101                                 done;
102         char            typdelim;
103         Oid                     typinput;
104         Oid                     typelem;
105         char       *string_save,
106                            *p,
107                            *q,
108                            *r;
109         FmgrInfo        inputproc;
110         int                     i,
111                                 nitems;
112         int32           nbytes;
113         char       *dataPtr;
114         ArrayType  *retval = NULL;
115         int                     ndim,
116                                 dim[MAXDIM],
117                                 lBound[MAXDIM];
118         char            typalign;
119
120         system_cache_lookup(element_type, true, &typlen, &typbyval, &typdelim,
121                                                 &typelem, &typinput, &typalign);
122
123         fmgr_info(typinput, &inputproc);
124
125         string_save = (char *) palloc(strlen(string) + 3);
126         strcpy(string_save, string);
127
128         /* --- read array dimensions  ---------- */
129         p = q = string_save;
130         done = false;
131         for (ndim = 0; !done;)
132         {
133                 while (isspace(*p))
134                         p++;
135                 if (*p == '[')
136                 {
137                         p++;
138                         if ((r = (char *) strchr(p, ':')) == (char *) NULL)
139                                 lBound[ndim] = 1;
140                         else
141                         {
142                                 *r = '\0';
143                                 lBound[ndim] = atoi(p);
144                                 p = r + 1;
145                         }
146                         for (q = p; isdigit(*q); q++);
147                         if (*q != ']')
148                                 elog(ERROR, "array_in: missing ']' in array declaration");
149                         *q = '\0';
150                         dim[ndim] = atoi(p);
151                         if ((dim[ndim] < 0) || (lBound[ndim] < 0))
152                                 elog(ERROR, "array_in: array dimensions need to be positive");
153                         dim[ndim] = dim[ndim] - lBound[ndim] + 1;
154                         if (dim[ndim] < 0)
155                                 elog(ERROR, "array_in: upper_bound cannot be < lower_bound");
156                         p = q + 1;
157                         ndim++;
158                 }
159                 else
160                 {
161                         done = true;
162                 }
163         }
164
165         if (ndim == 0)
166         {
167                 if (*p == '{')
168                 {
169                         ndim = _ArrayCount(p, dim, typdelim);
170                         for (i = 0; i < ndim; lBound[i++] = 1);
171                 }
172                 else
173                 {
174                         elog(ERROR, "array_in: Need to specify dimension");
175                 }
176         }
177         else
178         {
179                 while (isspace(*p))
180                         p++;
181                 if (strncmp(p, ASSGN, strlen(ASSGN)))
182                         elog(ERROR, "array_in: missing assignment operator");
183                 p += strlen(ASSGN);
184                 while (isspace(*p))
185                         p++;
186         }
187
188 #ifdef ARRAYDEBUG
189         printf("array_in- ndim %d (", ndim);
190         for (i = 0; i < ndim; i++)
191         {
192                 printf(" %d", dim[i]);
193         };
194         printf(") for %s\n", string);
195 #endif
196
197         nitems = getNitems(ndim, dim);
198         if (nitems == 0)
199         {
200                 char       *emptyArray = palloc(sizeof(ArrayType));
201
202                 MemSet(emptyArray, 0, sizeof(ArrayType));
203                 *(int32 *) emptyArray = sizeof(ArrayType);
204                 return emptyArray;
205         }
206
207         if (*p == '{')
208         {
209                 /* array not a large object */
210                 dataPtr =
211                         (char *) _ReadArrayStr(p, nitems, ndim, dim, &inputproc, typelem,
212                                                         typmod, typdelim, typlen, typbyval, typalign,
213                                                                    &nbytes);
214                 nbytes += ARR_OVERHEAD(ndim);
215                 retval = (ArrayType *) palloc(nbytes);
216                 MemSet(retval, 0, nbytes);
217                 memmove(retval, (char *) &nbytes, sizeof(int));
218                 memmove((char *) ARR_NDIM_PTR(retval), (char *) &ndim, sizeof(int));
219                 SET_LO_FLAG(false, retval);
220                 memmove((char *) ARR_DIMS(retval), (char *) dim, ndim * sizeof(int));
221                 memmove((char *) ARR_LBOUND(retval), (char *) lBound,
222                                 ndim * sizeof(int));
223
224                 /*
225                  * dataPtr is an array of arbitraystuff even though its type is
226                  * char* cast to char** to pass to _CopyArrayEls for now  - jolly
227                  */
228                 _CopyArrayEls((char **) dataPtr,
229                                           ARR_DATA_PTR(retval), nitems,
230                                           typlen, typalign, typbyval);
231         }
232         else
233         {
234 #ifdef LOARRAY
235                 int                     dummy,
236                                         bytes;
237                 bool            chunked = false;
238
239                 dataPtr = _ReadLOArray(p, &bytes, &dummy, &chunked, ndim,
240                                                            dim, typlen);
241                 nbytes = bytes + ARR_OVERHEAD(ndim);
242                 retval = (ArrayType *) palloc(nbytes);
243                 MemSet(retval, 0, nbytes);
244                 memmove(retval, (char *) &nbytes, sizeof(int));
245                 memmove((char *) ARR_NDIM_PTR(retval), (char *) &ndim, sizeof(int));
246                 SET_LO_FLAG(true, retval);
247                 SET_CHUNK_FLAG(chunked, retval);
248                 memmove((char *) ARR_DIMS(retval), (char *) dim, ndim * sizeof(int));
249                 memmove((char *) ARR_LBOUND(retval), (char *) lBound, ndim * sizeof(int));
250                 memmove(ARR_DATA_PTR(retval), dataPtr, bytes);
251 #endif
252                 elog(ERROR, "large object arrays not supported");
253         }
254         pfree(string_save);
255         return ((char *) retval);
256 }
257
258 /*-----------------------------------------------------------------------------
259  * _ArrayCount --
260  *       Counts the number of dimensions and the dim[] array for an array string.
261  *               The syntax for array input is C-like nested curly braces
262  *-----------------------------------------------------------------------------
263  */
264 static int
265 _ArrayCount(char *str, int dim[], int typdelim)
266 {
267         int                     nest_level = 0,
268                                 i;
269         int                     ndim = 0,
270                                 temp[MAXDIM];
271         bool            scanning_string = false;
272         bool            eoArray = false;
273         char       *q;
274
275         for (i = 0; i < MAXDIM; ++i)
276         {
277                 temp[i] = dim[i] = 0;
278         }
279
280         if (strncmp(str, "{}", 2) == 0)
281                 return (0);
282
283         q = str;
284         while (eoArray != true)
285         {
286                 bool            done = false;
287
288                 while (!done)
289                 {
290                         switch (*q)
291                         {
292                                 case '\\':
293                                         /* skip escaped characters (\ and ") inside strings */
294                                         if (scanning_string && *(q + 1))
295                                         {
296                                                 q++;
297                                         }
298                                         break;
299                                 case '\0':
300
301                                         /*
302                                          * Signal a premature end of the string.  DZ -
303                                          * 2-9-1996
304                                          */
305                                         elog(ERROR, "malformed array constant: %s", str);
306                                         break;
307                                 case '\"':
308                                         scanning_string = !scanning_string;
309                                         break;
310                                 case '{':
311                                         if (!scanning_string)
312                                         {
313                                                 temp[nest_level] = 0;
314                                                 nest_level++;
315                                         }
316                                         break;
317                                 case '}':
318                                         if (!scanning_string)
319                                         {
320                                                 if (!ndim)
321                                                         ndim = nest_level;
322                                                 nest_level--;
323                                                 if (nest_level)
324                                                         temp[nest_level - 1]++;
325                                                 if (nest_level == 0)
326                                                         eoArray = done = true;
327                                         }
328                                         break;
329                                 default:
330                                         if (!ndim)
331                                                 ndim = nest_level;
332                                         if (*q == typdelim && !scanning_string)
333                                                 done = true;
334                                         break;
335                         }
336                         if (!done)
337                                 q++;
338                 }
339                 temp[ndim - 1]++;
340                 q++;
341                 if (!eoArray)
342                         while (isspace(*q))
343                                 q++;
344         }
345         for (i = 0; i < ndim; ++i)
346         {
347                 dim[i] = temp[i];
348         }
349
350         return (ndim);
351 }
352
353 /*---------------------------------------------------------------------------
354  * _ReadArrayStr :
355  *       parses the array string pointed by "arrayStr" and converts it in the
356  *       internal format. The external format expected is like C array
357  *       declaration. Unspecified elements are initialized to zero for fixed length
358  *       base types and to empty varlena structures for variable length base
359  *       types.
360  * result :
361  *       returns the internal representation of the array elements
362  *       nbytes is set to the size of the array in its internal representation.
363  *---------------------------------------------------------------------------
364  */
365 static char *
366 _ReadArrayStr(char *arrayStr,
367                           int nitems,
368                           int ndim,
369                           int dim[],
370                           FmgrInfo *inputproc,          /* function used for the
371                                                                                  * conversion */
372                           Oid typelem,
373                           int16 typmod,
374                           char typdelim,
375                           int typlen,
376                           bool typbyval,
377                           char typalign,
378                           int *nbytes)
379 {
380         int                     i,
381                                 nest_level = 0;
382         char       *p,
383                            *q,
384                            *r,
385                           **values;
386         bool            scanning_string = false;
387         int                     indx[MAXDIM],
388                                 prod[MAXDIM];
389         bool            eoArray = false;
390
391         mda_get_prod(ndim, dim, prod);
392         for (i = 0; i < ndim; indx[i++] = 0);
393         /* read array enclosed within {} */
394         values = (char **) palloc(nitems * sizeof(char *));
395         MemSet(values, 0, nitems * sizeof(char *));
396         q = p = arrayStr;
397
398         while (!eoArray)
399         {
400                 bool            done = false;
401                 int                     i = -1;
402
403                 while (!done)
404                 {
405                         switch (*q)
406                         {
407                                 case '\\':
408                                         /* Crunch the string on top of the backslash. */
409                                         for (r = q; *r != '\0'; r++)
410                                                 *r = *(r + 1);
411                                         break;
412                                 case '\"':
413                                         if (!scanning_string)
414                                         {
415                                                 while (p != q)
416                                                         p++;
417                                                 p++;    /* get p past first doublequote */
418                                         }
419                                         else
420                                                 *q = '\0';
421                                         scanning_string = !scanning_string;
422                                         break;
423                                 case '{':
424                                         if (!scanning_string)
425                                         {
426                                                 p++;
427                                                 nest_level++;
428                                                 if (nest_level > ndim)
429                                                         elog(ERROR, "array_in: illformed array constant");
430                                                 indx[nest_level - 1] = 0;
431                                                 indx[ndim - 1] = 0;
432                                         }
433                                         break;
434                                 case '}':
435                                         if (!scanning_string)
436                                         {
437                                                 if (i == -1)
438                                                         i = tuple2linear(ndim, indx, prod);
439                                                 nest_level--;
440                                                 if (nest_level == 0)
441                                                         eoArray = done = true;
442                                                 else
443                                                 {
444                                                         *q = '\0';
445                                                         indx[nest_level - 1]++;
446                                                 }
447                                         }
448                                         break;
449                                 default:
450                                         if (*q == typdelim && !scanning_string)
451                                         {
452                                                 if (i == -1)
453                                                         i = tuple2linear(ndim, indx, prod);
454                                                 done = true;
455                                                 indx[ndim - 1]++;
456                                         }
457                                         break;
458                         }
459                         if (!done)
460                                 q++;
461                 }
462                 *q = '\0';
463                 if (i >= nitems)
464                         elog(ERROR, "array_in: illformed array constant");
465                 values[i] = (*fmgr_faddr(inputproc)) (p, typelem, typmod);
466                 p = ++q;
467                 if (!eoArray)
468
469                         /*
470                          * if not at the end of the array skip white space
471                          */
472                         while (isspace(*q))
473                         {
474                                 p++;
475                                 q++;
476                         }
477         }
478         if (typlen > 0)
479         {
480                 *nbytes = nitems * typlen;
481                 if (!typbyval)
482                         for (i = 0; i < nitems; i++)
483                                 if (!values[i])
484                                 {
485                                         values[i] = palloc(typlen);
486                                         MemSet(values[i], 0, typlen);
487                                 }
488         }
489         else
490         {
491                 for (i = 0, *nbytes = 0; i < nitems; i++)
492                 {
493                         if (values[i])
494                         {
495                                 if (typalign == 'd')
496                                 {
497                                         *nbytes += DOUBLEALIGN(*(int32 *) values[i]);
498                                 }
499                                 else
500                                 {
501                                         *nbytes += INTALIGN(*(int32 *) values[i]);
502                                 }
503                         }
504                         else
505                         {
506                                 *nbytes += sizeof(int32);
507                                 values[i] = palloc(sizeof(int32));
508                                 *(int32 *) values[i] = sizeof(int32);
509                         }
510                 }
511         }
512         return ((char *) values);
513 }
514
515
516 /*----------------------------------------------------------------------------
517  * Read data about an array to be stored as a large object
518  *----------------------------------------------------------------------------
519  */
520 #ifdef LOARRAY
521 static char *
522 _ReadLOArray(char *str,
523                          int *nbytes,
524                          int *fd,
525                          bool *chunkFlag,
526                          int ndim,
527                          int dim[],
528                          int baseSize)
529 {
530         char       *inputfile,
531                            *accessfile = NULL,
532                            *chunkfile = NULL;
533         char       *retStr,
534                            *_AdvanceBy1word();
535         Oid                     lobjId;
536
537         str = _AdvanceBy1word(str, &inputfile);
538
539         while (str != NULL)
540         {
541                 char       *word;
542
543                 str = _AdvanceBy1word(str, &word);
544
545                 if (!strcmp(word, "-chunk"))
546                 {
547                         if (str == NULL)
548                                 elog(ERROR, "array_in: access pattern file required");
549                         str = _AdvanceBy1word(str, &accessfile);
550                 }
551                 else if (!strcmp(word, "-noreorg"))
552                 {
553                         if (str == NULL)
554                                 elog(ERROR, "array_in: chunk file required");
555                         str = _AdvanceBy1word(str, &chunkfile);
556                 }
557                 else
558                 {
559                         elog(ERROR, "usage: <input file> -chunk DEFAULT/<access pattern file> -invert/-native [-noreorg <chunk file>]");
560                 }
561         }
562
563         if (inputfile == NULL)
564                 elog(ERROR, "array_in: missing file name");
565         lobjId = lo_creat(0);
566         *fd = lo_open(lobjId, INV_READ);
567         if (*fd < 0)
568                 elog(ERROR, "Large object create failed");
569         retStr = inputfile;
570         *nbytes = strlen(retStr) + 2;
571
572         if (accessfile)
573         {
574                 FILE       *afd;
575
576                 if ((afd = AllocateFile(accessfile, "r")) == NULL)
577                         elog(ERROR, "unable to open access pattern file");
578                 *chunkFlag = true;
579                 retStr = _ChunkArray(*fd, afd, ndim, dim, baseSize, nbytes,
580                                                          chunkfile);
581                 FreeFile(afd);
582         }
583         return (retStr);
584 }
585
586 #endif
587
588 static void
589 _CopyArrayEls(char **values,
590                           char *p,
591                           int nitems,
592                           int typlen,
593                           char typalign,
594                           bool typbyval)
595 {
596         int                     i;
597
598         for (i = 0; i < nitems; i++)
599         {
600                 int                     inc;
601
602                 inc = ArrayCastAndSet(values[i], typbyval, typlen, p);
603                 p += inc;
604                 if (!typbyval)
605                         pfree(values[i]);
606         }
607         pfree(values);
608 }
609
610 /*-------------------------------------------------------------------------
611  * array_out :
612  *                 takes the internal representation of an array and returns a string
613  *                containing the array in its external format.
614  *-------------------------------------------------------------------------
615  */
616 char *
617 array_out(ArrayType *v, Oid element_type)
618 {
619         int                     typlen;
620         bool            typbyval;
621         char            typdelim;
622         Oid                     typoutput,
623                                 typelem;
624         FmgrInfo        outputproc;
625         char            typalign;
626
627         char       *p, *tmp, 
628                            *retval,
629                           **values,
630                                 delim[2];
631         int                     nitems,
632                                 overall_length,
633                                 i,
634                                 j,
635                                 k,
636                                 l,
637                                 indx[MAXDIM];
638         bool            dummy_bool;
639         int                     ndim,
640                            *dim;
641
642         if (v == (ArrayType *) NULL)
643                 return ((char *) NULL);
644
645         if (ARR_IS_LO(v) == true)
646         {
647                 char       *p,
648                                    *save_p;
649                 int                     nbytes;
650
651                 /* get a wide string to print to */
652                 p = array_dims(v, &dummy_bool);
653                 nbytes = strlen(ARR_DATA_PTR(v)) + VARHDRSZ + *(int *) p;
654
655                 save_p = (char *) palloc(nbytes);
656
657                 strcpy(save_p, p + sizeof(int));
658                 strcat(save_p, ASSGN);
659                 strcat(save_p, ARR_DATA_PTR(v));
660                 pfree(p);
661                 return (save_p);
662         }
663
664         system_cache_lookup(element_type, false, &typlen, &typbyval,
665                                                 &typdelim, &typelem, &typoutput, &typalign);
666         fmgr_info(typoutput, &outputproc);
667         sprintf(delim, "%c", typdelim);
668         ndim = ARR_NDIM(v);
669         dim = ARR_DIMS(v);
670         nitems = getNitems(ndim, dim);
671
672         if (nitems == 0)
673         {
674                 char       *emptyArray = palloc(3);
675
676                 emptyArray[0] = '{';
677                 emptyArray[1] = '}';
678                 emptyArray[2] = '\0';
679                 return emptyArray;
680         }
681
682         p = ARR_DATA_PTR(v);
683         overall_length = 1;                     /* [TRH] don't forget to count \0 at end. */
684         values = (char **) palloc(nitems * sizeof(char *));
685         for (i = 0; i < nitems; i++)
686         {
687                 if (typbyval)
688                 {
689                         switch (typlen)
690                         {
691                                 case 1:
692                                         values[i] = (*fmgr_faddr(&outputproc)) (*p, typelem);
693                                         break;
694                                 case 2:
695                                         values[i] = (*fmgr_faddr(&outputproc)) (*(int16 *) p, typelem);
696                                         break;
697                                 case 3:
698                                 case 4:
699                                         values[i] = (*fmgr_faddr(&outputproc)) (*(int32 *) p, typelem);
700                                         break;
701                         }
702                         p += typlen;
703                 }
704                 else
705                 {
706                         values[i] = (*fmgr_faddr(&outputproc)) (p, typelem);
707                         if (typlen > 0)
708                                 p += typlen;
709                         else
710                                 p += INTALIGN(*(int32 *) p);
711
712                         /*
713                          * For the pair of double quotes
714                          */
715                         overall_length += 2;
716                 }
717                 for (tmp=values[i];*tmp;tmp++) {
718                         overall_length += 1;
719                         if (*tmp=='"') overall_length += 1;
720                 }
721                 overall_length += 1;
722         }
723
724         /*
725          * count total number of curly braces in output string
726          */
727         for (i = j = 0, k = 1; i < ndim; k *= dim[i++], j += k);
728
729         p = (char *) palloc(overall_length + 2 * j);
730         retval = p;
731
732         strcpy(p, "{");
733         for (i = 0; i < ndim; indx[i++] = 0);
734         j = 0;
735         k = 0;
736         do
737         {
738                 for (i = j; i < ndim - 1; i++)
739                         strcat(p, "{");
740
741                 /*
742                  * Surround anything that is not passed by value in double quotes.
743                  * See above for more details.
744                  */
745                 if (!typbyval)
746                 {
747                         strcat(p, "\"");
748                         l=strlen(p);
749                         for (tmp=values[k];*tmp;tmp++) {
750                                 if (*tmp=='"') p[l++]='\\';
751                                 p[l++]=*tmp;
752                                 }
753                         p[l]='\0';
754                         strcat(p, "\"");
755                 }
756                 else
757                         strcat(p, values[k]);
758                 pfree(values[k++]);
759
760                 for (i = ndim - 1; i >= 0; i--)
761                 {
762                         indx[i] = (indx[i] + 1) % dim[i];
763                         if (indx[i])
764                         {
765                                 strcat(p, delim);
766                                 break;
767                         }
768                         else
769                                 strcat(p, "}");
770                 }
771                 j = i;
772         } while (j != -1);
773
774         pfree(values);
775         return (retval);
776 }
777
778 /*-----------------------------------------------------------------------------
779  * array_dims :
780  *                returns the dimension of the array pointed to by "v"
781  *----------------------------------------------------------------------------
782  */
783 char *
784 array_dims(ArrayType *v, bool *isNull)
785 {
786         char       *p,
787                            *save_p;
788         int                     nbytes,
789                                 i;
790         int                *dimv,
791                            *lb;
792
793         if (v == (ArrayType *) NULL)
794                 RETURN_NULL;
795         nbytes = ARR_NDIM(v) * 33;
796
797         /*
798          * 33 since we assume 15 digits per number + ':' +'[]'
799          */
800         save_p = p = (char *) palloc(nbytes + VARHDRSZ);
801         MemSet(save_p, 0, nbytes + VARHDRSZ);
802         dimv = ARR_DIMS(v);
803         lb = ARR_LBOUND(v);
804         p += VARHDRSZ;
805         for (i = 0; i < ARR_NDIM(v); i++)
806         {
807                 sprintf(p, "[%d:%d]", lb[i], dimv[i] + lb[i] - 1);
808                 p += strlen(p);
809         }
810         nbytes = strlen(save_p + VARHDRSZ) + VARHDRSZ;
811         memmove(save_p, &nbytes, VARHDRSZ);
812         return (save_p);
813 }
814
815 /*---------------------------------------------------------------------------
816  * array_ref :
817  *        This routing takes an array pointer and an index array and returns
818  *        a pointer to the referred element if element is passed by
819  *        reference otherwise returns the value of the referred element.
820  *---------------------------------------------------------------------------
821  */
822 Datum
823 array_ref(ArrayType *array,
824                   int n,
825                   int indx[],
826                   int reftype,
827                   int elmlen,
828                   int arraylen,
829                   bool *isNull)
830 {
831         int                     i,
832                                 ndim,
833                            *dim,
834                            *lb,
835                                 offset,
836                                 nbytes;
837         struct varlena *v = NULL;
838         char       *retval = NULL;
839
840         if (array == (ArrayType *) NULL)
841                 RETURN_NULL;
842         if (arraylen > 0)
843         {
844
845                 /*
846                  * fixed length arrays -- these are assumed to be 1-d
847                  */
848                 if (indx[0] * elmlen > arraylen)
849                         elog(ERROR, "array_ref: array bound exceeded");
850                 retval = (char *) array + indx[0] * elmlen;
851                 return _ArrayCast(retval, reftype, elmlen);
852         }
853         dim = ARR_DIMS(array);
854         lb = ARR_LBOUND(array);
855         ndim = ARR_NDIM(array);
856         nbytes = (*(int32 *) array) - ARR_OVERHEAD(ndim);
857
858         if (!SanityCheckInput(ndim, n, dim, lb, indx))
859                 RETURN_NULL;
860
861         offset = GetOffset(n, dim, lb, indx);
862
863         if (ARR_IS_LO(array))
864         {
865                 char       *lo_name;
866                 int                     fd = 0;
867
868                 /* We are assuming fixed element lengths here */
869                 offset *= elmlen;
870                 lo_name = (char *) ARR_DATA_PTR(array);
871 #ifdef LOARRAY
872                 if ((fd = LOopen(lo_name, ARR_IS_INV(array) ? INV_READ : O_RDONLY)) < 0)
873                         RETURN_NULL;
874 #endif
875                 if (ARR_IS_CHUNKED(array))
876                         v = _ReadChunkArray1El(indx, elmlen, fd, array, isNull);
877                 else
878                 {
879                         if (lo_lseek(fd, offset, SEEK_SET) < 0)
880                                 RETURN_NULL;
881 #ifdef LOARRAY
882                         v = (struct varlena *) LOread(fd, elmlen);
883 #endif
884                 }
885                 if (*isNull)
886                         RETURN_NULL;
887                 if (VARSIZE(v) - VARHDRSZ < elmlen)
888                         RETURN_NULL;
889                 lo_close(fd);
890                 retval = (char *) _ArrayCast((char *) VARDATA(v), reftype, elmlen);
891                 if (reftype == 0)
892                 {                                               /* not by value */
893                         char       *tempdata = palloc(elmlen);
894
895                         memmove(tempdata, retval, elmlen);
896                         retval = tempdata;
897                 }
898                 pfree(v);
899                 return (Datum) retval;
900         }
901
902         if (elmlen > 0)
903         {
904                 offset = offset * elmlen;
905                 /* off the end of the array */
906                 if (nbytes - offset < 1)
907                         RETURN_NULL;
908                 retval = ARR_DATA_PTR(array) + offset;
909                 return _ArrayCast(retval, reftype, elmlen);
910         }
911         else
912         {
913                 bool            done = false;
914                 char       *temp;
915                 int                     bytes = nbytes;
916
917                 temp = ARR_DATA_PTR(array);
918                 i = 0;
919                 while (bytes > 0 && !done)
920                 {
921                         if (i == offset)
922                         {
923                                 retval = temp;
924                                 done = true;
925                         }
926                         bytes -= INTALIGN(*(int32 *) temp);
927                         temp += INTALIGN(*(int32 *) temp);
928                         i++;
929                 }
930                 if (!done)
931                         RETURN_NULL;
932                 return (Datum) retval;
933         }
934 }
935
936 /*-----------------------------------------------------------------------------
937  * array_clip :
938  *                This routine takes an array and a range of indices (upperIndex and
939  *                 lowerIndx), creates a new array structure for the referred elements
940  *                 and returns a pointer to it.
941  *-----------------------------------------------------------------------------
942  */
943 Datum
944 array_clip(ArrayType *array,
945                    int n,
946                    int upperIndx[],
947                    int lowerIndx[],
948                    int reftype,
949                    int len,
950                    bool *isNull)
951 {
952         int                     i,
953                                 ndim,
954                            *dim,
955                            *lb,
956                                 nbytes;
957         ArrayType  *newArr;
958         int                     bytes,
959                                 span[MAXDIM];
960
961         /* timer_start(); */
962         if (array == (ArrayType *) NULL)
963                 RETURN_NULL;
964         dim = ARR_DIMS(array);
965         lb = ARR_LBOUND(array);
966         ndim = ARR_NDIM(array);
967         nbytes = (*(int32 *) array) - ARR_OVERHEAD(ndim);
968
969         if (!SanityCheckInput(ndim, n, dim, lb, upperIndx))
970                 RETURN_NULL;
971
972         if (!SanityCheckInput(ndim, n, dim, lb, lowerIndx))
973                 RETURN_NULL;
974
975         for (i = 0; i < n; i++)
976                 if (lowerIndx[i] > upperIndx[i])
977                         elog(ERROR, "lowerIndex cannot be larger than upperIndx");
978         mda_get_range(n, span, lowerIndx, upperIndx);
979
980         if (ARR_IS_LO(array))
981         {
982 #ifdef LOARRAY
983                 char       *lo_name;
984
985 #endif
986                 char       *newname = NULL;
987                 int                     fd = 0,
988                                         newfd = 0,
989                                         isDestLO = true,
990                                         rsize;
991
992                 if (len < 0)
993                         elog(ERROR, "array_clip: array of variable length objects not supported");
994 #ifdef LOARRAY
995                 lo_name = (char *) ARR_DATA_PTR(array);
996                 if ((fd = LOopen(lo_name, ARR_IS_INV(array) ? INV_READ : O_RDONLY)) < 0)
997                         RETURN_NULL;
998                 newname = _array_newLO(&newfd, Unix);
999 #endif
1000                 bytes = strlen(newname) + 1 + ARR_OVERHEAD(n);
1001                 newArr = (ArrayType *) palloc(bytes);
1002                 memmove(newArr, array, sizeof(ArrayType));
1003                 memmove(newArr, &bytes, sizeof(int));
1004                 memmove(ARR_DIMS(newArr), span, n * sizeof(int));
1005                 memmove(ARR_LBOUND(newArr), lowerIndx, n * sizeof(int));
1006                 strcpy(ARR_DATA_PTR(newArr), newname);
1007
1008                 rsize = compute_size(lowerIndx, upperIndx, n, len);
1009                 if (rsize < MAX_BUFF_SIZE)
1010                 {
1011                         char       *buff;
1012
1013                         rsize += VARHDRSZ;
1014                         buff = palloc(rsize);
1015                         if (buff)
1016                                 isDestLO = false;
1017                         if (ARR_IS_CHUNKED(array))
1018                         {
1019                                 _ReadChunkArray(lowerIndx, upperIndx, len, fd, &(buff[VARHDRSZ]),
1020                                                                 array, 0, isNull);
1021                         }
1022                         else
1023                         {
1024                                 _ReadArray(lowerIndx, upperIndx, len, fd, (int) &(buff[VARHDRSZ]),
1025                                                    array,
1026                                                    0, isNull);
1027                         }
1028                         memmove(buff, &rsize, VARHDRSZ);
1029 #ifdef LOARRAY
1030                         if (!*isNull)
1031                                 bytes = LOwrite(newfd, (struct varlena *) buff);
1032 #endif
1033                         pfree(buff);
1034                 }
1035                 if (isDestLO)
1036                 {
1037                         if (ARR_IS_CHUNKED(array))
1038                         {
1039                                 _ReadChunkArray(lowerIndx, upperIndx, len, fd, (char *) newfd, array,
1040                                                                 1, isNull);
1041                         }
1042                         else
1043                         {
1044                                 _ReadArray(lowerIndx, upperIndx, len, fd, newfd, array, 1, isNull);
1045                         }
1046                 }
1047 #ifdef LOARRAY
1048                 LOclose(fd);
1049                 LOclose(newfd);
1050 #endif
1051                 if (*isNull)
1052                 {
1053                         pfree(newArr);
1054                         newArr = NULL;
1055                 }
1056                 /* timer_end(); */
1057                 return ((Datum) newArr);
1058         }
1059
1060         if (len > 0)
1061         {
1062                 bytes = getNitems(n, span);
1063                 bytes = bytes * len + ARR_OVERHEAD(n);
1064         }
1065         else
1066         {
1067                 bytes = _ArrayClipCount(lowerIndx, upperIndx, array);
1068                 bytes += ARR_OVERHEAD(n);
1069         }
1070         newArr = (ArrayType *) palloc(bytes);
1071         memmove(newArr, array, sizeof(ArrayType));
1072         memmove(newArr, &bytes, sizeof(int));
1073         memmove(ARR_DIMS(newArr), span, n * sizeof(int));
1074         memmove(ARR_LBOUND(newArr), lowerIndx, n * sizeof(int));
1075         _ArrayRange(lowerIndx, upperIndx, len, ARR_DATA_PTR(newArr), array, 1);
1076         return (Datum) newArr;
1077 }
1078
1079 /*-----------------------------------------------------------------------------
1080  * array_set  :
1081  *                This routine sets the value of an array location (specified by an index array)
1082  *                to a new value specified by "dataPtr".
1083  * result :
1084  *                returns a pointer to the modified array.
1085  *-----------------------------------------------------------------------------
1086  */
1087 char *
1088 array_set(ArrayType *array,
1089                   int n,
1090                   int indx[],
1091                   char *dataPtr,
1092                   int reftype,
1093                   int elmlen,
1094                   int arraylen,
1095                   bool *isNull)
1096 {
1097         int                     ndim,
1098                            *dim,
1099                            *lb,
1100                                 offset,
1101                                 nbytes;
1102         char       *pos;
1103
1104         if (array == (ArrayType *) NULL)
1105                 RETURN_NULL;
1106         if (arraylen > 0)
1107         {
1108
1109                 /*
1110                  * fixed length arrays -- these are assumed to be 1-d
1111                  */
1112                 if (indx[0] * elmlen > arraylen)
1113                         elog(ERROR, "array_ref: array bound exceeded");
1114                 pos = (char *) array + indx[0] * elmlen;
1115                 ArrayCastAndSet(dataPtr, (bool) reftype, elmlen, pos);
1116                 return ((char *) array);
1117         }
1118         dim = ARR_DIMS(array);
1119         lb = ARR_LBOUND(array);
1120         ndim = ARR_NDIM(array);
1121         nbytes = (*(int32 *) array) - ARR_OVERHEAD(ndim);
1122
1123         if (!SanityCheckInput(ndim, n, dim, lb, indx))
1124         {
1125                 elog(ERROR, "array_set: array bound exceeded");
1126                 return ((char *) array);
1127         }
1128         offset = GetOffset(n, dim, lb, indx);
1129
1130         if (ARR_IS_LO(array))
1131         {
1132                 int                     fd = 0;
1133                 struct varlena *v;
1134
1135                 /* We are assuming fixed element lengths here */
1136                 offset *= elmlen;
1137 #ifdef LOARRAY
1138                 char       *lo_name;
1139
1140                 lo_name = ARR_DATA_PTR(array);
1141                 if ((fd = LOopen(lo_name, ARR_IS_INV(array) ? INV_WRITE : O_WRONLY)) < 0)
1142                         return ((char *) array);
1143 #endif
1144                 if (lo_lseek(fd, offset, SEEK_SET) < 0)
1145                         return ((char *) array);
1146                 v = (struct varlena *) palloc(elmlen + VARHDRSZ);
1147                 VARSIZE(v) = elmlen + VARHDRSZ;
1148                 ArrayCastAndSet(dataPtr, (bool) reftype, elmlen, VARDATA(v));
1149 #ifdef LOARRAY
1150                 n = LOwrite(fd, v);
1151 #endif
1152
1153                 /*
1154                  * if (n < VARSIZE(v) - VARHDRSZ) RETURN_NULL;
1155                  */
1156                 pfree(v);
1157                 lo_close(fd);
1158                 return ((char *) array);
1159         }
1160         if (elmlen > 0)
1161         {
1162                 offset = offset * elmlen;
1163                 /* off the end of the array */
1164                 if (nbytes - offset < 1)
1165                         return ((char *) array);
1166                 pos = ARR_DATA_PTR(array) + offset;
1167         }
1168         else
1169         {
1170                 ArrayType  *newarray;
1171                 char       *elt_ptr;
1172                 int                     oldsize,
1173                                         newsize,
1174                                         oldlen,
1175                                         newlen,
1176                                         lth0,
1177                                         lth1,
1178                                         lth2;
1179
1180                 elt_ptr = array_seek(ARR_DATA_PTR(array), -1, offset);
1181                 oldlen = INTALIGN(*(int32 *) elt_ptr);
1182                 newlen = INTALIGN(*(int32 *) dataPtr);
1183
1184                 if (oldlen == newlen)
1185                 {
1186                         /* new element with same size, overwrite old data */
1187                         ArrayCastAndSet(dataPtr, (bool) reftype, elmlen, elt_ptr);
1188                         return ((char *) array);
1189                 }
1190
1191                 /* new element with different size, reallocate the array */
1192                 oldsize = array->size;
1193                 lth0 = ARR_OVERHEAD(n);
1194                 lth1 = (int) (elt_ptr - ARR_DATA_PTR(array));
1195                 lth2 = (int) (oldsize - lth0 - lth1 - oldlen);
1196                 newsize = lth0 + lth1 + newlen + lth2;
1197
1198                 newarray = (ArrayType *) palloc(newsize);
1199                 memmove((char *) newarray, (char *) array, lth0 + lth1);
1200                 newarray->size = newsize;
1201                 newlen = ArrayCastAndSet(dataPtr, (bool) reftype, elmlen,
1202                                                                  (char *) newarray + lth0 + lth1);
1203                 memmove((char *) newarray + lth0 + lth1 + newlen,
1204                                 (char *) array + lth0 + lth1 + oldlen, lth2);
1205
1206                 /* ??? who should free this storage ??? */
1207                 return ((char *) newarray);
1208         }
1209         ArrayCastAndSet(dataPtr, (bool) reftype, elmlen, pos);
1210         return ((char *) array);
1211 }
1212
1213 /*----------------------------------------------------------------------------
1214  * array_assgn :
1215  *                This routine sets the value of a range of array locations (specified
1216  *                by upper and lower index values ) to new values passed as
1217  *                another array
1218  * result :
1219  *                returns a pointer to the modified array.
1220  *----------------------------------------------------------------------------
1221  */
1222 char *
1223 array_assgn(ArrayType *array,
1224                         int n,
1225                         int upperIndx[],
1226                         int lowerIndx[],
1227                         ArrayType *newArr,
1228                         int reftype,
1229                         int len,
1230                         bool *isNull)
1231 {
1232         int                     i,
1233                                 ndim,
1234                            *dim,
1235                            *lb;
1236
1237         if (array == (ArrayType *) NULL)
1238                 RETURN_NULL;
1239         if (len < 0)
1240                 elog(ERROR, "array_assgn:updates on arrays of variable length elements not allowed");
1241
1242         dim = ARR_DIMS(array);
1243         lb = ARR_LBOUND(array);
1244         ndim = ARR_NDIM(array);
1245
1246         if (!SanityCheckInput(ndim, n, dim, lb, upperIndx) ||
1247                 !SanityCheckInput(ndim, n, dim, lb, lowerIndx))
1248         {
1249                 return ((char *) array);
1250         }
1251
1252         for (i = 0; i < n; i++)
1253                 if (lowerIndx[i] > upperIndx[i])
1254                         elog(ERROR, "lowerIndex larger than upperIndx");
1255
1256         if (ARR_IS_LO(array))
1257         {
1258                 int                     fd = 0,
1259                                         newfd = 0;
1260
1261 #ifdef LOARRAY
1262                 char       *lo_name;
1263
1264                 lo_name = (char *) ARR_DATA_PTR(array);
1265                 if ((fd = LOopen(lo_name, ARR_IS_INV(array) ? INV_WRITE : O_WRONLY)) < 0)
1266                         return ((char *) array);
1267 #endif
1268                 if (ARR_IS_LO(newArr))
1269                 {
1270 #ifdef LOARRAY
1271                         lo_name = (char *) ARR_DATA_PTR(newArr);
1272                         if ((newfd = LOopen(lo_name, ARR_IS_INV(newArr) ? INV_READ : O_RDONLY)) < 0)
1273                                 return ((char *) array);
1274 #endif
1275                         _LOArrayRange(lowerIndx, upperIndx, len, fd, newfd, array, 1, isNull);
1276                         lo_close(newfd);
1277                 }
1278                 else
1279                 {
1280                         _LOArrayRange(lowerIndx, upperIndx, len, fd, (int) ARR_DATA_PTR(newArr),
1281                                                   array, 0, isNull);
1282                 }
1283                 lo_close(fd);
1284                 return ((char *) array);
1285         }
1286         _ArrayRange(lowerIndx, upperIndx, len, ARR_DATA_PTR(newArr), array, 0);
1287         return (char *) array;
1288 }
1289
1290 /*-----------------------------------------------------------------------------
1291  * array_eq :
1292  *                compares two arrays for equality
1293  * result :
1294  *                returns 1 if the arrays are equal, 0 otherwise.
1295  *-----------------------------------------------------------------------------
1296  */
1297 int
1298 array_eq(ArrayType *array1, ArrayType *array2)
1299 {
1300         if ((array1 == NULL) || (array2 == NULL))
1301                 return (0);
1302         if (*(int *) array1 != *(int *) array2)
1303                 return (0);
1304         if (memcmp(array1, array2, *(int *) array1))
1305                 return (0);
1306         return (1);
1307 }
1308
1309 /***************************************************************************/
1310 /******************|              Support  Routines                       |*****************/
1311 /***************************************************************************/
1312 static void
1313 system_cache_lookup(Oid element_type,
1314                                         bool input,
1315                                         int *typlen,
1316                                         bool *typbyval,
1317                                         char *typdelim,
1318                                         Oid *typelem,
1319                                         Oid *proc,
1320                                         char *typalign)
1321 {
1322         HeapTuple       typeTuple;
1323         TypeTupleForm typeStruct;
1324
1325         typeTuple = SearchSysCacheTuple(TYPOID, ObjectIdGetDatum(element_type),
1326                                                                         0, 0, 0);
1327
1328         if (!HeapTupleIsValid(typeTuple))
1329         {
1330                 elog(ERROR, "array_out: Cache lookup failed for type %d\n",
1331                          element_type);
1332                 return;
1333         }
1334         typeStruct = (TypeTupleForm) GETSTRUCT(typeTuple);
1335         *typlen = typeStruct->typlen;
1336         *typbyval = typeStruct->typbyval;
1337         *typdelim = typeStruct->typdelim;
1338         *typelem = typeStruct->typelem;
1339         *typalign = typeStruct->typalign;
1340         if (input)
1341         {
1342                 *proc = typeStruct->typinput;
1343         }
1344         else
1345         {
1346                 *proc = typeStruct->typoutput;
1347         }
1348 }
1349
1350 static Datum
1351 _ArrayCast(char *value, bool byval, int len)
1352 {
1353         if (byval)
1354         {
1355                 switch (len)
1356                 {
1357                                 case 1:
1358                                 return ((Datum) *value);
1359                         case 2:
1360                                 return ((Datum) *(int16 *) value);
1361                         case 3:
1362                         case 4:
1363                                 return ((Datum) *(int32 *) value);
1364                         default:
1365                                 elog(ERROR, "array_ref: byval and elt len > 4!");
1366                                 break;
1367                 }
1368         }
1369         else
1370         {
1371                 return (Datum) value;
1372         }
1373         return 0;
1374 }
1375
1376
1377 static int
1378 ArrayCastAndSet(char *src,
1379                                 bool typbyval,
1380                                 int typlen,
1381                                 char *dest)
1382 {
1383         int                     inc;
1384
1385         if (typlen > 0)
1386         {
1387                 if (typbyval)
1388                 {
1389                         switch (typlen)
1390                         {
1391                                 case 1:
1392                                         *dest = DatumGetChar(src);
1393                                         break;
1394                                 case 2:
1395                                         *(int16 *) dest = DatumGetInt16(src);
1396                                         break;
1397                                 case 4:
1398                                         *(int32 *) dest = (int32) src;
1399                                         break;
1400                         }
1401                 }
1402                 else
1403                 {
1404                         memmove(dest, src, typlen);
1405                 }
1406                 inc = typlen;
1407         }
1408         else
1409         {
1410                 memmove(dest, src, *(int32 *) src);
1411                 inc = (INTALIGN(*(int32 *) src));
1412         }
1413         return (inc);
1414 }
1415
1416 #ifdef LOARRAY
/*
 * _AdvanceBy1word
 *    Split the first word off "str", modifying the string in place.
 *
 *    Leading whitespace (as judged by isspace) is skipped and *word is
 *    set to the first remaining character.  If a ' ' follows, it is
 *    overwritten with '\0' and a pointer to the character after it is
 *    returned; otherwise NULL is returned.
 *
 *    A NULL "str" yields *word == NULL and a NULL return.
 */
static char *
_AdvanceBy1word(char *str, char **word)
{
    char       *space;

    *word = NULL;
    if (str == NULL)
        return str;

    /*
     * isspace() requires a value representable as unsigned char (or EOF);
     * passing a plain char that happens to be negative is undefined.
     */
    while (isspace((unsigned char) *str))
        str++;
    *word = str;

    space = strchr(str, ' ');
    if (space == NULL)
        return NULL;

    *space = '\0';
    return space + 1;
}
1438
1439 #endif
1440
/*
 * SanityCheckInput
 *    Validate a subscript list against an array's shape.
 *
 *    Returns 1 when the number of subscripts "n" equals "ndim" and each
 *    indx[i] satisfies lb[i] <= indx[i] < lb[i] + dim[i]; returns 0
 *    otherwise.
 */
static int
SanityCheckInput(int ndim, int n, int dim[], int lb[], int indx[])
{
    int         d = 0;

    if (n != ndim)
        return 0;

    while (d < ndim)
    {
        int         lower = lb[d];

        if (indx[d] < lower || indx[d] >= dim[d] + lower)
            return 0;
        d++;
    }
    return 1;
}
1454
1455 static void
1456 _ArrayRange(int st[],
1457                         int endp[],
1458                         int bsize,
1459                         char *destPtr,
1460                         ArrayType *array,
1461                         int from)
1462 {
1463         int                     n,
1464                            *dim,
1465                            *lb,
1466                                 st_pos,
1467                                 prod[MAXDIM];
1468         int                     span[MAXDIM],
1469                                 dist[MAXDIM],
1470                                 indx[MAXDIM];
1471         int                     i,
1472                                 j,
1473                                 inc;
1474         char       *srcPtr;
1475
1476         n = ARR_NDIM(array);
1477         dim = ARR_DIMS(array);
1478         lb = ARR_LBOUND(array);
1479         srcPtr = ARR_DATA_PTR(array);
1480         for (i = 0; i < n; st[i] -= lb[i], endp[i] -= lb[i], i++);
1481         mda_get_prod(n, dim, prod);
1482         st_pos = tuple2linear(n, st, prod);
1483         srcPtr = array_seek(srcPtr, bsize, st_pos);
1484         mda_get_range(n, span, st, endp);
1485         mda_get_offset_values(n, dist, prod, span);
1486         for (i = 0; i < n; indx[i++] = 0);
1487         i = j = n - 1;
1488         inc = bsize;
1489         do
1490         {
1491                 srcPtr = array_seek(srcPtr, bsize, dist[j]);
1492                 if (from)
1493                         inc = array_read(destPtr, bsize, 1, srcPtr);
1494                 else
1495                         inc = array_read(srcPtr, bsize, 1, destPtr);
1496                 destPtr += inc;
1497                 srcPtr += inc;
1498         } while ((j = next_tuple(i + 1, indx, span)) != -1);
1499 }
1500
1501 static int
1502 _ArrayClipCount(int stI[], int endpI[], ArrayType *array)
1503 {
1504         int                     n,
1505                            *dim,
1506                            *lb,
1507                                 st_pos,
1508                                 prod[MAXDIM];
1509         int                     span[MAXDIM],
1510                                 dist[MAXDIM],
1511                                 indx[MAXDIM];
1512         int                     i,
1513                                 j,
1514                                 inc,
1515                                 st[MAXDIM],
1516                                 endp[MAXDIM];
1517         int                     count = 0;
1518         char       *ptr;
1519
1520         n = ARR_NDIM(array);
1521         dim = ARR_DIMS(array);
1522         lb = ARR_LBOUND(array);
1523         ptr = ARR_DATA_PTR(array);
1524         for (i = 0; i < n; st[i] = stI[i] - lb[i], endp[i] = endpI[i] - lb[i], i++);
1525         mda_get_prod(n, dim, prod);
1526         st_pos = tuple2linear(n, st, prod);
1527         ptr = array_seek(ptr, -1, st_pos);
1528         mda_get_range(n, span, st, endp);
1529         mda_get_offset_values(n, dist, prod, span);
1530         for (i = 0; i < n; indx[i++] = 0);
1531         i = j = n - 1;
1532         do
1533         {
1534                 ptr = array_seek(ptr, -1, dist[j]);
1535                 inc = INTALIGN(*(int32 *) ptr);
1536                 ptr += inc;
1537                 count += inc;
1538         } while ((j = next_tuple(i + 1, indx, span)) != -1);
1539         return count;
1540 }
1541
1542 static char *
1543 array_seek(char *ptr, int eltsize, int nitems)
1544 {
1545         int                     i;
1546
1547         if (eltsize > 0)
1548                 return (ptr + eltsize * nitems);
1549         for (i = 0; i < nitems; i++)
1550                 ptr += INTALIGN(*(int32 *) ptr);
1551         return (ptr);
1552 }
1553
1554 static int
1555 array_read(char *destptr, int eltsize, int nitems, char *srcptr)
1556 {
1557         int                     i,
1558                                 inc,
1559                                 tmp;
1560
1561         if (eltsize > 0)
1562         {
1563                 memmove(destptr, srcptr, eltsize * nitems);
1564                 return (eltsize * nitems);
1565         }
1566         for (i = inc = 0; i < nitems; i++)
1567         {
1568                 tmp = (INTALIGN(*(int32 *) srcptr));
1569                 memmove(destptr, srcptr, tmp);
1570                 srcptr += tmp;
1571                 destptr += tmp;
1572                 inc += tmp;
1573         }
1574         return (inc);
1575 }
1576
1577 static void
1578 _LOArrayRange(int st[],
1579                           int endp[],
1580                           int bsize,
1581                           int srcfd,
1582                           int destfd,
1583                           ArrayType *array,
1584                           int isSrcLO,
1585                           bool *isNull)
1586 {
1587         int                     n,
1588                            *dim,
1589                                 st_pos,
1590                                 prod[MAXDIM];
1591         int                     span[MAXDIM],
1592                                 dist[MAXDIM],
1593                                 indx[MAXDIM];
1594         int                     i,
1595                                 j,
1596                                 inc,
1597                                 tmp,
1598                            *lb,
1599                                 offset;
1600
1601         n = ARR_NDIM(array);
1602         dim = ARR_DIMS(array);
1603         lb = ARR_LBOUND(array);
1604         for (i = 0; i < n; st[i] -= lb[i], endp[i] -= lb[i], i++);
1605
1606         mda_get_prod(n, dim, prod);
1607         st_pos = tuple2linear(n, st, prod);
1608         offset = st_pos * bsize;
1609         if (lo_lseek(srcfd, offset, SEEK_SET) < 0)
1610                 return;
1611         mda_get_range(n, span, st, endp);
1612         mda_get_offset_values(n, dist, prod, span);
1613         for (i = 0; i < n; indx[i++] = 0);
1614         for (i = n - 1, inc = bsize; i >= 0; inc *= span[i--])
1615                 if (dist[i])
1616                         break;
1617         j = n - 1;
1618         do
1619         {
1620                 offset += (dist[j] * bsize);
1621                 if (lo_lseek(srcfd, offset, SEEK_SET) < 0)
1622                         return;
1623                 tmp = _LOtransfer((char **) &srcfd, inc, 1, (char **) &destfd, isSrcLO, 1);
1624                 if (tmp < inc)
1625                         return;
1626                 offset += inc;
1627         } while ((j = next_tuple(i + 1, indx, span)) != -1);
1628 }
1629
1630
1631 static void
1632 _ReadArray(int st[],
1633                    int endp[],
1634                    int bsize,
1635                    int srcfd,
1636                    int destfd,
1637                    ArrayType *array,
1638                    int isDestLO,
1639                    bool *isNull)
1640 {
1641         int                     n,
1642                            *dim,
1643                                 st_pos,
1644                                 prod[MAXDIM];
1645         int                     span[MAXDIM],
1646                                 dist[MAXDIM],
1647                                 indx[MAXDIM];
1648         int                     i,
1649                                 j,
1650                                 inc,
1651                                 tmp,
1652                            *lb,
1653                                 offset;
1654
1655         n = ARR_NDIM(array);
1656         dim = ARR_DIMS(array);
1657         lb = ARR_LBOUND(array);
1658         for (i = 0; i < n; st[i] -= lb[i], endp[i] -= lb[i], i++);
1659
1660         mda_get_prod(n, dim, prod);
1661         st_pos = tuple2linear(n, st, prod);
1662         offset = st_pos * bsize;
1663         if (lo_lseek(srcfd, offset, SEEK_SET) < 0)
1664                 return;
1665         mda_get_range(n, span, st, endp);
1666         mda_get_offset_values(n, dist, prod, span);
1667         for (i = 0; i < n; indx[i++] = 0);
1668         for (i = n - 1, inc = bsize; i >= 0; inc *= span[i--])
1669                 if (dist[i])
1670                         break;
1671         j = n - 1;
1672         do
1673         {
1674                 offset += (dist[j] * bsize);
1675                 if (lo_lseek(srcfd, offset, SEEK_SET) < 0)
1676                         return;
1677                 tmp = _LOtransfer((char **) &destfd, inc, 1, (char **) &srcfd, 1, isDestLO);
1678                 if (tmp < inc)
1679                         return;
1680                 offset += inc;
1681         } while ((j = next_tuple(i + 1, indx, span)) != -1);
1682 }
1683
1684
/*
 * _LOtransfer --
 *        Move nitems * size bytes from *srcfd to *destfd.
 *
 * Each of *srcfd / *destfd is interpreted according to its flag: when the
 * matching isSrcLO / isDestLO flag is set the slot is cast to int and used
 * as a large-object descriptor, otherwise it is used as a memory address.
 * Memory-side pointers are advanced past the bytes actually moved.
 *
 * Returns the number of bytes transferred.  The result of lo_read() /
 * lo_write() is passed through unchanged (including a negative value),
 * and -1 is returned if an LO-to-LO read comes back short.  A request
 * for zero or fewer bytes between two large objects, or between two
 * memory areas, transfers nothing and returns 0.
 *
 * LO-to-LO copying is only compiled in when LOARRAY is defined; without
 * it that case transfers nothing and returns 0.
 *
 * NOTE(review): the min() macro below is left exactly as it was (and is
 * still not #undef'd) so that file-level preprocessor state is unchanged;
 * its arguments are unparenthesized, so only pass simple operands.
 */
int
_LOtransfer(char **destfd,
                        int size,
                        int nitems,
                        char **srcfd,
                        int isSrcLO,
                        int isDestLO)
{
#define MAX_READ (512 * 1024)
#define min(a, b) (a < b ? a : b)
#ifdef LOARRAY
        struct varlena *v;
        int                     resid;
#endif
        int                     tmp,
                                inc;

        inc = nitems * size;
        if (isSrcLO && isDestLO)
        {
                /*
                 * Both ends are large objects.  This branch is taken even when
                 * inc <= 0, so that such a request can never fall through to the
                 * memory-to-memory case and have the descriptors treated as
                 * addresses.
                 */
                tmp = 0;
#ifdef LOARRAY
                /* copy in pieces of at most MAX_READ bytes */
                for (resid = inc; resid > 0; resid -= inc)
                {
                        inc = min(resid, MAX_READ);
                        v = (struct varlena *) LOread((int) *srcfd, inc);
                        if (VARSIZE(v) - VARHDRSZ < inc)
                        {
                                pfree(v);
                                return (-1);
                        }
                        tmp += LOwrite((int) *destfd, v);
                        /* free only what LOread actually handed back */
                        pfree(v);
                }
#endif
        }
        else if (!isSrcLO && isDestLO)
        {
                tmp = lo_write((int) *destfd, *srcfd, inc);
                /* do not move the source pointer backwards on an error return */
                if (tmp > 0)
                        *srcfd = *srcfd + tmp;
        }
        else if (isSrcLO && !isDestLO)
        {
                tmp = lo_read((int) *srcfd, *destfd, inc);
                /* do not move the destination pointer backwards on an error return */
                if (tmp > 0)
                        *destfd = *destfd + tmp;
        }
        else if (inc > 0)
        {
                memmove(*destfd, *srcfd, inc);
                tmp = inc;
                *srcfd += inc;
                *destfd += inc;
        }
        else
        {
                /* a non-positive length must not reach memmove() as a size_t */
                tmp = 0;
        }
        return (tmp);
#undef MAX_READ
}
1737
1738 char *
1739 _array_newLO(int *fd, int flag)
1740 {
1741         char       *p;
1742         char            saveName[NAME_LEN];
1743
1744         p = (char *) palloc(NAME_LEN);
1745         sprintf(p, "/Arry.%d", newoid());
1746         strcpy(saveName, p);
1747 #ifdef LOARRAY
1748         if ((*fd = LOcreat(saveName, 0600, flag)) < 0)
1749                 elog(ERROR, "Large object create failed");
1750 #endif
1751         return (p);
1752 }