OSDN Git Service

here it is as requested by Bruce.
authorBruce Momjian <bruce@momjian.us>
Sat, 21 Oct 2000 15:55:29 +0000 (15:55 +0000)
committerBruce Momjian <bruce@momjian.us>
Sat, 21 Oct 2000 15:55:29 +0000 (15:55 +0000)
I tested it restoring my database with > 100000 BLOBS, and dumping it out.
But unfortunatly I can not restore it back due to problems in pg_dump.

--
Sincerely Yours,
Denis Perchine

src/backend/catalog/Makefile
src/backend/catalog/indexing.c
src/backend/catalog/pg_largeobject.c [new file with mode: 0644]
src/backend/libpq/be-fsstubs.c
src/backend/storage/large_object/inv_api.c
src/bin/pg_dump/pg_dump.c
src/include/catalog/catname.h
src/include/catalog/indexing.h
src/include/catalog/pg_largeobject.h [new file with mode: 0644]
src/include/storage/large_object.h

index 022e41b..1ac8586 100644 (file)
@@ -2,7 +2,7 @@
 #
 # Makefile for catalog
 #
-# $Header: /cvsroot/pgsql/src/backend/catalog/Makefile,v 1.28 2000/10/20 21:03:42 petere Exp $
+# $Header: /cvsroot/pgsql/src/backend/catalog/Makefile,v 1.29 2000/10/21 15:55:21 momjian Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -11,7 +11,8 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = catalog.o heap.o index.o indexing.o aclchk.o \
-       pg_aggregate.o pg_operator.o pg_proc.o pg_type.o
+       pg_aggregate.o pg_largeobject.o pg_operator.o pg_proc.o \
+       pg_type.o
 
 BKIFILES = global.bki template1.bki global.description template1.description
 
@@ -29,7 +30,7 @@ TEMPLATE1_BKI_SRCS := $(addprefix $(top_srcdir)/src/include/catalog/,\
        pg_proc.h pg_type.h pg_attribute.h pg_class.h \
        pg_inherits.h pg_index.h pg_statistic.h \
        pg_operator.h pg_opclass.h pg_am.h pg_amop.h pg_amproc.h \
-       pg_language.h \
+       pg_language.h pg_largeobject.h \
        pg_aggregate.h pg_ipl.h pg_inheritproc.h \
        pg_rewrite.h pg_listener.h pg_description.h indexing.h \
     )
index 5647c83..faa3dc6 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/catalog/indexing.c,v 1.69 2000/10/08 03:53:13 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/catalog/indexing.c,v 1.70 2000/10/21 15:55:21 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -51,6 +51,8 @@ char     *Name_pg_inherits_indices[Num_pg_inherits_indices] =
 {InheritsRelidSeqnoIndex};
 char      *Name_pg_language_indices[Num_pg_language_indices] =
 {LanguageOidIndex, LanguageNameIndex};
+char      *Name_pg_largeobject_indices[Num_pg_largeobject_indices] =
+{LargeobjectLOIdIndex, LargeobjectLOIdPNIndex};
 char      *Name_pg_listener_indices[Num_pg_listener_indices] =
 {ListenerPidRelnameIndex};
 char      *Name_pg_opclass_indices[Num_pg_opclass_indices] =
diff --git a/src/backend/catalog/pg_largeobject.c b/src/backend/catalog/pg_largeobject.c
new file mode 100644 (file)
index 0000000..ace63d3
--- /dev/null
@@ -0,0 +1,135 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_largeobject.c
+ *       routines to support manipulation of the pg_largeobject relation
+ *
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *       $Header: /cvsroot/pgsql/src/backend/catalog/pg_largeobject.c,v 1.3 2000/10/21 15:55:21 momjian Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "catalog/catname.h"
+#include "catalog/indexing.h"
+#include "catalog/pg_largeobject.h"
+#include "miscadmin.h"
+#include "utils/fmgroids.h"
+
+bytea *_byteain(const char *data, int32 size);
+
+bytea *_byteain(const char *data, int32 size) {
+       bytea   *result;
+
+       result = (bytea *)palloc(size + VARHDRSZ);
+       result->vl_len = size + VARHDRSZ;
+       if (size > 0)
+               memcpy(result->vl_dat, data, size);
+       
+       return result;
+}
+
+Oid LargeobjectCreate(Oid loid) {
+       Oid             retval;
+       Relation                pg_largeobject;
+       HeapTuple       ntup = (HeapTuple) palloc(sizeof(HeapTupleData));
+       Relation                idescs[Num_pg_largeobject_indices];
+       Datum           values[Natts_pg_largeobject];
+       char            nulls[Natts_pg_largeobject];
+       int             i;
+
+       for (i=0; i<Natts_pg_largeobject; i++) {
+               nulls[i] = ' ';
+               values[i] = (Datum)NULL;
+       }
+
+       i = 0;
+       values[i++] = ObjectIdGetDatum(loid);
+       values[i++] = Int32GetDatum(0);
+       values[i++] = (Datum) _byteain(NULL, 0);
+       
+       pg_largeobject = heap_openr(LargeobjectRelationName, RowExclusiveLock);
+       ntup = heap_formtuple(pg_largeobject->rd_att, values, nulls);
+       retval = heap_insert(pg_largeobject, ntup);
+
+       if (!IsIgnoringSystemIndexes()) {
+               CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs);
+               CatalogIndexInsert(idescs, Num_pg_largeobject_indices, pg_largeobject, ntup);
+               CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
+       }
+       
+       heap_close(pg_largeobject, RowExclusiveLock);
+       heap_freetuple(ntup);
+       
+       CommandCounterIncrement();
+
+       return retval;
+}
+
+void LargeobjectDrop(Oid loid) {
+       Relation        pg_largeobject;
+       Relation        pg_lo_id;
+       ScanKeyData     skey;
+       IndexScanDesc   sd = (IndexScanDesc) NULL;
+       RetrieveIndexResult     indexRes;
+       int     found = 0;
+
+       ScanKeyEntryInitialize(&skey,
+                                           (bits16) 0x0,
+                                           (AttrNumber) 1,
+                                           (RegProcedure) F_OIDEQ,
+                                           ObjectIdGetDatum(loid));
+
+       pg_largeobject = heap_openr(LargeobjectRelationName, RowShareLock);
+       pg_lo_id = index_openr(LargeobjectLOIdIndex);
+
+       sd = index_beginscan(pg_lo_id, false, 1, &skey);
+
+       while((indexRes = index_getnext(sd, ForwardScanDirection))) {
+               found++;
+               heap_delete(pg_largeobject, &indexRes->heap_iptr, NULL);
+               pfree(indexRes);
+       }
+
+       index_endscan(sd);
+
+       index_close(pg_lo_id);
+       heap_close(pg_largeobject, RowShareLock);
+       if (found == 0)
+               elog(ERROR, "LargeobjectDrop: large object %d not found", loid);
+}
+
+int LargeobjectFind(Oid loid) {
+       int     retval = 0;
+       Relation        pg_lo_id;
+       ScanKeyData     skey;
+       IndexScanDesc   sd = (IndexScanDesc) NULL;
+       RetrieveIndexResult     indexRes;
+
+       ScanKeyEntryInitialize(&skey,
+                                           (bits16) 0x0,
+                                           (AttrNumber) 1,
+                                           (RegProcedure) F_OIDEQ,
+                                           ObjectIdGetDatum(loid));
+
+       pg_lo_id = index_openr(LargeobjectLOIdIndex);
+
+       sd = index_beginscan(pg_lo_id, false, 1, &skey);
+
+       if ((indexRes = index_getnext(sd, ForwardScanDirection))) {
+               retval = 1;
+               pfree(indexRes);
+       }
+
+       index_endscan(sd);
+
+       index_close(pg_lo_id);
+       return retval;
+}
+
index 321eebb..30b5001 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/libpq/be-fsstubs.c,v 1.52 2000/10/08 03:53:13 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/libpq/be-fsstubs.c,v 1.53 2000/10/21 15:55:22 momjian Exp $
  *
  * NOTES
  *       This should be moved to a more appropriate place.  It is here
@@ -267,7 +267,7 @@ lo_creat(PG_FUNCTION_ARGS)
                PG_RETURN_OID(InvalidOid);
        }
 
-       lobjId = RelationGetRelid(lobjDesc->heap_r);
+       lobjId = lobjDesc->id;
 
        inv_close(lobjDesc);
 
@@ -512,8 +512,10 @@ lo_commit(bool isCommit)
        {
                if (cookies[i] != NULL)
                {
+/*
                        if (isCommit)
                                inv_cleanindex(cookies[i]);
+*/
                        cookies[i] = NULL;
                }
        }
index 11c5d7f..a245c81 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.76 2000/10/08 03:53:14 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/large_object/inv_api.c,v 1.77 2000/10/21 15:55:24 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/nbtree.h"
+#include "access/htup.h"
 #include "catalog/catalog.h"
+#include "catalog/catname.h"
 #include "catalog/heap.h"
 #include "catalog/index.h"
+#include "catalog/indexing.h"
 #include "catalog/pg_opclass.h"
+#include "catalog/pg_largeobject.h"
 #include "catalog/pg_type.h"
 #include "libpq/libpq-fs.h"
 #include "miscadmin.h"
 #include "storage/large_object.h"
 #include "storage/smgr.h"
 #include "utils/fmgroids.h"
-#include "utils/relcache.h"
+#include "utils/builtins.h"
 
-/*
- *     Warning, Will Robinson...  In order to pack data into an inversion
- *     file as densely as possible, we violate the class abstraction here.
- *     When we're appending a new tuple to the end of the table, we check
- *     the last page to see how much data we can put on it.  If it's more
- *     than IMINBLK, we write enough to fill the page.  This limits external
- *     fragmentation.  In no case can we write more than IMAXBLK, since
- *     the 8K postgres page size less overhead leaves only this much space
- *     for data.
- */
+#include <errno.h>
 
-/*
- *             In order to prevent buffer leak on transaction commit, large object
- *             scan index handling has been modified. Indexes are persistant inside
- *             a transaction but may be closed between two calls to this API (when
- *             transaction is committed while object is opened, or when no
- *             transaction is active). Scan indexes are thus now reinitialized using
- *             the object current offset. [PA]
- *
- *             Some cleanup has been also done for non freed memory.
- *
- *             For subsequent notes, [PA] is Pascal André <andre@via.ecp.fr>
- */
+#define        IBLKSIZE                (MaxTupleSize - MinHeapTupleBitmapSize - sizeof(int32) * 3)
 
-#define IFREESPC(p)            (PageGetFreeSpace(p) - \
-                                MAXALIGN(offsetof(HeapTupleHeaderData,t_bits)) - \
-                                MAXALIGN(sizeof(struct varlena) + sizeof(int32)) - \
-                                sizeof(double))
-#define IMAXBLK                        8092
-#define IMINBLK                        512
-
-/* non-export function prototypes */
-static HeapTuple inv_newtuple(LargeObjectDesc *obj_desc, Buffer buffer,
-                        Page page, char *dbuf, int nwrite);
-static void inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer);
-static int     inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes);
-static int inv_wrold(LargeObjectDesc *obj_desc, char *dbuf, int nbytes,
-                 HeapTuple tuple, Buffer buffer);
-static void inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple);
-static int     _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
+/* Defined in backend/storage/catalog/large_object.c */
+bytea *_byteain(const char *data, int32 size);
+
+static int32 getbytealen(bytea *data) {
+       if (VARSIZE(data) < VARHDRSZ)
+               elog(ERROR, "getbytealen: VARSIZE(data) < VARHDRSZ. This is internal error.");
+  return (VARSIZE(data) - VARHDRSZ);
+}
 
 /*
  *     inv_create -- create a new large object.
@@ -84,19 +60,13 @@ static int  _inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln);
  *             Returns:
  *               large object descriptor, appropriately filled in.
  */
+
 LargeObjectDesc *
 inv_create(int flags)
 {
+       int             file_oid;
        LargeObjectDesc *retval;
-       Oid                     file_oid;
-       Relation        r;
-       Relation        indr;
-       TupleDesc       tupdesc;
-       IndexInfo  *indexInfo;
-       Oid                     classObjectId[1];
-       char            objname[NAMEDATALEN];
-       char            indname[NAMEDATALEN];
-
+       
        /*
         * add one here since the pg_class tuple created will have the next
         * oid and we want to have the relation name to correspond to the
@@ -104,104 +74,25 @@ inv_create(int flags)
         */
        file_oid = newoid() + 1;
 
-       /* come up with some table names */
-       sprintf(objname, "xinv%u", file_oid);
-       sprintf(indname, "xinx%u", file_oid);
-
-       if (RelnameFindRelid(objname) != InvalidOid)
-               elog(ERROR,
-                 "internal error: %s already exists -- cannot create large obj",
-                        objname);
-       if (RelnameFindRelid(indname) != InvalidOid)
-               elog(ERROR,
-                 "internal error: %s already exists -- cannot create large obj",
-                        indname);
-
-       /* this is pretty painful...  want a tuple descriptor */
-       tupdesc = CreateTemplateTupleDesc(2);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 1,
-                                          "olastbye",
-                                          INT4OID,
-                                          -1, 0, false);
-       TupleDescInitEntry(tupdesc, (AttrNumber) 2,
-                                          "odata",
-                                          BYTEAOID,
-                                          -1, 0, false);
-
-       /*
-        * First create the table to hold the inversion large object.  It will
-        * be located on whatever storage manager the user requested.
-        */
-
-       heap_create_with_catalog(objname, tupdesc, RELKIND_LOBJECT,
-                                                        false, false);
-
-       /* make the relation visible in this transaction */
-       CommandCounterIncrement();
-
-       /*--------------------
-        * We hold AccessShareLock on any large object we have open
-        * by inv_create or inv_open; it is released by inv_close.
-        * Note this will not conflict with ExclusiveLock or ShareLock
-        * that we acquire when actually reading/writing; it just prevents
-        * deletion of the large object while we have it open.
-        *--------------------
-        */
-       r = heap_openr(objname, AccessShareLock);
-
-       /*
-        * Now create a btree index on the relation's olastbyte attribute to
-        * make seeks go faster.
-        */
-       indexInfo = makeNode(IndexInfo);
-       indexInfo->ii_NumIndexAttrs = 1;
-       indexInfo->ii_NumKeyAttrs = 1;
-       indexInfo->ii_KeyAttrNumbers[0] = 1;
-       indexInfo->ii_Predicate = NULL;
-       indexInfo->ii_FuncOid = InvalidOid;
-       indexInfo->ii_Unique = false;
-
-       classObjectId[0] = INT4_OPS_OID;
-
-       index_create(objname, indname, indexInfo,
-                                BTREE_AM_OID, classObjectId,
-                                false, false, false);
-
-       /* make the index visible in this transaction */
-       CommandCounterIncrement();
-
-       indr = index_openr(indname);
-
-       if (!RelationIsValid(indr))
-       {
-               elog(ERROR, "cannot create index for large obj on %s under inversion",
-                        DatumGetCString(DirectFunctionCall1(smgrout,
-                                                        Int16GetDatum(DEFAULT_SMGR))));
-       }
+       if (LargeobjectFind(file_oid) == 1)
+               elog(ERROR, "inv_create: large object %d already exists. This is internal error.", file_oid);
 
        retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
 
-       retval->heap_r = r;
-       retval->index_r = indr;
-       retval->iscan = (IndexScanDesc) NULL;
-       retval->hdesc = RelationGetDescr(r);
-       retval->idesc = RelationGetDescr(indr);
-       retval->offset = retval->lowbyte = retval->highbyte = 0;
-       ItemPointerSetInvalid(&(retval->htid));
-       retval->flags = 0;
-
-       if (flags & INV_WRITE)
-       {
-               LockRelation(r, ExclusiveLock);
+       if (flags & INV_WRITE) {
                retval->flags = IFS_WRLOCK | IFS_RDLOCK;
-       }
-       else if (flags & INV_READ)
-       {
-               LockRelation(r, ShareLock);
+               retval->heap_r = heap_openr(LargeobjectRelationName, RowExclusiveLock);
+       } else if (flags & INV_READ) {
                retval->flags = IFS_RDLOCK;
-       }
-       retval->flags |= IFS_ATEOF;     /* since we know the object is empty */
-
+               retval->heap_r = heap_openr(LargeobjectRelationName, AccessShareLock);
+       } else
+               elog(ERROR, "inv_create: invalid flags: %d", flags);
+
+       retval->flags |= IFS_ATEOF;
+       retval->index_r = index_openr(LargeobjectLOIdPNIndex);
+       retval->offset = 0;
+       retval->id = file_oid;
+       (void)LargeobjectCreate(file_oid);
        return retval;
 }
 
@@ -209,46 +100,24 @@ LargeObjectDesc *
 inv_open(Oid lobjId, int flags)
 {
        LargeObjectDesc *retval;
-       Relation        r;
-       char       *indname;
-       Relation        indrel;
-
-       r = heap_open(lobjId, AccessShareLock);
 
-       indname = pstrdup(RelationGetRelationName(r));
-
-       /*
-        * hack hack hack...  we know that the fourth character of the
-        * relation name is a 'v', and that the fourth character of the index
-        * name is an 'x', and that they're otherwise identical.
-        */
-       indname[3] = 'x';
-       indrel = index_openr(indname);
+       if (LargeobjectFind(lobjId) == 0)
+               elog(ERROR, "inv_open: large object %d not found", lobjId);
+       
+       retval = (LargeObjectDesc *)palloc(sizeof(LargeObjectDesc));
 
-       if (!RelationIsValid(indrel))
-               return (LargeObjectDesc *) NULL;
-
-       retval = (LargeObjectDesc *) palloc(sizeof(LargeObjectDesc));
-
-       retval->heap_r = r;
-       retval->index_r = indrel;
-       retval->iscan = (IndexScanDesc) NULL;
-       retval->hdesc = RelationGetDescr(r);
-       retval->idesc = RelationGetDescr(indrel);
-       retval->offset = retval->lowbyte = retval->highbyte = 0;
-       ItemPointerSetInvalid(&(retval->htid));
-       retval->flags = 0;
-
-       if (flags & INV_WRITE)
-       {
-               LockRelation(r, ExclusiveLock);
+       if (flags & INV_WRITE) {
                retval->flags = IFS_WRLOCK | IFS_RDLOCK;
-       }
-       else if (flags & INV_READ)
-       {
-               LockRelation(r, ShareLock);
+               retval->heap_r = heap_openr(LargeobjectRelationName, RowExclusiveLock);
+       } else if (flags & INV_READ) {
                retval->flags = IFS_RDLOCK;
-       }
+               retval->heap_r = heap_openr(LargeobjectRelationName, AccessShareLock);
+       } else
+               elog(ERROR, "inv_open: invalid flags: %d", flags);
+
+       retval->index_r = index_openr(LargeobjectLOIdPNIndex);
+       retval->offset = 0;
+       retval->id = lobjId;
 
        return retval;
 }
@@ -261,15 +130,11 @@ inv_close(LargeObjectDesc *obj_desc)
 {
        Assert(PointerIsValid(obj_desc));
 
-       if (obj_desc->iscan != (IndexScanDesc) NULL)
-       {
-               index_endscan(obj_desc->iscan);
-               obj_desc->iscan = NULL;
-       }
-
+       if (obj_desc->flags & IFS_WRLOCK)
+               heap_close(obj_desc->heap_r, RowExclusiveLock);
+       else if (obj_desc->flags & IFS_RDLOCK)
+               heap_close(obj_desc->heap_r, AccessShareLock);
        index_close(obj_desc->index_r);
-       heap_close(obj_desc->heap_r, AccessShareLock);
-
        pfree(obj_desc);
 }
 
@@ -281,24 +146,7 @@ inv_close(LargeObjectDesc *obj_desc)
 int
 inv_drop(Oid lobjId)
 {
-       Relation        r;
-
-       r = RelationIdGetRelation(lobjId);
-       if (!RelationIsValid(r))
-               return -1;
-
-       if (r->rd_rel->relkind != RELKIND_LOBJECT)
-       {
-               /* drop relcache refcount from RelationIdGetRelation */
-               RelationDecrementReferenceCount(r);
-               return -1;
-       }
-
-       /*
-        * Since heap_drop_with_catalog will destroy the relcache entry,
-        * there's no need to drop the refcount in this path.
-        */
-       heap_drop_with_catalog(RelationGetRelationName(r), false);
+       LargeobjectDrop(lobjId);
        return 1;
 }
 
@@ -364,71 +212,75 @@ inv_stat(LargeObjectDesc *obj_desc, struct pgstat * stbuf)
 
 #endif
 
-int
-inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
-{
-       int                     oldOffset;
-       Datum           d;
-       ScanKeyData skey;
+static uint32 inv_getsize(LargeObjectDesc *obj_desc) {
+       uint32                  found = 0;
+       uint32                  lastbyte = 0;
+       ScanKeyData             skey;
+       IndexScanDesc           sd = (IndexScanDesc) NULL;
+       RetrieveIndexResult     indexRes;
+       HeapTupleData           tuple;
+       Buffer                  buffer;
+       Form_pg_largeobject     data;
 
        Assert(PointerIsValid(obj_desc));
 
-       if (whence == SEEK_CUR)
-       {
-               offset += obj_desc->offset;             /* calculate absolute position */
-       }
-       else if (whence == SEEK_END)
-       {
-               /* need read lock for getsize */
-               if (!(obj_desc->flags & IFS_RDLOCK))
-               {
-                       LockRelation(obj_desc->heap_r, ShareLock);
-                       obj_desc->flags |= IFS_RDLOCK;
-               }
-               offset += _inv_getsize(obj_desc->heap_r,
-                                                          obj_desc->hdesc,
-                                                          obj_desc->index_r);
+       ScanKeyEntryInitialize(&skey,
+                                           (bits16) 0x0,
+                                           (AttrNumber) 1,
+                                           (RegProcedure) F_OIDEQ,
+                                           ObjectIdGetDatum(obj_desc->id));
+
+       sd = index_beginscan(obj_desc->index_r, true, 1, &skey);
+       tuple.t_datamcxt = CurrentMemoryContext;
+       tuple.t_data = NULL;
+       while ((indexRes = index_getnext(sd, ForwardScanDirection))) {
+               tuple.t_self = indexRes->heap_iptr;
+               heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
+               pfree(indexRes);
+               if (tuple.t_data == NULL)
+                       continue;
+               found++;
+               data = (Form_pg_largeobject) GETSTRUCT(&tuple);
+               lastbyte = data->pageno * IBLKSIZE + getbytealen(&(data->data));
+               ReleaseBuffer(buffer);
+               break;
        }
-       /* now we can assume that the operation is SEEK_SET */
-
-       /*
-        * Whenever we do a seek, we turn off the EOF flag bit to force
-        * ourselves to check for real on the next read.
-        */
-
-       obj_desc->flags &= ~IFS_ATEOF;
-       oldOffset = obj_desc->offset;
-       obj_desc->offset = offset;
+       
+       index_endscan(sd);
 
-       /* try to avoid doing any work, if we can manage it */
-       if (offset >= obj_desc->lowbyte
-               && offset <= obj_desc->highbyte
-               && oldOffset <= obj_desc->highbyte
-               && obj_desc->iscan != (IndexScanDesc) NULL)
-               return offset;
+       if (found == 0)
+               elog(ERROR, "inv_getsize: large object %d not found", obj_desc->id);
+       return lastbyte;
+}
 
-       /*
-        * To do a seek on an inversion file, we start an index scan that will
-        * bring us to the right place.  Each tuple in an inversion file
-        * stores the offset of the last byte that appears on it, and we have
-        * an index on this.
-        */
-       if (obj_desc->iscan != (IndexScanDesc) NULL)
-       {
-               d = Int32GetDatum(offset);
-               btmovescan(obj_desc->iscan, d);
-       }
-       else
-       {
-               ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
-                                                          Int32GetDatum(offset));
+int
+inv_seek(LargeObjectDesc *obj_desc, int offset, int whence)
+{
+       Assert(PointerIsValid(obj_desc));
 
-               obj_desc->iscan = index_beginscan(obj_desc->index_r,
-                                                                                 (bool) 0, (uint16) 1,
-                                                                                 &skey);
+       switch (whence) {
+               case SEEK_SET:
+                       if (offset < 0)
+                               elog(ERROR, "inv_seek: invalid offset: %d", offset);
+                       obj_desc->offset = offset;
+                       break;
+               case SEEK_CUR:
+                       if ((obj_desc->offset + offset) < 0)
+                               elog(ERROR, "inv_seek: invalid offset: %d", offset);
+                       obj_desc->offset += offset;
+                       break;
+               case SEEK_END:
+                       {
+                               int4 size = inv_getsize(obj_desc);
+                               if (offset > size)
+                                       elog(ERROR, "inv_seek: invalid offset");
+                               obj_desc->offset = size - offset;
+                       }
+                       break;
+               default:
+                       elog(ERROR, "inv_seek: invalid whence: %d", whence);
        }
-
-       return offset;
+       return obj_desc->offset;
 }
 
 int
@@ -442,862 +294,259 @@ inv_tell(LargeObjectDesc *obj_desc)
 int
 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
 {
-       HeapTupleData tuple;
-       int                     nread;
-       int                     off;
-       int                     ncopy;
-       Datum           d;
-       struct varlena *fsblock;
-       bool            isNull;
+       uint32                  nread = 0;
+       uint32                  n;
+       uint32                  off;
+       uint32                  len;
+       uint32                  found = 0;
+       uint32                  pageno = obj_desc->offset / IBLKSIZE; 
+       ScanKeyData             skey[2];
+       IndexScanDesc           sd = (IndexScanDesc) NULL;
+       RetrieveIndexResult     indexRes;
+       HeapTupleData           tuple;
+       Buffer                  buffer;
+       Form_pg_largeobject     data;
 
        Assert(PointerIsValid(obj_desc));
        Assert(buf != NULL);
 
-       /* if we're already at EOF, we don't need to do any work here */
-       if (obj_desc->flags & IFS_ATEOF)
-               return 0;
-
-       /* make sure we obey two-phase locking */
-       if (!(obj_desc->flags & IFS_RDLOCK))
-       {
-               LockRelation(obj_desc->heap_r, ShareLock);
-               obj_desc->flags |= IFS_RDLOCK;
-       }
-
-       nread = 0;
-
-       /* fetch a block at a time */
-       while (nread < nbytes)
-       {
-               Buffer          buffer;
-
-               /* fetch an inversion file system block */
-               inv_fetchtup(obj_desc, &tuple, &buffer);
+       ScanKeyEntryInitialize(&skey[0],
+                                           (bits16) 0x0,
+                                           (AttrNumber) 1,
+                                           (RegProcedure) F_OIDEQ,
+                                           ObjectIdGetDatum(obj_desc->id));
+
+       ScanKeyEntryInitialize(&skey[1],
+                                           (bits16) 0x0,
+                                           (AttrNumber) 2,
+                                           (RegProcedure) F_INT4GE,
+                                           Int32GetDatum(pageno));
+
+       sd = index_beginscan(obj_desc->index_r, false, 2, skey);
+       tuple.t_datamcxt = CurrentMemoryContext;
+       tuple.t_data = NULL;
+       while ((indexRes = index_getnext(sd, ForwardScanDirection))) {
+               tuple.t_self = indexRes->heap_iptr;
+               heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
+               pfree(indexRes);
 
                if (tuple.t_data == NULL)
-               {
-                       obj_desc->flags |= IFS_ATEOF;
-                       break;
+                       continue;
+               
+               found++;
+               data = (Form_pg_largeobject) GETSTRUCT(&tuple);
+               if (data->pageno != pageno) {
+                       ReleaseBuffer(buffer);
+                       index_endscan(sd);
+                       return 0;
                }
 
-               /* copy the data from this block into the buffer */
-               d = heap_getattr(&tuple, 2, obj_desc->hdesc, &isNull);
-               fsblock = (struct varlena *) DatumGetPointer(d);
-               ReleaseBuffer(buffer);
-
-               /*
-                * If block starts beyond current seek point, then we are looking
-                * at a "hole" (unwritten area) in the object.  Return zeroes for
-                * the "hole".
-                */
-               if (obj_desc->offset < obj_desc->lowbyte)
-               {
-                       int             nzeroes = obj_desc->lowbyte - obj_desc->offset;
-
-                       if (nzeroes > (nbytes - nread))
-                               nzeroes = (nbytes - nread);
-                       MemSet(buf, 0, nzeroes);
-                       buf += nzeroes;
-                       nread += nzeroes;
-                       obj_desc->offset += nzeroes;
-                       if (nread >= nbytes)
-                               break;
+               len = getbytealen(&(data->data));
+               off = obj_desc->offset % IBLKSIZE;
+               if (off == len) {
+                       ReleaseBuffer(buffer);
+                       break;
                }
+               if (off > len) {
+                       ReleaseBuffer(buffer);
+                       index_endscan(sd);
+                       return 0;
+               }
+               n = len - off;
 
-               off = obj_desc->offset - obj_desc->lowbyte;
-               ncopy = obj_desc->highbyte - obj_desc->offset + 1;
-               if (ncopy > (nbytes - nread))
-                       ncopy = (nbytes - nread);
-               memmove(buf, &(fsblock->vl_dat[off]), ncopy);
+               n = (n < (nbytes - nread)) ? n : (nbytes - nread);
+               memcpy(buf + nread, VARDATA(&(data->data)) + off, n);
+               nread += n;
+               obj_desc->offset += n;
 
-               /* move pointers past the amount we just read */
-               buf += ncopy;
-               nread += ncopy;
-               obj_desc->offset += ncopy;
+               ReleaseBuffer(buffer);
+               pageno++;
+               if (nread == nbytes)
+                       break;
        }
 
+       index_endscan(sd);
+
+       if (found == 0)
+               return 0;
+
        return nread;
 }
 
-int
-inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes)
-{
-       HeapTupleData tuple;
-       int                     nwritten;
-       int                     tuplen;
+static int inv_write_existing(LargeObjectDesc *obj_desc, char *buf, int nbytes, int *found) {
+       uint32                  n = 0;
+       uint32                  off;
+       uint32                  len;
+       int                     i;
+       HeapTupleData           tuple;
+       HeapTuple               newtup;
+       Buffer                  buffer;
+       Form_pg_largeobject     data;
+       ScanKeyData             skey[2];
+       IndexScanDesc           sd = (IndexScanDesc) NULL;
+       RetrieveIndexResult     indexRes;
+       Relation                idescs[Num_pg_largeobject_indices];
+       Datum                   values[Natts_pg_largeobject];
+       char                    nulls[Natts_pg_largeobject];
+       char                    replace[Natts_pg_largeobject];
 
        Assert(PointerIsValid(obj_desc));
        Assert(buf != NULL);
 
-       /*
-        * Make sure we obey two-phase locking.  A write lock entitles you to
-        * read the relation, as well.
-        */
-
-       if (!(obj_desc->flags & IFS_WRLOCK))
-       {
-               LockRelation(obj_desc->heap_r, ExclusiveLock);
-               obj_desc->flags |= (IFS_WRLOCK | IFS_RDLOCK);
-       }
-
-       nwritten = 0;
-
-       /* write a block at a time */
-       while (nwritten < nbytes)
-       {
-               Buffer          buffer;
-
-               /*
-                * Fetch the current inversion file system block.  We can skip
-                * the work if we already know we are at EOF.
-                */
-
-               if (obj_desc->flags & IFS_ATEOF)
-                       tuple.t_data = NULL;
-               else
-                       inv_fetchtup(obj_desc, &tuple, &buffer);
-
-               /* either append or replace a block, as required */
-               if (tuple.t_data == NULL)
-                       tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
-               else
-               {
-                       if (obj_desc->offset > obj_desc->highbyte)
-                       {
-                               tuplen = inv_wrnew(obj_desc, buf, nbytes - nwritten);
-                               ReleaseBuffer(buffer);
-                       }
-                       else
-                               tuplen = inv_wrold(obj_desc, buf, nbytes - nwritten, &tuple, buffer);
+       ScanKeyEntryInitialize(&skey[0],
+                                           (bits16) 0,
+                                           (AttrNumber) 1,
+                                           (RegProcedure) F_OIDEQ,
+                                           ObjectIdGetDatum(obj_desc->id));
 
-                       /*
-                        * inv_wrold() has already issued WriteBuffer() which has
-                        * decremented local reference counter (LocalRefCount). So we
-                        * should not call ReleaseBuffer() here. -- Tatsuo 99/2/4
-                        */
-               }
+       ScanKeyEntryInitialize(&skey[1],
+                                           (bits16) 0x0,
+                                           (AttrNumber) 2,
+                                           (RegProcedure) F_INT4EQ,
+                                           Int32GetDatum(obj_desc->offset / IBLKSIZE));
 
-               /* move pointers past the amount we just wrote */
-               buf += tuplen;
-               nwritten += tuplen;
-               obj_desc->offset += tuplen;
+       CommandCounterIncrement();
+       sd = index_beginscan(obj_desc->index_r, false, 2, skey);
+       tuple.t_datamcxt = CurrentMemoryContext;
+       tuple.t_data = NULL;
+       while ((indexRes = index_getnext(sd, ForwardScanDirection))) {
+               tuple.t_self = indexRes->heap_iptr;
+               heap_fetch(obj_desc->heap_r, SnapshotNow, &tuple, &buffer);
+               pfree(indexRes);
+               if (tuple.t_data != NULL)
+                       break;
        }
 
-       /* that's it */
-       return nwritten;
-}
-
-/*
- * inv_cleanindex
- *              Clean opened indexes for large objects, and clears current result.
- *              This is necessary on transaction commit in order to prevent buffer
- *              leak.
- *              This function must be called for each opened large object.
- *              [ PA, 7/17/98 ]
- */
-void
-inv_cleanindex(LargeObjectDesc *obj_desc)
-{
-       Assert(PointerIsValid(obj_desc));
-
-       if (obj_desc->iscan == (IndexScanDesc) NULL)
-               return;
-
-       index_endscan(obj_desc->iscan);
-       obj_desc->iscan = (IndexScanDesc) NULL;
-
-       ItemPointerSetInvalid(&(obj_desc->htid));
-}
-
-/*
- *     inv_fetchtup -- Fetch an inversion file system block.
- *
- *             This routine finds the file system block containing the offset
- *             recorded in the obj_desc structure.  Later, we need to think about
- *             the effects of non-functional updates (can you rewrite the same
- *             block twice in a single transaction?), but for now, we won't bother.
- *
- *             Parameters:
- *                             obj_desc -- the object descriptor.
- *                             bufP -- pointer to a buffer in the buffer cache; caller
- *                                             must free this.
- *
- *             Returns:
- *                             A heap tuple containing the desired block, or NULL if no
- *                             such tuple exists.
- */
-static void
-inv_fetchtup(LargeObjectDesc *obj_desc, HeapTuple tuple, Buffer *buffer)
-{
-       RetrieveIndexResult res;
-       Datum           d;
-       int                     firstbyte,
-                               lastbyte;
-       struct varlena *fsblock;
-       bool            isNull;
-
-       /*
-        * If we've exhausted the current block, we need to get the next one.
-        * When we support time travel and non-functional updates, we will
-        * need to loop over the blocks, rather than just have an 'if', in
-        * order to find the one we're really interested in.
-        */
-
-       if (obj_desc->offset > obj_desc->highbyte
-               || obj_desc->offset < obj_desc->lowbyte
-               || !ItemPointerIsValid(&(obj_desc->htid)))
-       {
-               ScanKeyData skey;
-
-               ScanKeyEntryInitialize(&skey, 0x0, 1, F_INT4GE,
-                                                          Int32GetDatum(obj_desc->offset));
-
-               /* initialize scan key if not done */
-               if (obj_desc->iscan == (IndexScanDesc) NULL)
-               {
-
-                       /*
-                        * As scan index may be prematurely closed (on commit), we
-                        * must use object current offset (was 0) to reinitialize the
-                        * entry [ PA ].
-                        */
-                       obj_desc->iscan = index_beginscan(obj_desc->index_r,
-                                                                                         (bool) 0, (uint16) 1,
-                                                                                         &skey);
-               }
-               else
-                       index_rescan(obj_desc->iscan, false, &skey);
-
-               do
-               {
-                       res = index_getnext(obj_desc->iscan, ForwardScanDirection);
-
-                       if (res == (RetrieveIndexResult) NULL)
-                       {
-                               ItemPointerSetInvalid(&(obj_desc->htid));
-                               tuple->t_datamcxt = NULL;
-                               tuple->t_data = NULL;
-                               return;
-                       }
+       index_endscan(sd);
+       if (tuple.t_data == NULL)
+               return 0;
+       
+       (*found)++;
+       data = (Form_pg_largeobject) GETSTRUCT(&tuple);
+       off = obj_desc->offset % IBLKSIZE;
+       len = getbytealen(&(data->data));
 
-                       /*
-                        * For time travel, we need to use the actual time qual here,
-                        * rather that NowTimeQual.  We currently have no way to pass
-                        * a time qual in.
-                        *
-                        * This is now valid for snapshot !!! And should be fixed in some
-                        * way...       - vadim 07/28/98
-                        *
-                        */
-                       tuple->t_self = res->heap_iptr;
-                       heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer);
-                       pfree(res);
-               } while (tuple->t_data == NULL);
-
-               /* remember this tid -- we may need it for later reads/writes */
-               ItemPointerCopy(&(tuple->t_self), &obj_desc->htid);
-       }
-       else
-       {
-               tuple->t_self = obj_desc->htid;
-               heap_fetch(obj_desc->heap_r, SnapshotNow, tuple, buffer);
-               if (tuple->t_data == NULL)
-                       elog(ERROR, "inv_fetchtup: heap_fetch failed");
+       if (len > IBLKSIZE) {
+               ReleaseBuffer(buffer);
+               elog(FATAL, "Internal error: len > IBLKSIZE");
        }
 
-       /*
-        * By here, we have the heap tuple we're interested in.  We cache the
-        * upper and lower bounds for this block in the object descriptor and
-        * return the tuple.
-        */
-
-       d = heap_getattr(tuple, 1, obj_desc->hdesc, &isNull);
-       lastbyte = (int32) DatumGetInt32(d);
-       d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull);
-       fsblock = (struct varlena *) DatumGetPointer(d);
-
-       /*
-        * order of + and - is important -- these are unsigned quantites near
-        * 0
-        */
-       firstbyte = (lastbyte + 1 + sizeof(fsblock->vl_len)) - fsblock->vl_len;
-
-       obj_desc->lowbyte = firstbyte;
-       obj_desc->highbyte = lastbyte;
-
-       return;
-}
-
-/*
- *     inv_wrnew() -- append a new filesystem block tuple to the inversion
- *                                     file.
- *
- *             In response to an inv_write, we append one or more file system
- *             blocks to the class containing the large object.  We violate the
- *             class abstraction here in order to pack things as densely as we
- *             are able.  We examine the last page in the relation, and write
- *             just enough to fill it, assuming that it has above a certain
- *             threshold of space available.  If the space available is less than
- *             the threshold, we allocate a new page by writing a big tuple.
- *
- *             By the time we get here, we know all the parameters passed in
- *             are valid, and that we hold the appropriate lock on the heap
- *             relation.
- *
- *             Parameters:
- *                             obj_desc: large object descriptor for which to append block.
- *                             buf: buffer containing data to write.
- *                             nbytes: amount to write
- *
- *             Returns:
- *                             number of bytes actually written to the new tuple.
- */
-static int
-inv_wrnew(LargeObjectDesc *obj_desc, char *buf, int nbytes)
-{
-       Relation        hr;
-       HeapTuple       ntup;
-       Buffer          buffer;
-       Page            page;
-       int                     nblocks;
-       int                     nwritten;
-
-       hr = obj_desc->heap_r;
-
-       /*
-        * Get the last block in the relation.  If there's no data in the
-        * relation at all, then we just get a new block.  Otherwise, we check
-        * the last block to see whether it has room to accept some or all of
-        * the data that the user wants to write.  If it doesn't, then we
-        * allocate a new block.
-        */
-
-       nblocks = RelationGetNumberOfBlocks(hr);
-
-       if (nblocks > 0)
-       {
-               buffer = ReadBuffer(hr, nblocks - 1);
-               page = BufferGetPage(buffer);
+       for (i=0; i<Natts_pg_largeobject; i++) {
+               nulls[i] = ' ';
+               replace[i] = ' ';
+               values[i] = (Datum)NULL;
        }
-       else
-       {
-               buffer = ReadBuffer(hr, P_NEW);
-               page = BufferGetPage(buffer);
-               PageInit(page, BufferGetPageSize(buffer), 0);
-       }
-
-       /*
-        * If the last page is too small to hold all the data, and it's too
-        * small to hold IMINBLK, then we allocate a new page.  If it will
-        * hold at least IMINBLK, but less than all the data requested, then
-        * we write IMINBLK here.  The caller is responsible for noticing that
-        * less than the requested number of bytes were written, and calling
-        * this routine again.
-        */
 
-       nwritten = IFREESPC(page);
-       if (nwritten < nbytes)
+       i = 0;
        {
-               if (nwritten < IMINBLK)
-               {
-                       ReleaseBuffer(buffer);
-                       buffer = ReadBuffer(hr, P_NEW);
-                       page = BufferGetPage(buffer);
-                       PageInit(page, BufferGetPageSize(buffer), 0);
-                       if (nbytes > IMAXBLK)
-                               nwritten = IMAXBLK;
+               char b[IBLKSIZE];
+               int4 rest = len - off;
+
+               memset(b, 0, IBLKSIZE); /* Can optimize later */
+               if ((off > 0) && (len > 0)) /* We start in the middle of the tuple */
+                       memcpy(b, VARDATA(&(data->data)), (off > len) ? len : off);
+               
+               if ((nbytes <= rest) || (len == IBLKSIZE)) {
+                       /* We will update inside existing tuple size */
+                       if (nbytes < rest)
+                               n = rest;
+                       else
+                               n = nbytes;
+                       memcpy(b + off, buf, n);
+                       if (n < rest) /* There's a rest of the tuple left */
+                               memcpy(b + off + n, VARDATA(&(data->data)) + off + n, rest - n);
+                       /* Update data only */
+                       replace[2] = 'r';
+                       values[2] = (Datum) _byteain(b, len);
+                } else {
+                       /* We will extend tuple */
+                       /* Do we fit into max tuple size */
+                       if (nbytes <= (IBLKSIZE - off))
+                               len = off + nbytes;
                        else
-                               nwritten = nbytes;
+                               len = IBLKSIZE;
+                       n = len - off;
+                       memcpy(b + off, buf, n);
+                       /* Update data */
+                       replace[2] = 'r';
+                       values[2] = (Datum) _byteain(b, len);
+                }
+
+               newtup = heap_modifytuple(&tuple, obj_desc->heap_r,
+                                         values, nulls, replace);
+
+               heap_update(obj_desc->heap_r, &newtup->t_self, newtup, NULL);
+               if (!IsIgnoringSystemIndexes()) {
+                       CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs);
+                       CatalogIndexInsert(idescs, Num_pg_largeobject_indices, obj_desc->heap_r, newtup);
+                       CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
                }
+               heap_freetuple(newtup);
        }
-       else
-               nwritten = nbytes;
-
-       /*
-        * Insert a new file system block tuple, index it, and write it out.
-        */
-
-       ntup = inv_newtuple(obj_desc, buffer, page, buf, nwritten);
-       inv_indextup(obj_desc, ntup);
-       heap_freetuple(ntup);
-
-       /* new tuple is inserted */
-       WriteBuffer(buffer);
-
-       return nwritten;
+       ReleaseBuffer(buffer);
+       
+       return n;
 }
 
-static int
-inv_wrold(LargeObjectDesc *obj_desc,
-                 char *dbuf,
-                 int nbytes,
-                 HeapTuple tuple,
-                 Buffer buffer)
-{
-       Relation        hr;
-       HeapTuple       ntup;
-       Buffer          newbuf;
-       Page            page;
-       Page            newpage;
-       int                     tupbytes;
-       Datum           d;
-       struct varlena *fsblock;
-       int                     nwritten,
-                               nblocks,
-                               freespc;
-       bool            isNull;
-       int                     keep_offset;
-       RetrieveIndexResult res;
-
-       /*
-        * Since we're using a no-overwrite storage manager, the way we
-        * overwrite blocks is to mark the old block invalid and append a new
-        * block.  First mark the old block invalid.  This violates the tuple
-        * abstraction.
-        */
-
-       TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax));
-       tuple->t_data->t_cmax = GetCurrentCommandId();
-       tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID);
-
-       /*
-        * If we're overwriting the entire block, we're lucky.  All we need to
-        * do is to insert a new block.
-        */
-
-       if (obj_desc->offset == obj_desc->lowbyte
-               && obj_desc->lowbyte + nbytes >= obj_desc->highbyte)
-       {
-               WriteBuffer(buffer);
-               return inv_wrnew(obj_desc, dbuf, nbytes);
-       }
-
-       /*
-        * By here, we need to overwrite part of the data in the current
-        * tuple.  In order to reduce the degree to which we fragment blocks,
-        * we guarantee that no block will be broken up due to an overwrite.
-        * This means that we need to allocate a tuple on a new page, if
-        * there's not room for the replacement on this one.
-        */
-
-       newbuf = buffer;
-       page = BufferGetPage(buffer);
-       newpage = BufferGetPage(newbuf);
-       hr = obj_desc->heap_r;
-       freespc = IFREESPC(page);
-       d = heap_getattr(tuple, 2, obj_desc->hdesc, &isNull);
-       fsblock = (struct varlena *) DatumGetPointer(d);
-       tupbytes = fsblock->vl_len - sizeof(fsblock->vl_len);
-
-       if (freespc < tupbytes)
-       {
-
-               /*
-                * First see if there's enough space on the last page of the table
-                * to put this tuple.
-                */
-
-               nblocks = RelationGetNumberOfBlocks(hr);
-
-               if (nblocks > 0)
-               {
-                       newbuf = ReadBuffer(hr, nblocks - 1);
-                       newpage = BufferGetPage(newbuf);
-               }
-               else
-               {
-                       newbuf = ReadBuffer(hr, P_NEW);
-                       newpage = BufferGetPage(newbuf);
-                       PageInit(newpage, BufferGetPageSize(newbuf), 0);
-               }
-
-               freespc = IFREESPC(newpage);
-
-               /*
-                * If there's no room on the last page, allocate a new last page
-                * for the table, and put it there.
-                */
-
-               if (freespc < tupbytes)
-               {
-                       ReleaseBuffer(newbuf);
-                       newbuf = ReadBuffer(hr, P_NEW);
-                       newpage = BufferGetPage(newbuf);
-                       PageInit(newpage, BufferGetPageSize(newbuf), 0);
-               }
-       }
-
-       nwritten = nbytes;
-       if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
-               nwritten = obj_desc->highbyte - obj_desc->offset + 1;
-       memmove(VARDATA(fsblock) + (obj_desc->offset - obj_desc->lowbyte),
-                       dbuf, nwritten);
-
-       /*
-        * we are rewriting the entire old block, therefore we reset offset to
-        * the lowbyte of the original block before jumping into
-        * inv_newtuple()
-        */
-       keep_offset = obj_desc->offset;
-       obj_desc->offset = obj_desc->lowbyte;
-       ntup = inv_newtuple(obj_desc, newbuf, newpage, VARDATA(fsblock),
-                                               tupbytes);
-       /* after we are done, we restore to the true offset */
-       obj_desc->offset = keep_offset;
-
-       /*
-        * By here, we have a page (newpage) that's guaranteed to have enough
-        * space on it to put the new tuple.  Call inv_newtuple to do the
-        * work.  Passing NULL as a buffer to inv_newtuple() keeps it from
-        * copying any data into the new tuple.  When it returns, the tuple is
-        * ready to receive data from the old tuple and the user's data
-        * buffer.
-        */
-/*
-       ntup = inv_newtuple(obj_desc, newbuf, newpage, (char *) NULL, tupbytes);
-       dptr = ((char *) ntup) + ntup->t_hoff -
-                               (sizeof(HeapTupleData) - offsetof(HeapTupleData, t_bits)) +
-                               sizeof(int4)
-                               + sizeof(fsblock->vl_len);
-
-       if (obj_desc->offset > obj_desc->lowbyte) {
-               memmove(dptr,
-                               &(fsblock->vl_dat[0]),
-                               obj_desc->offset - obj_desc->lowbyte);
-               dptr += obj_desc->offset - obj_desc->lowbyte;
+static int inv_write_append(LargeObjectDesc *obj_desc, char *buf, int nbytes) {
+       HeapTuple       ntup = (HeapTuple) palloc(sizeof(HeapTupleData));
+       Relation                idescs[Num_pg_largeobject_indices];
+       Datum           values[Natts_pg_largeobject];
+       char            nulls[Natts_pg_largeobject];
+       int             i;
+       uint32          len;
+       
+       for (i=0; i<Natts_pg_largeobject; i++) {
+               nulls[i] = ' ';
+               values[i] = (Datum)NULL;
        }
 
-
-       nwritten = nbytes;
-       if (nwritten > obj_desc->highbyte - obj_desc->offset + 1)
-               nwritten = obj_desc->highbyte - obj_desc->offset + 1;
-
-       memmove(dptr, dbuf, nwritten);
-       dptr += nwritten;
-
-       if (obj_desc->offset + nwritten < obj_desc->highbyte + 1) {
-*/
-/*
-               loc = (obj_desc->highbyte - obj_desc->offset)
-                               + nwritten;
-               sz = obj_desc->highbyte - (obj_desc->lowbyte + loc);
-
-               what's going on here?? - jolly
-*/
-/*
-               sz = (obj_desc->highbyte + 1) - (obj_desc->offset + nwritten);
-               memmove(&(fsblock->vl_dat[0]), dptr, sz);
+       i = 0;
+       values[i++] = ObjectIdGetDatum(obj_desc->id);
+       len = (nbytes > IBLKSIZE) ? IBLKSIZE : nbytes;
+               
+       values[i++] = Int32GetDatum(obj_desc->offset / IBLKSIZE);
+       values[i++] = (Datum) _byteain(buf, len);
+       
+       ntup = heap_formtuple(obj_desc->heap_r->rd_att, values, nulls);
+       heap_insert(obj_desc->heap_r, ntup);
+
+       if (!IsIgnoringSystemIndexes()) {
+               CatalogOpenIndices(Num_pg_largeobject_indices, Name_pg_largeobject_indices, idescs);
+               CatalogIndexInsert(idescs, Num_pg_largeobject_indices, obj_desc->heap_r, ntup);
+               CatalogCloseIndices(Num_pg_largeobject_indices, idescs);
        }
-*/
-
-
-       /* index the new tuple */
-       inv_indextup(obj_desc, ntup);
+       
        heap_freetuple(ntup);
-
-       /*
-        * move the scandesc forward so we don't reread the newly inserted
-        * tuple on the next index scan
-        */
-       res = NULL;
-       if (obj_desc->iscan)
-               res = index_getnext(obj_desc->iscan, ForwardScanDirection);
-
-       if (res)
-               pfree(res);
-
-       /*
-        * Okay, by here, a tuple for the new block is correctly placed,
-        * indexed, and filled.  Write the changed pages out.
-        */
-
-       WriteBuffer(buffer);
-       if (newbuf != buffer)
-               WriteBuffer(newbuf);
-
-       /* Tuple id is no longer valid */
-       ItemPointerSetInvalid(&(obj_desc->htid));
-
-       /* done */
-       return nwritten;
+       
+       return len;
 }
+       
+static int inv_write_int(LargeObjectDesc *obj_desc, char *buf, int nbytes) {
+       int                     nwritten = 0;
+       int                     found = 0;
+       
+       if (nbytes == 0)
+               return 0;
 
-static HeapTuple
-inv_newtuple(LargeObjectDesc *obj_desc,
-                        Buffer buffer,
-                        Page page,
-                        char *dbuf,
-                        int nwrite)
-{
-       HeapTuple       ntup = (HeapTuple) palloc(sizeof(HeapTupleData));
-       PageHeader      ph;
-       int                     tupsize;
-       int                     hoff;
-       Offset          lower;
-       Offset          upper;
-       ItemId          itemId;
-       OffsetNumber off;
-       OffsetNumber limit;
-       char       *attptr;
-
-       /* compute tuple size -- no nulls */
-       hoff = offsetof(HeapTupleHeaderData, t_bits);
-       hoff = MAXALIGN(hoff);
-
-       /* add in olastbyte, varlena.vl_len, varlena.vl_dat */
-       tupsize = hoff + (2 * sizeof(int32)) + nwrite;
-       tupsize = MAXALIGN(tupsize);
-
-       /*
-        * Allocate the tuple on the page, violating the page abstraction.
-        * This code was swiped from PageAddItem().
-        */
-
-       ph = (PageHeader) page;
-       limit = OffsetNumberNext(PageGetMaxOffsetNumber(page));
-
-       /* look for "recyclable" (unused & deallocated) ItemId */
-       for (off = FirstOffsetNumber; off < limit; off = OffsetNumberNext(off))
-       {
-               itemId = &ph->pd_linp[off - 1];
-               if ((((*itemId).lp_flags & LP_USED) == 0) &&
-                       ((*itemId).lp_len == 0))
-                       break;
+       nwritten = inv_write_existing(obj_desc, buf, nbytes, &found);
+       if (found > 0) {
+               obj_desc->offset += nwritten;
+               return nwritten;
        }
-
-       if (off > limit)
-               lower = (Offset) (((char *) (&ph->pd_linp[off])) - ((char *) page));
-       else if (off == limit)
-               lower = ph->pd_lower + sizeof(ItemIdData);
-       else
-               lower = ph->pd_lower;
-
-       upper = ph->pd_upper - tupsize;
-
-       itemId = &ph->pd_linp[off - 1];
-       (*itemId).lp_off = upper;
-       (*itemId).lp_len = tupsize;
-       (*itemId).lp_flags = LP_USED;
-       ph->pd_lower = lower;
-       ph->pd_upper = upper;
-
-       ntup->t_datamcxt = NULL;
-       ntup->t_data = (HeapTupleHeader) ((char *) page + upper);
-
-       /*
-        * Tuple is now allocated on the page.  Next, fill in the tuple
-        * header.      This block of code violates the tuple abstraction.
-        */
-
-       ntup->t_len = tupsize;
-       ItemPointerSet(&ntup->t_self, BufferGetBlockNumber(buffer), off);
-       ntup->t_data->t_oid = newoid();
-       TransactionIdStore(GetCurrentTransactionId(), &(ntup->t_data->t_xmin));
-       ntup->t_data->t_cmin = GetCurrentCommandId();
-       StoreInvalidTransactionId(&(ntup->t_data->t_xmax));
-       ntup->t_data->t_cmax = 0;
-       ntup->t_data->t_infomask = HEAP_XMAX_INVALID;
-       ntup->t_data->t_natts = 2;
-       ntup->t_data->t_hoff = hoff;
-
-       /* if a NULL is passed in, avoid the calculations below */
-       if (dbuf == NULL)
-               return ntup;
-
-       /*
-        * Finally, copy the user's data buffer into the tuple.  This violates
-        * the tuple and class abstractions.
-        */
-
-       attptr = ((char *) ntup->t_data) + hoff;
-       *((int32 *) attptr) = obj_desc->offset + nwrite - 1;
-       attptr += sizeof(int32);
-
-       /*
-        * *  mer fixed disk layout of varlenas to get rid of the need for
-        * this. *
-        *
-        * ((int32 *) attptr) = nwrite + sizeof(int32); *  attptr +=
-        * sizeof(int32);
-        */
-
-       *((int32 *) attptr) = nwrite + sizeof(int32);
-       attptr += sizeof(int32);
-
-       /*
-        * If a data buffer was passed in, then copy the data from the buffer
-        * to the tuple.  Some callers (eg, inv_wrold()) may not pass in a
-        * buffer, since they have to copy part of the old tuple data and part
-        * of the user's new data into the new tuple.
-        */
-
-       if (dbuf != (char *) NULL)
-               memmove(attptr, dbuf, nwrite);
-
-       /* keep track of boundary of current tuple */
-       obj_desc->lowbyte = obj_desc->offset;
-       obj_desc->highbyte = obj_desc->offset + nwrite - 1;
-
-       /* new tuple is filled -- return it */
-       return ntup;
-}
-
-static void
-inv_indextup(LargeObjectDesc *obj_desc, HeapTuple tuple)
-{
-       InsertIndexResult res;
-       Datum           v[1];
-       char            n[1];
-
-       n[0] = ' ';
-       v[0] = Int32GetDatum(obj_desc->highbyte);
-       res = index_insert(obj_desc->index_r, &v[0], &n[0],
-                                          &(tuple->t_self), obj_desc->heap_r);
-
-       if (res)
-               pfree(res);
-}
-
-#ifdef NOT_USED
-
-static void
-DumpPage(Page page, int blkno)
-{
-               ItemId                  lp;
-               HeapTuple               tup;
-               int                             flags, i, nline;
-               ItemPointerData pointerData;
-
-               printf("\t[subblock=%d]:lower=%d:upper=%d:special=%d\n", 0,
-                               ((PageHeader)page)->pd_lower, ((PageHeader)page)->pd_upper,
-                               ((PageHeader)page)->pd_special);
-
-               printf("\t:MaxOffsetNumber=%d\n",
-                          (int16) PageGetMaxOffsetNumber(page));
-
-               nline = (int16) PageGetMaxOffsetNumber(page);
-
-{
-               int             i;
-               char    *cp;
-
-               i = PageGetSpecialSize(page);
-               cp = PageGetSpecialPointer(page);
-
-               printf("\t:SpecialData=");
-
-               while (i > 0) {
-                               printf(" 0x%02x", *cp);
-                               cp += 1;
-                               i -= 1;
-               }
-               printf("\n");
-}
-               for (i = 0; i < nline; i++) {
-                               lp = ((PageHeader)page)->pd_linp + i;
-                               flags = (*lp).lp_flags;
-                               ItemPointerSet(&pointerData, blkno, 1 + i);
-                               printf("%s:off=%d:flags=0x%x:len=%d",
-                                               ItemPointerFormExternal(&pointerData), (*lp).lp_off,
-                                               flags, (*lp).lp_len);
-
-                               if (flags & LP_USED) {
-                                               HeapTupleData   htdata;
-
-                                               printf(":USED");
-
-                                               memmove((char *) &htdata,
-                                                               (char *) &((char *)page)[(*lp).lp_off],
-                                                               sizeof(htdata));
-
-                                               tup = &htdata;
-
-                                               printf("\n\t:ctid=%s:oid=%d",
-                                                               ItemPointerFormExternal(&tup->t_ctid),
-                                                               tup->t_oid);
-                                               printf(":natts=%d:thoff=%d:",
-                                                               tup->t_natts,
-                                                               tup->t_hoff);
-
-                                               printf("\n\t:cmin=%u:",
-                                                               tup->t_cmin);
-
-                                               printf("xmin=%u:", tup->t_xmin);
-
-                                               printf("\n\t:cmax=%u:",
-                                                               tup->t_cmax);
-
-                                               printf("xmax=%u:\n", tup->t_xmax);
-
-                               } else
-                                               putchar('\n');
-               }
-}
-
-static char*
-ItemPointerFormExternal(ItemPointer pointer)
-{
-               static char             itemPointerString[32];
-
-               if (!ItemPointerIsValid(pointer)) {
-                       memmove(itemPointerString, "<-,-,->", sizeof "<-,-,->");
-               } else {
-                       sprintf(itemPointerString, "<%u,%u>",
-                                       ItemPointerGetBlockNumber(pointer),
-                                       ItemPointerGetOffsetNumber(pointer));
-               }
-
-               return itemPointerString;
+       /* Looks like we are beyond the end of the file */
+       nwritten = inv_write_append(obj_desc, buf, nbytes);
+       obj_desc->offset += nwritten;
+       return nwritten;
 }
 
-#endif
+static int count = 0;
 
-static int
-_inv_getsize(Relation hreln, TupleDesc hdesc, Relation ireln)
-{
-       IndexScanDesc iscan;
-       RetrieveIndexResult res;
-       HeapTupleData tuple;
-       Datum           d;
-       long            size;
-       bool            isNull;
-       Buffer          buffer;
-
-       /* scan backwards from end */
-       iscan = index_beginscan(ireln, (bool) 1, 0, (ScanKey) NULL);
-
-       do
-       {
-               res = index_getnext(iscan, BackwardScanDirection);
-
-               /*
-                * If there are no more index tuples, then the relation is empty,
-                * so the file's size is zero.
-                */
-
-               if (res == (RetrieveIndexResult) NULL)
-               {
-                       index_endscan(iscan);
-                       return 0;
-               }
-
-               /*
-                * For time travel, we need to use the actual time qual here,
-                * rather that NowTimeQual.  We currently have no way to pass a
-                * time qual in.
-                */
-               tuple.t_self = res->heap_iptr;
-               heap_fetch(hreln, SnapshotNow, &tuple, &buffer);
-               pfree(res);
-       } while (tuple.t_data == NULL);
-
-       /* don't need the index scan anymore */
-       index_endscan(iscan);
-
-       /* get olastbyte attribute */
-       d = heap_getattr(&tuple, 1, hdesc, &isNull);
-       size = DatumGetInt32(d) + 1;
-       ReleaseBuffer(buffer);
+int
+inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes) {
+       int nwritten = 0;
+       while (nwritten < nbytes)
+               nwritten += inv_write_int(obj_desc, buf + nwritten, nbytes - nwritten);
 
-       return size;
+       return nwritten;
 }
index 83fd0e6..debf1f2 100644 (file)
@@ -22,7 +22,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.170 2000/10/13 00:43:31 pjw Exp $
+ *       $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.171 2000/10/21 15:55:26 momjian Exp $
  *
  * Modifications - 6/10/96 - dave@bensoft.com - version 1.13.dhb
  *
@@ -1104,7 +1104,7 @@ dumpBlobs(Archive *AH, char* junkOid, void *junkVal)
                fprintf(stderr, "%s saving BLOBs\n", g_comment_start);
 
        /* Cursor to get all BLOB tables */
-    appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT oid from pg_class where relkind = '%c'", RELKIND_LOBJECT);
+    appendPQExpBuffer(oidQry, "Declare blobOid Cursor for SELECT loid from pg_largeobject");
 
        res = PQexec(g_conn, oidQry->data);
        if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
index 765f9b2..cb95771 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: catname.h,v 1.14 2000/10/08 03:53:15 momjian Exp $
+ * $Id: catname.h,v 1.15 2000/10/21 15:55:28 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -29,6 +29,7 @@
 #define  InheritsRelationName "pg_inherits"
 #define  InheritancePrecidenceListRelationName "pg_ipl"
 #define  LanguageRelationName "pg_language"
+#define  LargeobjectRelationName "pg_largeobject"
 #define  ListenerRelationName "pg_listener"
 #define  LogRelationName "pg_log"
 #define  OperatorClassRelationName "pg_opclass"
index d96115f..7bee3e0 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: indexing.h,v 1.42 2000/10/08 03:53:15 momjian Exp $
+ * $Id: indexing.h,v 1.43 2000/10/21 15:55:28 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -31,6 +31,7 @@
 #define Num_pg_index_indices           2
 #define Num_pg_inherits_indices                1
 #define Num_pg_language_indices                2
+#define Num_pg_largeobject_indices             2
 #define Num_pg_listener_indices                1
 #define Num_pg_opclass_indices         2
 #define Num_pg_operator_indices                2
@@ -62,6 +63,8 @@
 #define InheritsRelidSeqnoIndex                "pg_inherits_relid_seqno_index"
 #define LanguageNameIndex                      "pg_language_name_index"
 #define LanguageOidIndex                       "pg_language_oid_index"
+#define LargeobjectLOIdIndex                   "pg_largeobject_loid_index"
+#define LargeobjectLOIdPNIndex                 "pg_largeobject_loid_pn_index"
 #define ListenerPidRelnameIndex                "pg_listener_pid_relname_index"
 #define OpclassDeftypeIndex                    "pg_opclass_deftype_index"
 #define OpclassNameIndex                       "pg_opclass_name_index"
@@ -92,6 +95,7 @@ extern char *Name_pg_group_indices[];
 extern char *Name_pg_index_indices[];
 extern char *Name_pg_inherits_indices[];
 extern char *Name_pg_language_indices[];
+extern char *Name_pg_largeobject_indices[];
 extern char *Name_pg_listener_indices[];
 extern char *Name_pg_opclass_indices[];
 extern char *Name_pg_operator_indices[];
@@ -191,6 +195,8 @@ DECLARE_UNIQUE_INDEX(pg_index_indexrelid_index on pg_index using btree(indexreli
 DECLARE_UNIQUE_INDEX(pg_inherits_relid_seqno_index on pg_inherits using btree(inhrelid oid_ops, inhseqno int4_ops));
 DECLARE_UNIQUE_INDEX(pg_language_name_index on pg_language using btree(lanname name_ops));
 DECLARE_UNIQUE_INDEX(pg_language_oid_index on pg_language using btree(oid oid_ops));
+DECLARE_INDEX(pg_largeobject_loid_index on pg_largeobject using btree(loid oid_ops));
+DECLARE_UNIQUE_INDEX(pg_largeobject_loid_pn_index on pg_largeobject using btree(loid oid_ops, pageno int4_ops));
 DECLARE_UNIQUE_INDEX(pg_listener_pid_relname_index on pg_listener using btree(listenerpid int4_ops, relname name_ops));
 /* This column needs to allow multiple zero entries, but is in the cache */
 DECLARE_INDEX(pg_opclass_deftype_index on pg_opclass using btree(opcdeftype oid_ops));
diff --git a/src/include/catalog/pg_largeobject.h b/src/include/catalog/pg_largeobject.h
new file mode 100644 (file)
index 0000000..409aaf8
--- /dev/null
@@ -0,0 +1,63 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_largeobject.h
+ *       definition of the system "largeobject" relation (pg_largeobject)
+ *       along with the relation's initial contents.
+ *
+ *
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $Id: pg_largeobject.h,v 1.3 2000/10/21 15:55:28 momjian Exp $
+ *
+ * NOTES
+ *       the genbki.sh script reads this file and generates .bki
+ *       information from the DATA() statements.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LARGEOBJECT_H
+#define PG_LARGEOBJECT_H
+
+/* ----------------
+ *             postgres.h contains the system type definintions and the
+ *             CATALOG(), BOOTSTRAP and DATA() sugar words so this file
+ *             can be read by both genbki.sh and the C compiler.
+ * ----------------
+ */
+
+/* ----------------
+ *             pg_largeobject definition.  cpp turns this into
+ *             typedef struct FormData_pg_largeobject. Large object id
+ *             is stored in loid;
+ * ----------------
+ */
+
+CATALOG(pg_largeobject)
+{
+       Oid                     loid;
+       int4                    pageno;
+       bytea                   data;
+} FormData_pg_largeobject;
+
+/* ----------------
+ *             Form_pg_largeobject corresponds to a pointer to a tuple with
+ *             the format of pg_largeobject relation.
+ * ----------------
+ */
+typedef FormData_pg_largeobject *Form_pg_largeobject;
+
+/* ----------------
+ *             compiler constants for pg_largeobject
+ * ----------------
+ */
+#define Natts_pg_largeobject                   3
+#define Anum_pg_largeobject_loid               1
+#define Anum_pg_largeobject_pageno             2
+#define Anum_pg_largeobject_data               3
+
+Oid LargeobjectCreate(Oid loid);
+void LargeobjectDrop(Oid loid);
+int LargeobjectFind(Oid loid);
+
+#endif  /* PG_LARGEOBJECT_H */
index b3d3512..77990c5 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: large_object.h,v 1.15 2000/10/08 03:53:15 momjian Exp $
+ * $Id: large_object.h,v 1.16 2000/10/21 15:55:29 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
 /*
  * This structure will eventually have lots more stuff associated with it.
  */
-typedef struct LargeObjectDesc
-{
-       Relation        heap_r;                 /* heap relation */
-       Relation        index_r;                /* index relation on seqno attribute */
-       IndexScanDesc iscan;            /* index scan we're using */
-       TupleDesc       hdesc;                  /* heap relation tuple desc */
-       TupleDesc       idesc;                  /* index relation tuple desc */
-       uint32          lowbyte;                /* low byte on the current page */
-       uint32          highbyte;               /* high byte on the current page */
+typedef struct LargeObjectDesc {
+       Relation        heap_r;
+       Relation        index_r;
        uint32          offset;                 /* current seek pointer */
-       ItemPointerData htid;           /* tid of current heap tuple */
+       Oid             id;
 
 #define IFS_RDLOCK             (1 << 0)
 #define IFS_WRLOCK             (1 << 1)
@@ -55,7 +49,4 @@ extern int    inv_tell(LargeObjectDesc *obj_desc);
 extern int     inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes);
 extern int     inv_write(LargeObjectDesc *obj_desc, char *buf, int nbytes);
 
-/* added for buffer leak prevention [ PA ] */
-extern void inv_cleanindex(LargeObjectDesc *obj_desc);
-
 #endif  /* LARGE_OBJECT_H */