Repair Large Object bugs demonstrated by Ian Grant's example. inv_write
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 15 Jun 2000 06:07:34 +0000 (06:07 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 15 Jun 2000 06:07:34 +0000 (06:07 +0000)
was inappropriately relying on rel->rd_nblocks to tell if the LO is
empty (apparently a hack to get around a long-dead index bug), causing
misbehavior on a written-but-never-vacuumed LO.  Also, inv_read failed
to cope gracefully with 'holes' (unwritten regions) in the object.

src/backend/storage/large_object/inv_api.c

index 506a12a0a9e89cf1a59acb2f26472a70a1152a52..07269dcada1fae58301907f6131824d12dc31a0b 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.69 2000/06/05 07:28:45 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.70 2000/06/15 06:07:34 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -187,6 +187,7 @@ inv_create(int flags)
        retval->idesc = RelationGetDescr(indr);
        retval->offset = retval->lowbyte = retval->highbyte = 0;
        ItemPointerSetInvalid(&(retval->htid));
+       retval->flags = 0;
 
        if (flags & INV_WRITE)
        {
@@ -198,7 +199,7 @@ inv_create(int flags)
                LockRelation(r, ShareLock);
                retval->flags = IFS_RDLOCK;
        }
-       retval->flags |= IFS_ATEOF;
+       retval->flags |= IFS_ATEOF;     /* since we know the object is empty */
 
        return retval;
 }
@@ -235,6 +236,7 @@ inv_open(Oid lobjId, int flags)
        retval->idesc = RelationGetDescr(indrel);
        retval->offset = retval->lowbyte = retval->highbyte = 0;
        ItemPointerSetInvalid(&(retval->htid));
+       retval->flags = 0;
 
        if (flags & INV_WRITE)
        {
@@ -373,14 +375,8 @@ inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
        if (whence == SEEK_CUR)
        {
                offset += obj_desc->offset;             /* calculate absolute position */
-               return inv_seek(obj_desc, offset, SEEK_SET);
        }
-
-       /*
-        * if you seek past the end (offset > 0) I have no clue what happens
-        * :-(                            B.L.   9/1/93
-        */
-       if (whence == SEEK_END)
+       else if (whence == SEEK_END)
        {
                /* need read lock for getsize */
                if (!(obj_desc->flags & IFS_RDLOCK))
@@ -391,8 +387,8 @@ inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
                offset += _inv_getsize(obj_desc->heap_r,
                                                           obj_desc->hdesc,
                                                           obj_desc->index_r);
-               return inv_seek(obj_desc, offset, SEEK_SET);
        }
+       /* now we can assume that the operation is SEEK_SET */
 
        /*
         * Whenever we do a seek, we turn off the EOF flag bit to force
@@ -416,9 +412,6 @@ inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
         * stores the offset of the last byte that appears on it, and we have
         * an index on this.
         */
-
-
-       /* right now, just assume that the operation is SEEK_SET */
        if (obj_desc->iscan != (IndexScanDesc) NULL)
        {
                d = Int32GetDatum(offset);
@@ -426,7 +419,6 @@ inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
        }
        else
        {
-
                ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
                                                           Int32GetDatum(offset));
 
@@ -489,9 +481,27 @@ inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
 
                /* copy the data from this block into the buffer */
                d = heap_getattr(&tuple, 2, obj_desc->hdesc, &isNull);
+               fsblock = (struct varlena *) DatumGetPointer(d);
                ReleaseBuffer(buffer);
 
-               fsblock = (struct varlena *) DatumGetPointer(d);
+               /*
+                * If block starts beyond current seek point, then we are looking
+                * at a "hole" (unwritten area) in the object.  Return zeroes for
+                * the "hole".
+                */
+               if (obj_desc->offset < obj_desc->lowbyte)
+               {
+                       int             nzeroes = obj_desc->lowbyte - obj_desc->offset;
+
+                       if (nzeroes > (nbytes - nread))
+                               nzeroes = (nbytes - nread);
+                       MemSet(buf, 0, nzeroes);
+                       buf += nzeroes;
+                       nread += nzeroes;
+                       obj_desc->offset += nzeroes;
+                       if (nread >= nbytes)
+                               break;
+               }
 
                off = obj_desc->offset - obj_desc->lowbyte;
                ncopy = obj_desc->highbyte - obj_desc->offset + 1;
@@ -537,14 +547,11 @@ inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
                Buffer          buffer;
 
                /*
-                * Fetch the current inversion file system block.  If the class
-                * storing the inversion file is empty, we don't want to do an
-                * index lookup, since index lookups choke on empty files (should
-                * be fixed someday).
+                * Fetch the current inversion file system block.  We can skip
+                * the work if we already know we are at EOF.
                 */
 
-               if ((obj_desc->flags & IFS_ATEOF)
-                       || obj_desc->heap_r->rd_nblocks == 0)
+               if (obj_desc->flags & IFS_ATEOF)
                        tuple.t_data = NULL;
                else
                        inv_fetchtup(obj_desc, &tuple, &buffer);
@@ -659,6 +666,7 @@ inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer)
                }
                else
                        index_rescan(obj_desc->iscan, false, &skey);
+
                do
                {
                        res = index_getnext(obj_desc->iscan, ForwardScanDirection);
@@ -1149,7 +1157,8 @@ inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple)
                pfree(res);
 }
 
-/*
+#ifdef NOT_USED
+
 static void
 DumpPage(Page page, int blkno)
 {
@@ -1239,7 +1248,8 @@ ItemPointerFormExternal(ItemPointer pointer)
 
                return itemPointerString;
 }
-*/
+
+#endif
 
 static int
 _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln)