]> granicus.if.org Git - postgresql/blob - src/backend/utils/adt/arrayfuncs.c
Apply Win32 patch from Horak Daniel.
[postgresql] / src / backend / utils / adt / arrayfuncs.c
1 /*-------------------------------------------------------------------------
2  *
3  * arrayfuncs.c--
4  *        Special functions for arrays.
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/utils/adt/arrayfuncs.c,v 1.35 1999/01/17 06:18:45 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include <ctype.h>
16 #include <stdio.h>
17 #include <string.h>
18
19 #include "postgres.h"
20
21 #include "catalog/catalog.h"
22 #include "catalog/pg_type.h"
23
24 #include "utils/syscache.h"
25 #include "utils/memutils.h"
26 #include "storage/fd.h"
27 #include "fmgr.h"
28 #include "utils/array.h"
29
30 #include "libpq/libpq-fs.h"
31 #include "libpq/be-fsstubs.h"
32
33 #define ASSGN    "="
34
35 /* An array has the following internal structure:
36  *        <nbytes>              - total number of bytes
37  *        <ndim>                - number of dimensions of the array
38  *        <flags>               - bit mask of flags
39  *        <dim>                 - size of each array axis
40  *        <dim_lower>   - lower boundary of each dimension
41  *        <actual data> - whatever is the stored data
42  */
43
44 /*-=-=--=-=-=-=-=-=-=-=--=-=-=-=-=-=-=-=--=-=-=-=-=-=-=-=--=-=-=-=-=-=-=-*/
45 static int      _ArrayCount(char *str, int *dim, int typdelim);
46 static char *_ReadArrayStr(char *arrayStr, int nitems, int ndim, int *dim,
47                           FmgrInfo *inputproc, Oid typelem, int32 typmod,
48                           char typdelim, int typlen, bool typbyval,
49                           char typalign, int *nbytes);
50
51 #ifdef LOARRAY
52 static char *_ReadLOArray(char *str, int *nbytes, int *fd, bool *chunkFlag,
53                          int ndim, int *dim, int baseSize);
54
55 #endif
56 static void _CopyArrayEls(char **values, char *p, int nitems, int typlen,
57                           char typalign, bool typbyval);
58 static void system_cache_lookup(Oid element_type, bool input, int *typlen,
59                                  bool *typbyval, char *typdelim, Oid *typelem, Oid *proc,
60                                         char *typalign);
61 static Datum _ArrayCast(char *value, bool byval, int len);
62
63 #ifdef LOARRAY
64 static char *_AdvanceBy1word(char *str, char **word);
65
66 #endif
67 static void _ArrayRange(int *st, int *endp, int bsize, char *destPtr,
68                         ArrayType *array, int from);
69 static int      _ArrayClipCount(int *stI, int *endpI, ArrayType *array);
70 static void _LOArrayRange(int *st, int *endp, int bsize, int srcfd,
71                           int destfd, ArrayType *array, int isSrcLO, bool *isNull);
72 static void _ReadArray(int *st, int *endp, int bsize, int srcfd, int destfd,
73                    ArrayType *array, int isDestLO, bool *isNull);
74 static int      ArrayCastAndSet(char *src, bool typbyval, int typlen, char *dest);
75 static int      SanityCheckInput(int ndim, int n, int *dim, int *lb, int *indx);
76 static int      array_read(char *destptr, int eltsize, int nitems, char *srcptr);
77 static char *array_seek(char *ptr, int eltsize, int nitems);
78
79 /*---------------------------------------------------------------------
80  * array_in :
81  *                converts an array from the external format in "string" to
82  *                it internal format.
83  * return value :
84  *                the internal representation of the input array
85  *--------------------------------------------------------------------
86  */
87 char *
88 array_in(char *string,                  /* input array in external form */
89                  Oid element_type,              /* type OID of an array element */
90                  int32 typmod)
91 {
92         int                     typlen;
93         bool            typbyval,
94                                 done;
95         char            typdelim;
96         Oid                     typinput;
97         Oid                     typelem;
98         char       *string_save,
99                            *p,
100                            *q,
101                            *r;
102         FmgrInfo        inputproc;
103         int                     i,
104                                 nitems;
105         int32           nbytes;
106         char       *dataPtr;
107         ArrayType  *retval = NULL;
108         int                     ndim,
109                                 dim[MAXDIM],
110                                 lBound[MAXDIM];
111         char            typalign;
112
113         system_cache_lookup(element_type, true, &typlen, &typbyval, &typdelim,
114                                                 &typelem, &typinput, &typalign);
115
116         fmgr_info(typinput, &inputproc);
117
118         string_save = (char *) palloc(strlen(string) + 3);
119         strcpy(string_save, string);
120
121         /* --- read array dimensions  ---------- */
122         p = q = string_save;
123         done = false;
124         for (ndim = 0; !done;)
125         {
126                 while (isspace(*p))
127                         p++;
128                 if (*p == '[')
129                 {
130                         p++;
131                         if ((r = (char *) strchr(p, ':')) == (char *) NULL)
132                                 lBound[ndim] = 1;
133                         else
134                         {
135                                 *r = '\0';
136                                 lBound[ndim] = atoi(p);
137                                 p = r + 1;
138                         }
139                         for (q = p; isdigit(*q); q++);
140                         if (*q != ']')
141                                 elog(ERROR, "array_in: missing ']' in array declaration");
142                         *q = '\0';
143                         dim[ndim] = atoi(p);
144                         if ((dim[ndim] < 0) || (lBound[ndim] < 0))
145                                 elog(ERROR, "array_in: array dimensions need to be positive");
146                         dim[ndim] = dim[ndim] - lBound[ndim] + 1;
147                         if (dim[ndim] < 0)
148                                 elog(ERROR, "array_in: upper_bound cannot be < lower_bound");
149                         p = q + 1;
150                         ndim++;
151                 }
152                 else
153                         done = true;
154         }
155
156         if (ndim == 0)
157         {
158                 if (*p == '{')
159                 {
160                         ndim = _ArrayCount(p, dim, typdelim);
161                         for (i = 0; i < ndim; lBound[i++] = 1);
162                 }
163                 else
164                         elog(ERROR, "array_in: Need to specify dimension");
165         }
166         else
167         {
168                 while (isspace(*p))
169                         p++;
170                 if (strncmp(p, ASSGN, strlen(ASSGN)))
171                         elog(ERROR, "array_in: missing assignment operator");
172                 p += strlen(ASSGN);
173                 while (isspace(*p))
174                         p++;
175         }
176
177 #ifdef ARRAYDEBUG
178         printf("array_in- ndim %d (", ndim);
179         for (i = 0; i < ndim; i++)
180         {
181                 printf(" %d", dim[i]);
182         };
183         printf(") for %s\n", string);
184 #endif
185
186         nitems = getNitems(ndim, dim);
187         if (nitems == 0)
188         {
189                 char       *emptyArray = palloc(sizeof(ArrayType));
190
191                 MemSet(emptyArray, 0, sizeof(ArrayType));
192                 *(int32 *) emptyArray = sizeof(ArrayType);
193                 return emptyArray;
194         }
195
196         if (*p == '{')
197         {
198                 /* array not a large object */
199                 dataPtr =
200                         (char *) _ReadArrayStr(p, nitems, ndim, dim, &inputproc, typelem,
201                                                         typmod, typdelim, typlen, typbyval, typalign,
202                                                                    &nbytes);
203                 nbytes += ARR_OVERHEAD(ndim);
204                 retval = (ArrayType *) palloc(nbytes);
205                 MemSet(retval, 0, nbytes);
206                 memmove(retval, (char *) &nbytes, sizeof(int));
207                 memmove((char *) ARR_NDIM_PTR(retval), (char *) &ndim, sizeof(int));
208                 SET_LO_FLAG(false, retval);
209                 memmove((char *) ARR_DIMS(retval), (char *) dim, ndim * sizeof(int));
210                 memmove((char *) ARR_LBOUND(retval), (char *) lBound,
211                                 ndim * sizeof(int));
212
213                 /*
214                  * dataPtr is an array of arbitraystuff even though its type is
215                  * char* cast to char** to pass to _CopyArrayEls for now  - jolly
216                  */
217                 _CopyArrayEls((char **) dataPtr,
218                                           ARR_DATA_PTR(retval), nitems,
219                                           typlen, typalign, typbyval);
220         }
221         else
222         {
223 #ifdef LOARRAY
224                 int                     dummy,
225                                         bytes;
226                 bool            chunked = false;
227
228                 dataPtr = _ReadLOArray(p, &bytes, &dummy, &chunked, ndim,
229                                                            dim, typlen);
230                 nbytes = bytes + ARR_OVERHEAD(ndim);
231                 retval = (ArrayType *) palloc(nbytes);
232                 MemSet(retval, 0, nbytes);
233                 memmove(retval, (char *) &nbytes, sizeof(int));
234                 memmove((char *) ARR_NDIM_PTR(retval), (char *) &ndim, sizeof(int));
235                 SET_LO_FLAG(true, retval);
236                 SET_CHUNK_FLAG(chunked, retval);
237                 memmove((char *) ARR_DIMS(retval), (char *) dim, ndim * sizeof(int));
238                 memmove((char *) ARR_LBOUND(retval), (char *) lBound, ndim * sizeof(int));
239                 memmove(ARR_DATA_PTR(retval), dataPtr, bytes);
240 #endif
241                 elog(ERROR, "large object arrays not supported");
242         }
243         pfree(string_save);
244         return (char *) retval;
245 }
246
247 /*-----------------------------------------------------------------------------
248  * _ArrayCount --
249  *       Counts the number of dimensions and the *dim array for an array string.
250  *               The syntax for array input is C-like nested curly braces
251  *-----------------------------------------------------------------------------
252  */
253 static int
254 _ArrayCount(char *str, int *dim, int typdelim)
255 {
256         int                     nest_level = 0,
257                                 i;
258         int                     ndim = 0,
259                                 temp[MAXDIM];
260         bool            scanning_string = false;
261         bool            eoArray = false;
262         char       *q;
263
264         for (i = 0; i < MAXDIM; ++i)
265                 temp[i] = dim[i] = 0;
266
267         if (strncmp(str, "{}", 2) == 0)
268                 return 0;
269
270         q = str;
271         while (eoArray != true)
272         {
273                 bool            done = false;
274
275                 while (!done)
276                 {
277                         switch (*q)
278                         {
279                                 case '\\':
280                                         /* skip escaped characters (\ and ") inside strings */
281                                         if (scanning_string && *(q + 1))
282                                                 q++;
283                                         break;
284                                 case '\0':
285
286                                         /*
287                                          * Signal a premature end of the string.  DZ -
288                                          * 2-9-1996
289                                          */
290                                         elog(ERROR, "malformed array constant: %s", str);
291                                         break;
292                                 case '\"':
293                                         scanning_string = !scanning_string;
294                                         break;
295                                 case '{':
296                                         if (!scanning_string)
297                                         {
298                                                 temp[nest_level] = 0;
299                                                 nest_level++;
300                                         }
301                                         break;
302                                 case '}':
303                                         if (!scanning_string)
304                                         {
305                                                 if (!ndim)
306                                                         ndim = nest_level;
307                                                 nest_level--;
308                                                 if (nest_level)
309                                                         temp[nest_level - 1]++;
310                                                 if (nest_level == 0)
311                                                         eoArray = done = true;
312                                         }
313                                         break;
314                                 default:
315                                         if (!ndim)
316                                                 ndim = nest_level;
317                                         if (*q == typdelim && !scanning_string)
318                                                 done = true;
319                                         break;
320                         }
321                         if (!done)
322                                 q++;
323                 }
324                 temp[ndim - 1]++;
325                 q++;
326                 if (!eoArray)
327                         while (isspace(*q))
328                                 q++;
329         }
330         for (i = 0; i < ndim; ++i)
331                 dim[i] = temp[i];
332
333         return ndim;
334 }
335
336 /*---------------------------------------------------------------------------
337  * _ReadArrayStr :
338  *       parses the array string pointed by "arrayStr" and converts it in the
339  *       internal format. The external format expected is like C array
340  *       declaration. Unspecified elements are initialized to zero for fixed length
341  *       base types and to empty varlena structures for variable length base
342  *       types.
343  * result :
344  *       returns the internal representation of the array elements
345  *       nbytes is set to the size of the array in its internal representation.
346  *---------------------------------------------------------------------------
347  */
348 static char *
349 _ReadArrayStr(char *arrayStr,
350                           int nitems,
351                           int ndim,
352                           int *dim,
353                           FmgrInfo *inputproc,          /* function used for the
354                                                                                  * conversion */
355                           Oid typelem,
356                           int32 typmod,
357                           char typdelim,
358                           int typlen,
359                           bool typbyval,
360                           char typalign,
361                           int *nbytes)
362 {
363         int                     i,
364                                 nest_level = 0;
365         char       *p,
366                            *q,
367                            *r,
368                           **values;
369         bool            scanning_string = false;
370         int                     indx[MAXDIM],
371                                 prod[MAXDIM];
372         bool            eoArray = false;
373
374         mda_get_prod(ndim, dim, prod);
375         for (i = 0; i < ndim; indx[i++] = 0);
376         /* read array enclosed within {} */
377         values = (char **) palloc(nitems * sizeof(char *));
378         MemSet(values, 0, nitems * sizeof(char *));
379         q = p = arrayStr;
380
381         while (!eoArray)
382         {
383                 bool            done = false;
384                 int                     i = -1;
385
386                 while (!done)
387                 {
388                         switch (*q)
389                         {
390                                 case '\\':
391                                         /* Crunch the string on top of the backslash. */
392                                         for (r = q; *r != '\0'; r++)
393                                                 *r = *(r + 1);
394                                         break;
395                                 case '\"':
396                                         if (!scanning_string)
397                                         {
398                                                 while (p != q)
399                                                         p++;
400                                                 p++;    /* get p past first doublequote */
401                                         }
402                                         else
403                                                 *q = '\0';
404                                         scanning_string = !scanning_string;
405                                         break;
406                                 case '{':
407                                         if (!scanning_string)
408                                         {
409                                                 p++;
410                                                 nest_level++;
411                                                 if (nest_level > ndim)
412                                                         elog(ERROR, "array_in: illformed array constant");
413                                                 indx[nest_level - 1] = 0;
414                                                 indx[ndim - 1] = 0;
415                                         }
416                                         break;
417                                 case '}':
418                                         if (!scanning_string)
419                                         {
420                                                 if (i == -1)
421                                                         i = tuple2linear(ndim, indx, prod);
422                                                 nest_level--;
423                                                 if (nest_level == 0)
424                                                         eoArray = done = true;
425                                                 else
426                                                 {
427                                                         *q = '\0';
428                                                         indx[nest_level - 1]++;
429                                                 }
430                                         }
431                                         break;
432                                 default:
433                                         if (*q == typdelim && !scanning_string)
434                                         {
435                                                 if (i == -1)
436                                                         i = tuple2linear(ndim, indx, prod);
437                                                 done = true;
438                                                 indx[ndim - 1]++;
439                                         }
440                                         break;
441                         }
442                         if (!done)
443                                 q++;
444                 }
445                 *q = '\0';
446                 if (i >= nitems)
447                         elog(ERROR, "array_in: illformed array constant");
448                 values[i] = (*fmgr_faddr(inputproc)) (p, typelem, typmod);
449                 p = ++q;
450                 if (!eoArray)
451
452                         /*
453                          * if not at the end of the array skip white space
454                          */
455                         while (isspace(*q))
456                         {
457                                 p++;
458                                 q++;
459                         }
460         }
461         if (typlen > 0)
462         {
463                 *nbytes = nitems * typlen;
464                 if (!typbyval)
465                         for (i = 0; i < nitems; i++)
466                                 if (!values[i])
467                                 {
468                                         values[i] = palloc(typlen);
469                                         MemSet(values[i], 0, typlen);
470                                 }
471         }
472         else
473         {
474                 for (i = 0, *nbytes = 0; i < nitems; i++)
475                 {
476                         if (values[i])
477                         {
478                                 if (typalign == 'd')
479                                         *nbytes += DOUBLEALIGN(*(int32 *) values[i]);
480                                 else
481                                         *nbytes += INTALIGN(*(int32 *) values[i]);
482                         }
483                         else
484                         {
485                                 *nbytes += sizeof(int32);
486                                 values[i] = palloc(sizeof(int32));
487                                 *(int32 *) values[i] = sizeof(int32);
488                         }
489                 }
490         }
491         return (char *) values;
492 }
493
494
495 /*----------------------------------------------------------------------------
496  * Read data about an array to be stored as a large object
497  *----------------------------------------------------------------------------
498  */
499 #ifdef LOARRAY
500 static char *
501 _ReadLOArray(char *str,
502                          int *nbytes,
503                          int *fd,
504                          bool *chunkFlag,
505                          int ndim,
506                          int *dim,
507                          int baseSize)
508 {
509         char       *inputfile,
510                            *accessfile = NULL,
511                            *chunkfile = NULL;
512         char       *retStr,
513                            *_AdvanceBy1word();
514         Oid                     lobjId;
515
516         str = _AdvanceBy1word(str, &inputfile);
517
518         while (str != NULL)
519         {
520                 char       *word;
521
522                 str = _AdvanceBy1word(str, &word);
523
524                 if (!strcmp(word, "-chunk"))
525                 {
526                         if (str == NULL)
527                                 elog(ERROR, "array_in: access pattern file required");
528                         str = _AdvanceBy1word(str, &accessfile);
529                 }
530                 else if (!strcmp(word, "-noreorg"))
531                 {
532                         if (str == NULL)
533                                 elog(ERROR, "array_in: chunk file required");
534                         str = _AdvanceBy1word(str, &chunkfile);
535                 }
536                 else
537                         elog(ERROR, "usage: <input file> -chunk DEFAULT/<access pattern file> -invert/-native [-noreorg <chunk file>]");
538         }
539
540         if (inputfile == NULL)
541                 elog(ERROR, "array_in: missing file name");
542         lobjId = lo_creat(0);
543         *fd = lo_open(lobjId, INV_READ);
544         if (*fd < 0)
545                 elog(ERROR, "Large object create failed");
546         retStr = inputfile;
547         *nbytes = strlen(retStr) + 2;
548
549         if (accessfile)
550         {
551                 FILE       *afd;
552
553 #ifndef __CYGWIN32__
554                 if ((afd = AllocateFile(accessfile, "r")) == NULL)
555 #else
556                 if ((afd = AllocateFile(accessfile, "r")) == NULL)
557 #endif
558                         elog(ERROR, "unable to open access pattern file");
559                 *chunkFlag = true;
560                 retStr = _ChunkArray(*fd, afd, ndim, dim, baseSize, nbytes,
561                                                          chunkfile);
562                 FreeFile(afd);
563         }
564         return retStr;
565 }
566
567 #endif
568
569 static void
570 _CopyArrayEls(char **values,
571                           char *p,
572                           int nitems,
573                           int typlen,
574                           char typalign,
575                           bool typbyval)
576 {
577         int                     i;
578
579         for (i = 0; i < nitems; i++)
580         {
581                 int                     inc;
582
583                 inc = ArrayCastAndSet(values[i], typbyval, typlen, p);
584                 p += inc;
585                 if (!typbyval)
586                         pfree(values[i]);
587         }
588         pfree(values);
589 }
590
591 /*-------------------------------------------------------------------------
592  * array_out :
593  *                 takes the internal representation of an array and returns a string
594  *                containing the array in its external format.
595  *-------------------------------------------------------------------------
596  */
597 char *
598 array_out(ArrayType *v, Oid element_type)
599 {
600         int                     typlen;
601         bool            typbyval;
602         char            typdelim;
603         Oid                     typoutput,
604                                 typelem;
605         FmgrInfo        outputproc;
606         char            typalign;
607
608         char       *p,
609                            *tmp,
610                            *retval,
611                           **values,
612                                 delim[2];
613         int                     nitems,
614                                 overall_length,
615                                 i,
616                                 j,
617                                 k,
618                                 l,
619                                 indx[MAXDIM];
620         bool            dummy_bool;
621         int                     ndim,
622                            *dim;
623
624         if (v == (ArrayType *) NULL)
625                 return (char *) NULL;
626
627         if (ARR_IS_LO(v) == true)
628         {
629                 char       *p,
630                                    *save_p;
631                 int                     nbytes;
632
633                 /* get a wide string to print to */
634                 p = array_dims(v, &dummy_bool);
635                 nbytes = strlen(ARR_DATA_PTR(v)) + VARHDRSZ + *(int *) p;
636
637                 save_p = (char *) palloc(nbytes);
638
639                 strcpy(save_p, p + sizeof(int));
640                 strcat(save_p, ASSGN);
641                 strcat(save_p, ARR_DATA_PTR(v));
642                 pfree(p);
643                 return save_p;
644         }
645
646         system_cache_lookup(element_type, false, &typlen, &typbyval,
647                                                 &typdelim, &typelem, &typoutput, &typalign);
648         fmgr_info(typoutput, &outputproc);
649         sprintf(delim, "%c", typdelim);
650         ndim = ARR_NDIM(v);
651         dim = ARR_DIMS(v);
652         nitems = getNitems(ndim, dim);
653
654         if (nitems == 0)
655         {
656                 char       *emptyArray = palloc(3);
657
658                 emptyArray[0] = '{';
659                 emptyArray[1] = '}';
660                 emptyArray[2] = '\0';
661                 return emptyArray;
662         }
663
664         p = ARR_DATA_PTR(v);
665         overall_length = 1;                     /* [TRH] don't forget to count \0 at end. */
666         values = (char **) palloc(nitems * sizeof(char *));
667         for (i = 0; i < nitems; i++)
668         {
669                 if (typbyval)
670                 {
671                         switch (typlen)
672                         {
673                                 case 1:
674                                         values[i] = (*fmgr_faddr(&outputproc)) (*p, typelem);
675                                         break;
676                                 case 2:
677                                         values[i] = (*fmgr_faddr(&outputproc)) (*(int16 *) p, typelem);
678                                         break;
679                                 case 3:
680                                 case 4:
681                                         values[i] = (*fmgr_faddr(&outputproc)) (*(int32 *) p, typelem);
682                                         break;
683                         }
684                         p += typlen;
685                 }
686                 else
687                 {
688                         values[i] = (*fmgr_faddr(&outputproc)) (p, typelem);
689                         if (typlen > 0)
690                                 p += typlen;
691                         else
692                                 p += INTALIGN(*(int32 *) p);
693
694                         /*
695                          * For the pair of double quotes
696                          */
697                         overall_length += 2;
698                 }
699                 for (tmp = values[i]; *tmp; tmp++)
700                 {
701                         overall_length += 1;
702                         if (*tmp == '"')
703                                 overall_length += 1;
704                 }
705                 overall_length += 1;
706         }
707
708         /*
709          * count total number of curly braces in output string
710          */
711         for (i = j = 0, k = 1; i < ndim; k *= dim[i++], j += k);
712
713         p = (char *) palloc(overall_length + 2 * j);
714         retval = p;
715
716         strcpy(p, "{");
717         for (i = 0; i < ndim; indx[i++] = 0);
718         j = 0;
719         k = 0;
720         do
721         {
722                 for (i = j; i < ndim - 1; i++)
723                         strcat(p, "{");
724
725                 /*
726                  * Surround anything that is not passed by value in double quotes.
727                  * See above for more details.
728                  */
729                 if (!typbyval)
730                 {
731                         strcat(p, "\"");
732                         l = strlen(p);
733                         for (tmp = values[k]; *tmp; tmp++)
734                         {
735                                 if (*tmp == '"')
736                                         p[l++] = '\\';
737                                 p[l++] = *tmp;
738                         }
739                         p[l] = '\0';
740                         strcat(p, "\"");
741                 }
742                 else
743                         strcat(p, values[k]);
744                 pfree(values[k++]);
745
746                 for (i = ndim - 1; i >= 0; i--)
747                 {
748                         indx[i] = (indx[i] + 1) % dim[i];
749                         if (indx[i])
750                         {
751                                 strcat(p, delim);
752                                 break;
753                         }
754                         else
755                                 strcat(p, "}");
756                 }
757                 j = i;
758         } while (j != -1);
759
760         pfree(values);
761         return retval;
762 }
763
764 /*-----------------------------------------------------------------------------
765  * array_dims :
766  *                returns the dimension of the array pointed to by "v"
767  *----------------------------------------------------------------------------
768  */
769 char *
770 array_dims(ArrayType *v, bool *isNull)
771 {
772         char       *p,
773                            *save_p;
774         int                     nbytes,
775                                 i;
776         int                *dimv,
777                            *lb;
778
779         if (v == (ArrayType *) NULL)
780                 RETURN_NULL;
781         nbytes = ARR_NDIM(v) * 33;
782
783         /*
784          * 33 since we assume 15 digits per number + ':' +'[]'
785          */
786         save_p = p = (char *) palloc(nbytes + VARHDRSZ);
787         MemSet(save_p, 0, nbytes + VARHDRSZ);
788         dimv = ARR_DIMS(v);
789         lb = ARR_LBOUND(v);
790         p += VARHDRSZ;
791         for (i = 0; i < ARR_NDIM(v); i++)
792         {
793                 sprintf(p, "[%d:%d]", lb[i], dimv[i] + lb[i] - 1);
794                 p += strlen(p);
795         }
796         nbytes = strlen(save_p + VARHDRSZ) + VARHDRSZ;
797         memmove(save_p, &nbytes, VARHDRSZ);
798         return save_p;
799 }
800
801 /*---------------------------------------------------------------------------
802  * array_ref :
803  *        This routing takes an array pointer and an index array and returns
804  *        a pointer to the referred element if element is passed by
805  *        reference otherwise returns the value of the referred element.
806  *---------------------------------------------------------------------------
807  */
808 Datum
809 array_ref(ArrayType *array,
810                   int n,
811                   int *indx,
812                   int reftype,
813                   int elmlen,
814                   int arraylen,
815                   bool *isNull)
816 {
817         int                     i,
818                                 ndim,
819                            *dim,
820                            *lb,
821                                 offset,
822                                 nbytes;
823         struct varlena *v = NULL;
824         char       *retval = NULL;
825
826         if (array == (ArrayType *) NULL)
827                 RETURN_NULL;
828         if (arraylen > 0)
829         {
830
831                 /*
832                  * fixed length arrays -- these are assumed to be 1-d
833                  */
834                 if (indx[0] * elmlen > arraylen)
835                         elog(ERROR, "array_ref: array bound exceeded");
836                 retval = (char *) array + indx[0] * elmlen;
837                 return _ArrayCast(retval, reftype, elmlen);
838         }
839         dim = ARR_DIMS(array);
840         lb = ARR_LBOUND(array);
841         ndim = ARR_NDIM(array);
842         nbytes = (*(int32 *) array) - ARR_OVERHEAD(ndim);
843
844         if (!SanityCheckInput(ndim, n, dim, lb, indx))
845                 RETURN_NULL;
846
847         offset = GetOffset(n, dim, lb, indx);
848
849         if (ARR_IS_LO(array))
850         {
851                 char       *lo_name;
852                 int                     fd = 0;
853
854                 /* We are assuming fixed element lengths here */
855                 offset *= elmlen;
856                 lo_name = (char *) ARR_DATA_PTR(array);
857 #ifdef LOARRAY
858                 if ((fd = LOopen(lo_name, ARR_IS_INV(array) ? INV_READ : O_RDONLY)) < 0)
859                         RETURN_NULL;
860 #endif
861                 if (ARR_IS_CHUNKED(array))
862                         v = _ReadChunkArray1El(indx, elmlen, fd, array, isNull);
863                 else
864                 {
865                         if (lo_lseek(fd, offset, SEEK_SET) < 0)
866                                 RETURN_NULL;
867 #ifdef LOARRAY
868                         v = (struct varlena *) LOread(fd, elmlen);
869 #endif
870                 }
871                 if (*isNull)
872                         RETURN_NULL;
873                 if (VARSIZE(v) - VARHDRSZ < elmlen)
874                         RETURN_NULL;
875                 lo_close(fd);
876                 retval = (char *) _ArrayCast((char *) VARDATA(v), reftype, elmlen);
877                 if (reftype == 0)
878                 {                                               /* not by value */
879                         char       *tempdata = palloc(elmlen);
880
881                         memmove(tempdata, retval, elmlen);
882                         retval = tempdata;
883                 }
884                 pfree(v);
885                 return (Datum) retval;
886         }
887
888         if (elmlen > 0)
889         {
890                 offset = offset * elmlen;
891                 /* off the end of the array */
892                 if (nbytes - offset < 1)
893                         RETURN_NULL;
894                 retval = ARR_DATA_PTR(array) + offset;
895                 return _ArrayCast(retval, reftype, elmlen);
896         }
897         else
898         {
899                 bool            done = false;
900                 char       *temp;
901                 int                     bytes = nbytes;
902
903                 temp = ARR_DATA_PTR(array);
904                 i = 0;
905                 while (bytes > 0 && !done)
906                 {
907                         if (i == offset)
908                         {
909                                 retval = temp;
910                                 done = true;
911                         }
912                         bytes -= INTALIGN(*(int32 *) temp);
913                         temp += INTALIGN(*(int32 *) temp);
914                         i++;
915                 }
916                 if (!done)
917                         RETURN_NULL;
918                 return (Datum) retval;
919         }
920 }
921
922 /*-----------------------------------------------------------------------------
923  * array_clip :
924  *                This routine takes an array and a range of indices (upperIndex and
925  *                 lowerIndx), creates a new array structure for the referred elements
926  *                 and returns a pointer to it.
927  *-----------------------------------------------------------------------------
928  */
929 Datum
930 array_clip(ArrayType *array,
931                    int n,
932                    int *upperIndx,
933                    int *lowerIndx,
934                    int reftype,
935                    int len,
936                    bool *isNull)
937 {
938         int                     i,
939                                 ndim,
940                            *dim,
941                            *lb,
942                                 nbytes;
943         ArrayType  *newArr;
944         int                     bytes,
945                                 span[MAXDIM];
946
947         /* timer_start(); */
948         if (array == (ArrayType *) NULL)
949                 RETURN_NULL;
950         dim = ARR_DIMS(array);
951         lb = ARR_LBOUND(array);
952         ndim = ARR_NDIM(array);
953         nbytes = (*(int32 *) array) - ARR_OVERHEAD(ndim);
954
955         if (!SanityCheckInput(ndim, n, dim, lb, upperIndx))
956                 RETURN_NULL;
957
958         if (!SanityCheckInput(ndim, n, dim, lb, lowerIndx))
959                 RETURN_NULL;
960
961         for (i = 0; i < n; i++)
962                 if (lowerIndx[i] > upperIndx[i])
963                         elog(ERROR, "lowerIndex cannot be larger than upperIndx");
964         mda_get_range(n, span, lowerIndx, upperIndx);
965
966         if (ARR_IS_LO(array))
967         {
968 #ifdef LOARRAY
969                 char       *lo_name;
970
971 #endif
972                 char       *newname = NULL;
973                 int                     fd = 0,
974                                         newfd = 0,
975                                         isDestLO = true,
976                                         rsize;
977
978                 if (len < 0)
979                         elog(ERROR, "array_clip: array of variable length objects not supported");
980 #ifdef LOARRAY
981                 lo_name = (char *) ARR_DATA_PTR(array);
982                 if ((fd = LOopen(lo_name, ARR_IS_INV(array) ? INV_READ : O_RDONLY)) < 0)
983                         RETURN_NULL;
984                 newname = _array_newLO(&newfd, Unix);
985 #endif
986                 bytes = strlen(newname) + 1 + ARR_OVERHEAD(n);
987                 newArr = (ArrayType *) palloc(bytes);
988                 memmove(newArr, array, sizeof(ArrayType));
989                 memmove(newArr, &bytes, sizeof(int));
990                 memmove(ARR_DIMS(newArr), span, n * sizeof(int));
991                 memmove(ARR_LBOUND(newArr), lowerIndx, n * sizeof(int));
992                 strcpy(ARR_DATA_PTR(newArr), newname);
993
994                 rsize = compute_size(lowerIndx, upperIndx, n, len);
995                 if (rsize < MAX_BUFF_SIZE)
996                 {
997                         char       *buff;
998
999                         rsize += VARHDRSZ;
1000                         buff = palloc(rsize);
1001                         if (buff)
1002                                 isDestLO = false;
1003                         if (ARR_IS_CHUNKED(array))
1004                         {
1005                                 _ReadChunkArray(lowerIndx, upperIndx, len, fd, &(buff[VARHDRSZ]),
1006                                                                 array, 0, isNull);
1007                         }
1008                         else
1009                         {
1010                                 _ReadArray(lowerIndx, upperIndx, len, fd, (int) &(buff[VARHDRSZ]),
1011                                                    array,
1012                                                    0, isNull);
1013                         }
1014                         memmove(buff, &rsize, VARHDRSZ);
1015 #ifdef LOARRAY
1016                         if (!*isNull)
1017                                 bytes = LOwrite(newfd, (struct varlena *) buff);
1018 #endif
1019                         pfree(buff);
1020                 }
1021                 if (isDestLO)
1022                 {
1023                         if (ARR_IS_CHUNKED(array))
1024                         {
1025                                 _ReadChunkArray(lowerIndx, upperIndx, len, fd, (char *) newfd, array,
1026                                                                 1, isNull);
1027                         }
1028                         else
1029                                 _ReadArray(lowerIndx, upperIndx, len, fd, newfd, array, 1, isNull);
1030                 }
1031 #ifdef LOARRAY
1032                 LOclose(fd);
1033                 LOclose(newfd);
1034 #endif
1035                 if (*isNull)
1036                 {
1037                         pfree(newArr);
1038                         newArr = NULL;
1039                 }
1040                 /* timer_end(); */
1041                 return (Datum) newArr;
1042         }
1043
1044         if (len > 0)
1045         {
1046                 bytes = getNitems(n, span);
1047                 bytes = bytes * len + ARR_OVERHEAD(n);
1048         }
1049         else
1050         {
1051                 bytes = _ArrayClipCount(lowerIndx, upperIndx, array);
1052                 bytes += ARR_OVERHEAD(n);
1053         }
1054         newArr = (ArrayType *) palloc(bytes);
1055         memmove(newArr, array, sizeof(ArrayType));
1056         memmove(newArr, &bytes, sizeof(int));
1057         memmove(ARR_DIMS(newArr), span, n * sizeof(int));
1058         memmove(ARR_LBOUND(newArr), lowerIndx, n * sizeof(int));
1059         _ArrayRange(lowerIndx, upperIndx, len, ARR_DATA_PTR(newArr), array, 1);
1060         return (Datum) newArr;
1061 }
1062
1063 /*-----------------------------------------------------------------------------
1064  * array_set  :
1065  *                This routine sets the value of an array location (specified by an index array)
1066  *                to a new value specified by "dataPtr".
1067  * result :
1068  *                returns a pointer to the modified array.
1069  *-----------------------------------------------------------------------------
1070  */
1071 char *
1072 array_set(ArrayType *array,
1073                   int n,
1074                   int *indx,
1075                   char *dataPtr,
1076                   int reftype,
1077                   int elmlen,
1078                   int arraylen,
1079                   bool *isNull)
1080 {
1081         int                     ndim,
1082                            *dim,
1083                            *lb,
1084                                 offset,
1085                                 nbytes;
1086         char       *pos;
1087
1088         if (array == (ArrayType *) NULL)
1089                 RETURN_NULL;
1090         if (arraylen > 0)
1091         {
1092
1093                 /*
1094                  * fixed length arrays -- these are assumed to be 1-d
1095                  */
1096                 if (indx[0] * elmlen > arraylen)
1097                         elog(ERROR, "array_ref: array bound exceeded");
1098                 pos = (char *) array + indx[0] * elmlen;
1099                 ArrayCastAndSet(dataPtr, (bool) reftype, elmlen, pos);
1100                 return (char *) array;
1101         }
1102         dim = ARR_DIMS(array);
1103         lb = ARR_LBOUND(array);
1104         ndim = ARR_NDIM(array);
1105         nbytes = (*(int32 *) array) - ARR_OVERHEAD(ndim);
1106
1107         if (!SanityCheckInput(ndim, n, dim, lb, indx))
1108         {
1109                 elog(ERROR, "array_set: array bound exceeded");
1110                 return (char *) array;
1111         }
1112         offset = GetOffset(n, dim, lb, indx);
1113
1114         if (ARR_IS_LO(array))
1115         {
1116                 int                     fd = 0;
1117                 struct varlena *v;
1118
1119                 /* We are assuming fixed element lengths here */
1120                 offset *= elmlen;
1121 #ifdef LOARRAY
1122                 char       *lo_name;
1123
1124                 lo_name = ARR_DATA_PTR(array);
1125                 if ((fd = LOopen(lo_name, ARR_IS_INV(array) ? INV_WRITE : O_WRONLY)) < 0)
1126                         return (char *) array;
1127 #endif
1128                 if (lo_lseek(fd, offset, SEEK_SET) < 0)
1129                         return (char *) array;
1130                 v = (struct varlena *) palloc(elmlen + VARHDRSZ);
1131                 VARSIZE(v) = elmlen + VARHDRSZ;
1132                 ArrayCastAndSet(dataPtr, (bool) reftype, elmlen, VARDATA(v));
1133 #ifdef LOARRAY
1134                 n = LOwrite(fd, v);
1135 #endif
1136
1137                 /*
1138                  * if (n < VARSIZE(v) - VARHDRSZ) RETURN_NULL;
1139                  */
1140                 pfree(v);
1141                 lo_close(fd);
1142                 return (char *) array;
1143         }
1144         if (elmlen > 0)
1145         {
1146                 offset = offset * elmlen;
1147                 /* off the end of the array */
1148                 if (nbytes - offset < 1)
1149                         return (char *) array;
1150                 pos = ARR_DATA_PTR(array) + offset;
1151         }
1152         else
1153         {
1154                 ArrayType  *newarray;
1155                 char       *elt_ptr;
1156                 int                     oldsize,
1157                                         newsize,
1158                                         oldlen,
1159                                         newlen,
1160                                         lth0,
1161                                         lth1,
1162                                         lth2;
1163
1164                 elt_ptr = array_seek(ARR_DATA_PTR(array), -1, offset);
1165                 oldlen = INTALIGN(*(int32 *) elt_ptr);
1166                 newlen = INTALIGN(*(int32 *) dataPtr);
1167
1168                 if (oldlen == newlen)
1169                 {
1170                         /* new element with same size, overwrite old data */
1171                         ArrayCastAndSet(dataPtr, (bool) reftype, elmlen, elt_ptr);
1172                         return (char *) array;
1173                 }
1174
1175                 /* new element with different size, reallocate the array */
1176                 oldsize = array->size;
1177                 lth0 = ARR_OVERHEAD(n);
1178                 lth1 = (int) (elt_ptr - ARR_DATA_PTR(array));
1179                 lth2 = (int) (oldsize - lth0 - lth1 - oldlen);
1180                 newsize = lth0 + lth1 + newlen + lth2;
1181
1182                 newarray = (ArrayType *) palloc(newsize);
1183                 memmove((char *) newarray, (char *) array, lth0 + lth1);
1184                 newarray->size = newsize;
1185                 newlen = ArrayCastAndSet(dataPtr, (bool) reftype, elmlen,
1186                                                                  (char *) newarray + lth0 + lth1);
1187                 memmove((char *) newarray + lth0 + lth1 + newlen,
1188                                 (char *) array + lth0 + lth1 + oldlen, lth2);
1189
1190                 /* ??? who should free this storage ??? */
1191                 return (char *) newarray;
1192         }
1193         ArrayCastAndSet(dataPtr, (bool) reftype, elmlen, pos);
1194         return (char *) array;
1195 }
1196
1197 /*----------------------------------------------------------------------------
1198  * array_assgn :
1199  *                This routine sets the value of a range of array locations (specified
1200  *                by upper and lower index values ) to new values passed as
1201  *                another array
1202  * result :
1203  *                returns a pointer to the modified array.
1204  *----------------------------------------------------------------------------
1205  */
1206 char *
1207 array_assgn(ArrayType *array,
1208                         int n,
1209                         int *upperIndx,
1210                         int *lowerIndx,
1211                         ArrayType *newArr,
1212                         int reftype,
1213                         int len,
1214                         bool *isNull)
1215 {
1216         int                     i,
1217                                 ndim,
1218                            *dim,
1219                            *lb;
1220
1221         if (array == (ArrayType *) NULL)
1222                 RETURN_NULL;
1223         if (len < 0)
1224                 elog(ERROR, "array_assgn:updates on arrays of variable length elements not allowed");
1225
1226         dim = ARR_DIMS(array);
1227         lb = ARR_LBOUND(array);
1228         ndim = ARR_NDIM(array);
1229
1230         if (!SanityCheckInput(ndim, n, dim, lb, upperIndx) ||
1231                 !SanityCheckInput(ndim, n, dim, lb, lowerIndx))
1232                 return (char *) array;
1233
1234         for (i = 0; i < n; i++)
1235                 if (lowerIndx[i] > upperIndx[i])
1236                         elog(ERROR, "lowerIndex larger than upperIndx");
1237
1238         if (ARR_IS_LO(array))
1239         {
1240                 int                     fd = 0,
1241                                         newfd = 0;
1242
1243 #ifdef LOARRAY
1244                 char       *lo_name;
1245
1246                 lo_name = (char *) ARR_DATA_PTR(array);
1247                 if ((fd = LOopen(lo_name, ARR_IS_INV(array) ? INV_WRITE : O_WRONLY)) < 0)
1248                         return (char *) array;
1249 #endif
1250                 if (ARR_IS_LO(newArr))
1251                 {
1252 #ifdef LOARRAY
1253                         lo_name = (char *) ARR_DATA_PTR(newArr);
1254                         if ((newfd = LOopen(lo_name, ARR_IS_INV(newArr) ? INV_READ : O_RDONLY)) < 0)
1255                                 return (char *) array;
1256 #endif
1257                         _LOArrayRange(lowerIndx, upperIndx, len, fd, newfd, array, 1, isNull);
1258                         lo_close(newfd);
1259                 }
1260                 else
1261                 {
1262                         _LOArrayRange(lowerIndx, upperIndx, len, fd, (int) ARR_DATA_PTR(newArr),
1263                                                   array, 0, isNull);
1264                 }
1265                 lo_close(fd);
1266                 return (char *) array;
1267         }
1268         _ArrayRange(lowerIndx, upperIndx, len, ARR_DATA_PTR(newArr), array, 0);
1269         return (char *) array;
1270 }
1271
1272 /*-----------------------------------------------------------------------------
1273  * array_eq :
1274  *                compares two arrays for equality
1275  * result :
1276  *                returns 1 if the arrays are equal, 0 otherwise.
1277  *-----------------------------------------------------------------------------
1278  */
1279 int
1280 array_eq(ArrayType *array1, ArrayType *array2)
1281 {
1282         if ((array1 == NULL) || (array2 == NULL))
1283                 return 0;
1284         if (*(int *) array1 != *(int *) array2)
1285                 return 0;
1286         if (memcmp(array1, array2, *(int *) array1))
1287                 return 0;
1288         return 1;
1289 }
1290
1291 /***************************************************************************/
1292 /******************|              Support  Routines                       |*****************/
1293 /***************************************************************************/
1294 static void
1295 system_cache_lookup(Oid element_type,
1296                                         bool input,
1297                                         int *typlen,
1298                                         bool *typbyval,
1299                                         char *typdelim,
1300                                         Oid *typelem,
1301                                         Oid *proc,
1302                                         char *typalign)
1303 {
1304         HeapTuple       typeTuple;
1305         Form_pg_type typeStruct;
1306
1307         typeTuple = SearchSysCacheTuple(TYPOID,
1308                                                                         ObjectIdGetDatum(element_type),
1309                                                                         0, 0, 0);
1310
1311         if (!HeapTupleIsValid(typeTuple))
1312         {
1313                 elog(ERROR, "array_out: Cache lookup failed for type %d\n",
1314                          element_type);
1315                 return;
1316         }
1317         typeStruct = (Form_pg_type) GETSTRUCT(typeTuple);
1318         *typlen = typeStruct->typlen;
1319         *typbyval = typeStruct->typbyval;
1320         *typdelim = typeStruct->typdelim;
1321         *typelem = typeStruct->typelem;
1322         *typalign = typeStruct->typalign;
1323         if (input)
1324                 *proc = typeStruct->typinput;
1325         else
1326                 *proc = typeStruct->typoutput;
1327 }
1328
1329 static Datum
1330 _ArrayCast(char *value, bool byval, int len)
1331 {
1332         if (byval)
1333         {
1334                 switch (len)
1335                 {
1336                                 case 1:
1337                                 return (Datum) *value;
1338                         case 2:
1339                                 return (Datum) *(int16 *) value;
1340                         case 3:
1341                         case 4:
1342                                 return (Datum) *(int32 *) value;
1343                         default:
1344                                 elog(ERROR, "array_ref: byval and elt len > 4!");
1345                                 break;
1346                 }
1347         }
1348         else
1349                 return (Datum) value;
1350         return 0;
1351 }
1352
1353
1354 static int
1355 ArrayCastAndSet(char *src,
1356                                 bool typbyval,
1357                                 int typlen,
1358                                 char *dest)
1359 {
1360         int                     inc;
1361
1362         if (typlen > 0)
1363         {
1364                 if (typbyval)
1365                 {
1366                         switch (typlen)
1367                         {
1368                                 case 1:
1369                                         *dest = DatumGetChar(src);
1370                                         break;
1371                                 case 2:
1372                                         *(int16 *) dest = DatumGetInt16(src);
1373                                         break;
1374                                 case 4:
1375                                         *(int32 *) dest = (int32) src;
1376                                         break;
1377                         }
1378                 }
1379                 else
1380                         memmove(dest, src, typlen);
1381                 inc = typlen;
1382         }
1383         else
1384         {
1385                 memmove(dest, src, *(int32 *) src);
1386                 inc = (INTALIGN(*(int32 *) src));
1387         }
1388         return inc;
1389 }
1390
1391 #ifdef LOARRAY
1392 static char *
1393 _AdvanceBy1word(char *str, char **word)
1394 {
1395         char       *retstr,
1396                            *space;
1397
1398         *word = NULL;
1399         if (str == NULL)
1400                 return str;
1401         while (isspace(*str))
1402                 str++;
1403         *word = str;
1404         if ((space = (char *) strchr(str, ' ')) != (char *) NULL)
1405         {
1406                 retstr = space + 1;
1407                 *space = '\0';
1408         }
1409         else
1410                 retstr = NULL;
1411         return retstr;
1412 }
1413
1414 #endif
1415
1416 static int
1417 SanityCheckInput(int ndim, int n, int *dim, int *lb, int *indx)
1418 {
1419         int                     i;
1420
1421         /* Do Sanity check on input */
1422         if (n != ndim)
1423                 return 0;
1424         for (i = 0; i < ndim; i++)
1425                 if ((lb[i] > indx[i]) || (indx[i] >= (dim[i] + lb[i])))
1426                         return 0;
1427         return 1;
1428 }
1429
1430 static void
1431 _ArrayRange(int *st,
1432                         int *endp,
1433                         int bsize,
1434                         char *destPtr,
1435                         ArrayType *array,
1436                         int from)
1437 {
1438         int                     n,
1439                            *dim,
1440                            *lb,
1441                                 st_pos,
1442                                 prod[MAXDIM];
1443         int                     span[MAXDIM],
1444                                 dist[MAXDIM],
1445                                 indx[MAXDIM];
1446         int                     i,
1447                                 j,
1448                                 inc;
1449         char       *srcPtr;
1450
1451         n = ARR_NDIM(array);
1452         dim = ARR_DIMS(array);
1453         lb = ARR_LBOUND(array);
1454         srcPtr = ARR_DATA_PTR(array);
1455         for (i = 0; i < n; st[i] -= lb[i], endp[i] -= lb[i], i++);
1456         mda_get_prod(n, dim, prod);
1457         st_pos = tuple2linear(n, st, prod);
1458         srcPtr = array_seek(srcPtr, bsize, st_pos);
1459         mda_get_range(n, span, st, endp);
1460         mda_get_offset_values(n, dist, prod, span);
1461         for (i = 0; i < n; indx[i++] = 0);
1462         i = j = n - 1;
1463         inc = bsize;
1464         do
1465         {
1466                 srcPtr = array_seek(srcPtr, bsize, dist[j]);
1467                 if (from)
1468                         inc = array_read(destPtr, bsize, 1, srcPtr);
1469                 else
1470                         inc = array_read(srcPtr, bsize, 1, destPtr);
1471                 destPtr += inc;
1472                 srcPtr += inc;
1473         } while ((j = next_tuple(i + 1, indx, span)) != -1);
1474 }
1475
1476 static int
1477 _ArrayClipCount(int *stI, int *endpI, ArrayType *array)
1478 {
1479         int                     n,
1480                            *dim,
1481                            *lb,
1482                                 st_pos,
1483                                 prod[MAXDIM];
1484         int                     span[MAXDIM],
1485                                 dist[MAXDIM],
1486                                 indx[MAXDIM];
1487         int                     i,
1488                                 j,
1489                                 inc,
1490                                 st[MAXDIM],
1491                                 endp[MAXDIM];
1492         int                     count = 0;
1493         char       *ptr;
1494
1495         n = ARR_NDIM(array);
1496         dim = ARR_DIMS(array);
1497         lb = ARR_LBOUND(array);
1498         ptr = ARR_DATA_PTR(array);
1499         for (i = 0; i < n; st[i] = stI[i] - lb[i], endp[i] = endpI[i] - lb[i], i++);
1500         mda_get_prod(n, dim, prod);
1501         st_pos = tuple2linear(n, st, prod);
1502         ptr = array_seek(ptr, -1, st_pos);
1503         mda_get_range(n, span, st, endp);
1504         mda_get_offset_values(n, dist, prod, span);
1505         for (i = 0; i < n; indx[i++] = 0);
1506         i = j = n - 1;
1507         do
1508         {
1509                 ptr = array_seek(ptr, -1, dist[j]);
1510                 inc = INTALIGN(*(int32 *) ptr);
1511                 ptr += inc;
1512                 count += inc;
1513         } while ((j = next_tuple(i + 1, indx, span)) != -1);
1514         return count;
1515 }
1516
1517 static char *
1518 array_seek(char *ptr, int eltsize, int nitems)
1519 {
1520         int                     i;
1521
1522         if (eltsize > 0)
1523                 return ptr + eltsize * nitems;
1524         for (i = 0; i < nitems; i++)
1525                 ptr += INTALIGN(*(int32 *) ptr);
1526         return ptr;
1527 }
1528
1529 static int
1530 array_read(char *destptr, int eltsize, int nitems, char *srcptr)
1531 {
1532         int                     i,
1533                                 inc,
1534                                 tmp;
1535
1536         if (eltsize > 0)
1537         {
1538                 memmove(destptr, srcptr, eltsize * nitems);
1539                 return eltsize * nitems;
1540         }
1541         for (i = inc = 0; i < nitems; i++)
1542         {
1543                 tmp = (INTALIGN(*(int32 *) srcptr));
1544                 memmove(destptr, srcptr, tmp);
1545                 srcptr += tmp;
1546                 destptr += tmp;
1547                 inc += tmp;
1548         }
1549         return inc;
1550 }
1551
1552 static void
1553 _LOArrayRange(int *st,
1554                           int *endp,
1555                           int bsize,
1556                           int srcfd,
1557                           int destfd,
1558                           ArrayType *array,
1559                           int isSrcLO,
1560                           bool *isNull)
1561 {
1562         int                     n,
1563                            *dim,
1564                                 st_pos,
1565                                 prod[MAXDIM];
1566         int                     span[MAXDIM],
1567                                 dist[MAXDIM],
1568                                 indx[MAXDIM];
1569         int                     i,
1570                                 j,
1571                                 inc,
1572                                 tmp,
1573                            *lb,
1574                                 offset;
1575
1576         n = ARR_NDIM(array);
1577         dim = ARR_DIMS(array);
1578         lb = ARR_LBOUND(array);
1579         for (i = 0; i < n; st[i] -= lb[i], endp[i] -= lb[i], i++);
1580
1581         mda_get_prod(n, dim, prod);
1582         st_pos = tuple2linear(n, st, prod);
1583         offset = st_pos * bsize;
1584         if (lo_lseek(srcfd, offset, SEEK_SET) < 0)
1585                 return;
1586         mda_get_range(n, span, st, endp);
1587         mda_get_offset_values(n, dist, prod, span);
1588         for (i = 0; i < n; indx[i++] = 0);
1589         for (i = n - 1, inc = bsize; i >= 0; inc *= span[i--])
1590                 if (dist[i])
1591                         break;
1592         j = n - 1;
1593         do
1594         {
1595                 offset += (dist[j] * bsize);
1596                 if (lo_lseek(srcfd, offset, SEEK_SET) < 0)
1597                         return;
1598                 tmp = _LOtransfer((char **) &srcfd, inc, 1, (char **) &destfd, isSrcLO, 1);
1599                 if (tmp < inc)
1600                         return;
1601                 offset += inc;
1602         } while ((j = next_tuple(i + 1, indx, span)) != -1);
1603 }
1604
1605
1606 static void
1607 _ReadArray(int *st,
1608                    int *endp,
1609                    int bsize,
1610                    int srcfd,
1611                    int destfd,
1612                    ArrayType *array,
1613                    int isDestLO,
1614                    bool *isNull)
1615 {
1616         int                     n,
1617                            *dim,
1618                                 st_pos,
1619                                 prod[MAXDIM];
1620         int                     span[MAXDIM],
1621                                 dist[MAXDIM],
1622                                 indx[MAXDIM];
1623         int                     i,
1624                                 j,
1625                                 inc,
1626                                 tmp,
1627                            *lb,
1628                                 offset;
1629
1630         n = ARR_NDIM(array);
1631         dim = ARR_DIMS(array);
1632         lb = ARR_LBOUND(array);
1633         for (i = 0; i < n; st[i] -= lb[i], endp[i] -= lb[i], i++);
1634
1635         mda_get_prod(n, dim, prod);
1636         st_pos = tuple2linear(n, st, prod);
1637         offset = st_pos * bsize;
1638         if (lo_lseek(srcfd, offset, SEEK_SET) < 0)
1639                 return;
1640         mda_get_range(n, span, st, endp);
1641         mda_get_offset_values(n, dist, prod, span);
1642         for (i = 0; i < n; indx[i++] = 0);
1643         for (i = n - 1, inc = bsize; i >= 0; inc *= span[i--])
1644                 if (dist[i])
1645                         break;
1646         j = n - 1;
1647         do
1648         {
1649                 offset += (dist[j] * bsize);
1650                 if (lo_lseek(srcfd, offset, SEEK_SET) < 0)
1651                         return;
1652                 tmp = _LOtransfer((char **) &destfd, inc, 1, (char **) &srcfd, 1, isDestLO);
1653                 if (tmp < inc)
1654                         return;
1655                 offset += inc;
1656         } while ((j = next_tuple(i + 1, indx, span)) != -1);
1657 }
1658
1659
1660 int
1661 _LOtransfer(char **destfd,
1662                         int size,
1663                         int nitems,
1664                         char **srcfd,
1665                         int isSrcLO,
1666                         int isDestLO)
1667 {
1668 #define MAX_READ (512 * 1024)
1669 #define min(a, b) (a < b ? a : b)
1670         struct varlena *v = NULL;
1671         int                     tmp,
1672                                 inc,
1673                                 resid;
1674
1675         inc = nitems * size;
1676         if (isSrcLO && isDestLO && inc > 0)
1677                 for (tmp = 0, resid = inc;
1678                          resid > 0 && (inc = min(resid, MAX_READ)) > 0; resid -= inc)
1679                 {
1680 #ifdef LOARRAY
1681                         v = (struct varlena *) LOread((int) *srcfd, inc);
1682                         if (VARSIZE(v) - VARHDRSZ < inc)
1683                         {
1684                                 pfree(v);
1685                                 return -1;
1686                         }
1687                         tmp += LOwrite((int) *destfd, v);
1688 #endif
1689                         pfree(v);
1690
1691                 }
1692         else if (!isSrcLO && isDestLO)
1693         {
1694                 tmp = lo_write((int) *destfd, *srcfd, inc);
1695                 *srcfd = *srcfd + tmp;
1696         }
1697         else if (isSrcLO && !isDestLO)
1698         {
1699                 tmp = lo_read((int) *srcfd, *destfd, inc);
1700                 *destfd = *destfd + tmp;
1701         }
1702         else
1703         {
1704                 memmove(*destfd, *srcfd, inc);
1705                 tmp = inc;
1706                 *srcfd += inc;
1707                 *destfd += inc;
1708         }
1709         return tmp;
1710 #undef MAX_READ
1711 }
1712
1713 char *
1714 _array_newLO(int *fd, int flag)
1715 {
1716         char       *p;
1717         char            saveName[NAME_LEN];
1718
1719         p = (char *) palloc(NAME_LEN);
1720         sprintf(p, "/Arry.%d", newoid());
1721         strcpy(saveName, p);
1722 #ifdef LOARRAY
1723         if ((*fd = LOcreat(saveName, 0600, flag)) < 0)
1724                 elog(ERROR, "Large object create failed");
1725 #endif
1726         return p;
1727 }