OSDN Git Service

Export the external file reader used in COPY FROM as APIs.
authorItagaki Takahiro <itagaki.takahiro@gmail.com>
Wed, 16 Feb 2011 02:19:11 +0000 (11:19 +0900)
committerItagaki Takahiro <itagaki.takahiro@gmail.com>
Wed, 16 Feb 2011 02:19:11 +0000 (11:19 +0900)
They are expected to be used by extension modules like file_fdw.
There are no user-visible changes.

Itagaki Takahiro
Reviewed and tested by Kevin Grittner and Noah Misch.

src/backend/commands/copy.c
src/include/commands/copy.h

index 3350ca0..9f7263d 100644 (file)
@@ -93,13 +93,11 @@ typedef struct CopyStateData
        FILE       *copy_file;          /* used if copy_dest == COPY_FILE */
        StringInfo      fe_msgbuf;              /* used for all dests during COPY TO, only for
                                                                 * dest == COPY_NEW_FE in COPY FROM */
-       bool            fe_copy;                /* true for all FE copy dests */
        bool            fe_eof;                 /* true if detected end of copy data */
        EolType         eol_type;               /* EOL type of input */
        int                     client_encoding;        /* remote side's character encoding */
        bool            need_transcoding;               /* client encoding diff from server? */
        bool            encoding_embeds_ascii;  /* ASCII can be non-first byte? */
-       uint64          processed;              /* # of tuples processed */
 
        /* parameters from the COPY command */
        Relation        rel;                    /* relation to copy to or from */
@@ -119,19 +117,36 @@ typedef struct CopyStateData
        bool       *force_quote_flags;          /* per-column CSV FQ flags */
        bool       *force_notnull_flags;        /* per-column CSV FNN flags */
 
-       /* these are just for error messages, see copy_in_error_callback */
+       /* these are just for error messages, see CopyFromErrorCallback */
        const char *cur_relname;        /* table name for error messages */
        int                     cur_lineno;             /* line number for error messages */
        const char *cur_attname;        /* current att for error messages */
        const char *cur_attval;         /* current att value for error messages */
 
        /*
+        * Working state for COPY TO/FROM
+        */
+       MemoryContext copycontext;      /* per-copy execution context */
+
+       /*
         * Working state for COPY TO
         */
        FmgrInfo   *out_functions;      /* lookup info for output functions */
        MemoryContext rowcontext;       /* per-row evaluation context */
 
        /*
+        * Working state for COPY FROM
+        */
+       AttrNumber      num_defaults;
+       bool            file_has_oids;
+       FmgrInfo        oid_in_function;
+       Oid                     oid_typioparam;
+       FmgrInfo   *in_functions;       /* array of input functions for each attrs */
+       Oid                *typioparams;        /* array of element types for in_functions */
+       int                *defmap;                     /* array of default att numbers */
+       ExprState **defexprs;           /* array of default att expressions */
+
+       /*
         * These variables are used to reduce overhead in textual COPY FROM.
         *
         * attribute_buf holds the separated, de-escaped text for each field of
@@ -169,13 +184,12 @@ typedef struct CopyStateData
        int                     raw_buf_len;    /* total # of bytes stored */
 } CopyStateData;
 
-typedef CopyStateData *CopyState;
-
 /* DestReceiver for COPY (SELECT) TO */
 typedef struct
 {
        DestReceiver pub;                       /* publicly-known function pointers */
        CopyState       cstate;                 /* CopyStateData for the command */
+       uint64          processed;              /* # of tuples processed */
 } DR_copy;
 
 
@@ -248,11 +262,17 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
 
 /* non-export function prototypes */
-static void DoCopyTo(CopyState cstate);
-static void CopyTo(CopyState cstate);
+static CopyState BeginCopy(bool is_from, Relation rel, Node *raw_query,
+                               const char *queryString, List *attnamelist, List *options);
+static void EndCopy(CopyState cstate);
+static CopyState BeginCopyTo(Relation rel, Node *query, const char *queryString,
+                               const char *filename, List *attnamelist, List *options);
+static void EndCopyTo(CopyState cstate);
+static uint64 DoCopyTo(CopyState cstate);
+static uint64 CopyTo(CopyState cstate);
 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
                         Datum *values, bool *nulls);
-static void CopyFrom(CopyState cstate);
+static uint64 CopyFrom(CopyState cstate);
 static bool CopyReadLine(CopyState cstate);
 static bool CopyReadLineText(CopyState cstate);
 static int CopyReadAttributesText(CopyState cstate);
@@ -700,6 +720,102 @@ CopyLoadRawBuf(CopyState cstate)
  * input/output stream. The latter could be either stdin/stdout or a
  * socket, depending on whether we're running under Postmaster control.
  *
+ * Do not allow a Postgres user without superuser privilege to read from
+ * or write to a file.
+ *
+ * Do not allow the copy if user doesn't have proper permission to access
+ * the table or the specifically requested columns.
+ */
+uint64
+DoCopy(const CopyStmt *stmt, const char *queryString)
+{
+       CopyState       cstate;
+       bool            is_from = stmt->is_from;
+       bool            pipe = (stmt->filename == NULL);
+       Relation        rel;
+       uint64          processed;
+
+       /* Disallow file COPY except to superusers. */
+       if (!pipe && !superuser())
+               ereport(ERROR,
+                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                errmsg("must be superuser to COPY to or from a file"),
+                                errhint("Anyone can COPY to stdout or from stdin. "
+                                                "psql's \\copy command also works for anyone.")));
+
+       if (stmt->relation)
+       {
+               TupleDesc               tupDesc;
+               AclMode                 required_access = (is_from ? ACL_INSERT : ACL_SELECT);
+               RangeTblEntry  *rte;
+               List               *attnums;
+               ListCell           *cur;
+
+               Assert(!stmt->query);
+
+               /* Open and lock the relation, using the appropriate lock type. */
+               rel = heap_openrv(stmt->relation,
+                                                        (is_from ? RowExclusiveLock : AccessShareLock));
+
+               rte = makeNode(RangeTblEntry);
+               rte->rtekind = RTE_RELATION;
+               rte->relid = RelationGetRelid(rel);
+               rte->requiredPerms = required_access;
+
+               tupDesc = RelationGetDescr(rel);
+               attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
+               foreach (cur, attnums)
+               {
+                       int             attno = lfirst_int(cur) -
+                                                       FirstLowInvalidHeapAttributeNumber;
+
+                       if (is_from)
+                               rte->modifiedCols = bms_add_member(rte->modifiedCols, attno);
+                       else
+                               rte->selectedCols = bms_add_member(rte->selectedCols, attno);
+               }
+               ExecCheckRTPerms(list_make1(rte), true);
+       }
+       else
+       {
+               Assert(stmt->query);
+
+               rel = NULL;
+       }
+
+       if (is_from)
+       {
+               /* check read-only transaction */
+               if (XactReadOnly && rel->rd_backend != MyBackendId)
+                       PreventCommandIfReadOnly("COPY FROM");
+
+               cstate = BeginCopyFrom(rel, stmt->filename,
+                                                          stmt->attlist, stmt->options);
+               processed = CopyFrom(cstate);   /* copy from file to database */
+               EndCopyFrom(cstate);
+       }
+       else
+       {
+               cstate = BeginCopyTo(rel, stmt->query, queryString, stmt->filename,
+                                                        stmt->attlist, stmt->options);
+               processed = DoCopyTo(cstate);   /* copy from database to file */
+               EndCopyTo(cstate);
+       }
+
+       /*
+        * Close the relation. If reading, we can release the AccessShareLock we
+        * got; if writing, we should hold the lock until end of transaction to
+        * ensure that updates will be committed before lock is released.
+        */
+       if (rel != NULL)
+               heap_close(rel, (is_from ? NoLock : AccessShareLock));
+
+       return processed;
+}
+
+/*
+ * Common setup routines used by BeginCopyFrom and BeginCopyTo.
+ *
  * Iff <binary>, unload or reload in the binary format, as opposed to the
  * more wasteful but more robust and portable text format.
  *
@@ -711,35 +827,42 @@ CopyLoadRawBuf(CopyState cstate)
  *
  * If in the text format, delimit columns with delimiter <delim> and print
  * NULL values as <null_print>.
- *
- * Do not allow a Postgres user without superuser privilege to read from
- * or write to a file.
- *
- * Do not allow the copy if user doesn't have proper permission to access
- * the table or the specifically requested columns.
  */
-uint64
-DoCopy(const CopyStmt *stmt, const char *queryString)
+static CopyState
+BeginCopy(bool is_from,
+                 Relation rel,
+                 Node *raw_query,
+                 const char *queryString,
+                 List *attnamelist,
+                 List *options)
 {
        CopyState       cstate;
-       bool            is_from = stmt->is_from;
-       bool            pipe = (stmt->filename == NULL);
-       List       *attnamelist = stmt->attlist;
        List       *force_quote = NIL;
        List       *force_notnull = NIL;
        bool            force_quote_all = false;
        bool            format_specified = false;
-       AclMode         required_access = (is_from ? ACL_INSERT : ACL_SELECT);
        ListCell   *option;
        TupleDesc       tupDesc;
        int                     num_phys_attrs;
-       uint64          processed;
+       MemoryContext oldcontext;
 
        /* Allocate workspace and zero all fields */
        cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
 
+       /*
+        * We allocate everything used by a cstate in a new memory context.
+        * This would avoid memory leaks repeated uses of COPY in a query.
+        */
+       cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
+                                                                                               "COPY",
+                                                                                               ALLOCSET_DEFAULT_MINSIZE,
+                                                                                               ALLOCSET_DEFAULT_INITSIZE,
+                                                                                               ALLOCSET_DEFAULT_MAXSIZE);
+
+       oldcontext = MemoryContextSwitchTo(cstate->copycontext);
+
        /* Extract options from the statement node tree */
-       foreach(option, stmt->options)
+       foreach(option, options)
        {
                DefElem    *defel = (DefElem *) lfirst(option);
 
@@ -980,51 +1103,14 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
                                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                 errmsg("CSV quote character must not appear in the NULL specification")));
 
-       /* Disallow file COPY except to superusers. */
-       if (!pipe && !superuser())
-               ereport(ERROR,
-                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                                errmsg("must be superuser to COPY to or from a file"),
-                                errhint("Anyone can COPY to stdout or from stdin. "
-                                                "psql's \\copy command also works for anyone.")));
-
-       if (stmt->relation)
+       if (rel)
        {
-               RangeTblEntry  *rte;
-               List               *attnums;
-               ListCell           *cur;
-
-               Assert(!stmt->query);
-               cstate->queryDesc = NULL;
+               Assert(!raw_query);
 
-               /* Open and lock the relation, using the appropriate lock type. */
-               cstate->rel = heap_openrv(stmt->relation,
-                                                        (is_from ? RowExclusiveLock : AccessShareLock));
+               cstate->rel = rel;
 
                tupDesc = RelationGetDescr(cstate->rel);
 
-               /* Check relation permissions. */
-               rte = makeNode(RangeTblEntry);
-               rte->rtekind = RTE_RELATION;
-               rte->relid = RelationGetRelid(cstate->rel);
-               rte->requiredPerms = required_access;
-
-               attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
-               foreach (cur, attnums)
-               {
-                       int             attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber;
-
-                       if (is_from)
-                               rte->modifiedCols = bms_add_member(rte->modifiedCols, attno);
-                       else
-                               rte->selectedCols = bms_add_member(rte->selectedCols, attno);
-               }
-               ExecCheckRTPerms(list_make1(rte), true);
-
-               /* check read-only transaction */
-               if (XactReadOnly && is_from && cstate->rel->rd_backend != MyBackendId)
-                       PreventCommandIfReadOnly("COPY FROM");
-
                /* Don't allow COPY w/ OIDs to or from a table without them */
                if (cstate->oids && !cstate->rel->rd_rel->relhasoids)
                        ereport(ERROR,
@@ -1058,7 +1144,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
                 * function and is executed repeatedly.  (See also the same hack in
                 * DECLARE CURSOR and PREPARE.)  XXX FIXME someday.
                 */
-               rewritten = pg_analyze_and_rewrite((Node *) copyObject(stmt->query),
+               rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
                                                                                   queryString, NULL, 0);
 
                /* We don't expect more or less than one result query */
@@ -1160,14 +1246,6 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
                }
        }
 
-       /* Set up variables to avoid per-attribute overhead. */
-       initStringInfo(&cstate->attribute_buf);
-       initStringInfo(&cstate->line_buf);
-       cstate->line_buf_converted = false;
-       cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
-       cstate->raw_buf_index = cstate->raw_buf_len = 0;
-       cstate->processed = 0;
-
        /*
         * Set up encoding conversion info.  Even if the client and server
         * encodings are the same, we must apply pg_client_to_server() to validate
@@ -1181,84 +1259,75 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
        cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding);
 
        cstate->copy_dest = COPY_FILE;          /* default */
-       cstate->filename = stmt->filename;
 
-       if (is_from)
-               CopyFrom(cstate);               /* copy from file to database */
-       else
-               DoCopyTo(cstate);               /* copy from database to file */
+       MemoryContextSwitchTo(oldcontext);
 
-       /*
-        * Close the relation or query.  If reading, we can release the
-        * AccessShareLock we got; if writing, we should hold the lock until end
-        * of transaction to ensure that updates will be committed before lock is
-        * released.
-        */
-       if (cstate->rel)
-               heap_close(cstate->rel, (is_from ? NoLock : AccessShareLock));
-       else
-       {
-               /* Close down the query and free resources. */
-               ExecutorEnd(cstate->queryDesc);
-               FreeQueryDesc(cstate->queryDesc);
-               PopActiveSnapshot();
-       }
+       return cstate;
+}
 
-       /* Clean up storage (probably not really necessary) */
-       processed = cstate->processed;
+/*
+ * Release resources allocated in a cstate for COPY TO/FROM.
+ */
+static void
+EndCopy(CopyState cstate)
+{
+       if (cstate->filename != NULL && FreeFile(cstate->copy_file))
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not close file \"%s\": %m",
+                                               cstate->filename)));
 
-       pfree(cstate->attribute_buf.data);
-       pfree(cstate->line_buf.data);
-       pfree(cstate->raw_buf);
+       MemoryContextDelete(cstate->copycontext);
        pfree(cstate);
-
-       return processed;
 }
 
-
 /*
- * This intermediate routine exists mainly to localize the effects of setjmp
- * so we don't need to plaster a lot of variables with "volatile".
+ * Setup CopyState to read tuples from a table or a query for COPY TO.
  */
-static void
-DoCopyTo(CopyState cstate)
+static CopyState
+BeginCopyTo(Relation rel,
+                       Node *query,
+                       const char *queryString,
+                       const char *filename,
+                       List *attnamelist,
+                       List *options)
 {
-       bool            pipe = (cstate->filename == NULL);
+       CopyState       cstate;
+       bool            pipe = (filename == NULL);
+       MemoryContext oldcontext;
 
-       if (cstate->rel)
+       if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
        {
-               if (cstate->rel->rd_rel->relkind != RELKIND_RELATION)
-               {
-                       if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                                errmsg("cannot copy from view \"%s\"",
-                                                               RelationGetRelationName(cstate->rel)),
-                                                errhint("Try the COPY (SELECT ...) TO variant.")));
-                       else if (cstate->rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                                errmsg("cannot copy from foreign table \"%s\"",
-                                                               RelationGetRelationName(cstate->rel)),
-                                                errhint("Try the COPY (SELECT ...) TO variant.")));
-                       else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                                errmsg("cannot copy from sequence \"%s\"",
-                                                               RelationGetRelationName(cstate->rel))));
-                       else
-                               ereport(ERROR,
-                                               (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                                errmsg("cannot copy from non-table relation \"%s\"",
-                                                               RelationGetRelationName(cstate->rel))));
-               }
+               if (rel->rd_rel->relkind == RELKIND_VIEW)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                        errmsg("cannot copy from view \"%s\"",
+                                                       RelationGetRelationName(rel)),
+                                        errhint("Try the COPY (SELECT ...) TO variant.")));
+               else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                        errmsg("cannot copy from foreign table \"%s\"",
+                                                       RelationGetRelationName(rel)),
+                                        errhint("Try the COPY (SELECT ...) TO variant.")));
+               else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                        errmsg("cannot copy from sequence \"%s\"",
+                                                       RelationGetRelationName(rel))));
+               else
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                        errmsg("cannot copy from non-table relation \"%s\"",
+                                                       RelationGetRelationName(rel))));
        }
 
+       cstate = BeginCopy(false, rel, query, queryString, attnamelist, options);
+       oldcontext = MemoryContextSwitchTo(cstate->copycontext);
+
        if (pipe)
        {
-               if (whereToSendOutput == DestRemote)
-                       cstate->fe_copy = true;
-               else
+               if (whereToSendOutput != DestRemote)
                        cstate->copy_file = stdout;
        }
        else
@@ -1270,11 +1339,12 @@ DoCopyTo(CopyState cstate)
                 * Prevent write to relative path ... too easy to shoot oneself in the
                 * foot by overwriting a database file ...
                 */
-               if (!is_absolute_path(cstate->filename))
+               if (!is_absolute_path(filename))
                        ereport(ERROR,
                                        (errcode(ERRCODE_INVALID_NAME),
                                         errmsg("relative path not allowed for COPY to file")));
 
+               cstate->filename = pstrdup(filename);
                oumask = umask(S_IWGRP | S_IWOTH);
                cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
                umask(oumask);
@@ -1292,14 +1362,30 @@ DoCopyTo(CopyState cstate)
                                         errmsg("\"%s\" is a directory", cstate->filename)));
        }
 
+       MemoryContextSwitchTo(oldcontext);
+
+       return cstate;
+}
+
+/*
+ * This intermediate routine exists mainly to localize the effects of setjmp
+ * so we don't need to plaster a lot of variables with "volatile".
+ */
+static uint64
+DoCopyTo(CopyState cstate)
+{
+       bool            pipe = (cstate->filename == NULL);
+       bool            fe_copy = (pipe && whereToSendOutput == DestRemote);
+       uint64          processed;
+
        PG_TRY();
        {
-               if (cstate->fe_copy)
+               if (fe_copy)
                        SendCopyBegin(cstate);
 
-               CopyTo(cstate);
+               processed = CopyTo(cstate);
 
-               if (cstate->fe_copy)
+               if (fe_copy)
                        SendCopyEnd(cstate);
        }
        PG_CATCH();
@@ -1314,26 +1400,38 @@ DoCopyTo(CopyState cstate)
        }
        PG_END_TRY();
 
-       if (!pipe)
+       return processed;
+}
+
+/*
+ * Clean up storage and release resources for COPY TO.
+ */
+static void
+EndCopyTo(CopyState cstate)
+{
+       if (cstate->queryDesc != NULL)
        {
-               if (FreeFile(cstate->copy_file))
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not close file \"%s\": %m",
-                                                       cstate->filename)));
+               /* Close down the query and free resources. */
+               ExecutorEnd(cstate->queryDesc);
+               FreeQueryDesc(cstate->queryDesc);
+               PopActiveSnapshot();
        }
+
+       /* Clean up storage */
+       EndCopy(cstate);
 }
 
 /*
  * Copy from relation or query TO file.
  */
-static void
+static uint64
 CopyTo(CopyState cstate)
 {
        TupleDesc       tupDesc;
        int                     num_phys_attrs;
        Form_pg_attribute *attr;
        ListCell   *cur;
+       uint64          processed;
 
        if (cstate->rel)
                tupDesc = RelationGetDescr(cstate->rel);
@@ -1439,6 +1537,7 @@ CopyTo(CopyState cstate)
 
                scandesc = heap_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
 
+               processed = 0;
                while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
                {
                        CHECK_FOR_INTERRUPTS();
@@ -1448,14 +1547,19 @@ CopyTo(CopyState cstate)
 
                        /* Format and send the data */
                        CopyOneRowTo(cstate, HeapTupleGetOid(tuple), values, nulls);
+                       processed++;
                }
 
                heap_endscan(scandesc);
+
+               pfree(values);
+               pfree(nulls);
        }
        else
        {
                /* run the plan --- the dest receiver will send tuples */
                ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L);
+               processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
        }
 
        if (cstate->binary)
@@ -1467,6 +1571,8 @@ CopyTo(CopyState cstate)
        }
 
        MemoryContextDelete(cstate->rowcontext);
+
+       return processed;
 }
 
 /*
@@ -1558,16 +1664,16 @@ CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls)
        CopySendEndOfRow(cstate);
 
        MemoryContextSwitchTo(oldcontext);
-
-       cstate->processed++;
 }
 
 
 /*
  * error context callback for COPY FROM
+ *
+ * The argument for the error context must be CopyState.
  */
-static void
-copy_in_error_callback(void *arg)
+void
+CopyFromErrorCallback(void *arg)
 {
        CopyState       cstate = (CopyState) arg;
 
@@ -1669,41 +1775,23 @@ limit_printout_length(const char *str)
 /*
  * Copy FROM file to relation.
  */
-static void
+static uint64
 CopyFrom(CopyState cstate)
 {
-       bool            pipe = (cstate->filename == NULL);
        HeapTuple       tuple;
        TupleDesc       tupDesc;
-       Form_pg_attribute *attr;
-       AttrNumber      num_phys_attrs,
-                               attr_count,
-                               num_defaults;
-       FmgrInfo   *in_functions;
-       FmgrInfo        oid_in_function;
-       Oid                *typioparams;
-       Oid                     oid_typioparam;
-       int                     attnum;
-       int                     i;
-       Oid                     in_func_oid;
        Datum      *values;
        bool       *nulls;
-       int                     nfields;
-       char      **field_strings;
-       bool            done = false;
-       bool            isnull;
        ResultRelInfo *resultRelInfo;
        EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
+       ExprContext *econtext;
        TupleTableSlot *slot;
-       bool            file_has_oids;
-       int                *defmap;
-       ExprState **defexprs;           /* array of default att expressions */
-       ExprContext *econtext;          /* used for ExecEvalExpr for default atts */
        MemoryContext oldcontext = CurrentMemoryContext;
        ErrorContextCallback errcontext;
        CommandId       mycid = GetCurrentCommandId(true);
        int                     hi_options = 0; /* start with default heap_insert options */
        BulkInsertState bistate;
+       uint64          processed = 0;
 
        Assert(cstate->rel);
 
@@ -1731,6 +1819,8 @@ CopyFrom(CopyState cstate)
                                                        RelationGetRelationName(cstate->rel))));
        }
 
+       tupDesc = RelationGetDescr(cstate->rel);
+
        /*----------
         * Check to see if we can avoid writing WAL
         *
@@ -1766,48 +1856,16 @@ CopyFrom(CopyState cstate)
                        hi_options |= HEAP_INSERT_SKIP_WAL;
        }
 
-       if (pipe)
-       {
-               if (whereToSendOutput == DestRemote)
-                       ReceiveCopyBegin(cstate);
-               else
-                       cstate->copy_file = stdin;
-       }
-       else
-       {
-               struct stat st;
-
-               cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
-
-               if (cstate->copy_file == NULL)
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not open file \"%s\" for reading: %m",
-                                                       cstate->filename)));
-
-               fstat(fileno(cstate->copy_file), &st);
-               if (S_ISDIR(st.st_mode))
-                       ereport(ERROR,
-                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                        errmsg("\"%s\" is a directory", cstate->filename)));
-       }
-
-       tupDesc = RelationGetDescr(cstate->rel);
-       attr = tupDesc->attrs;
-       num_phys_attrs = tupDesc->natts;
-       attr_count = list_length(cstate->attnumlist);
-       num_defaults = 0;
-
-       /*
-        * We need a ResultRelInfo so we can use the regular executor's
-        * index-entry-making machinery.  (There used to be a huge amount of code
-        * here that basically duplicated execUtils.c ...)
-        */
-       resultRelInfo = makeNode(ResultRelInfo);
-       resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
-       resultRelInfo->ri_RelationDesc = cstate->rel;
-       resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
-       if (resultRelInfo->ri_TrigDesc)
+       /*
+        * We need a ResultRelInfo so we can use the regular executor's
+        * index-entry-making machinery.  (There used to be a huge amount of code
+        * here that basically duplicated execUtils.c ...)
+        */
+       resultRelInfo = makeNode(ResultRelInfo);
+       resultRelInfo->ri_RangeTableIndex = 1;          /* dummy */
+       resultRelInfo->ri_RelationDesc = cstate->rel;
+       resultRelInfo->ri_TrigDesc = CopyTriggerDesc(cstate->rel->trigdesc);
+       if (resultRelInfo->ri_TrigDesc)
        {
                resultRelInfo->ri_TrigFunctions = (FmgrInfo *)
                        palloc0(resultRelInfo->ri_TrigDesc->numtriggers * sizeof(FmgrInfo));
@@ -1826,8 +1884,191 @@ CopyFrom(CopyState cstate)
        slot = ExecInitExtraTupleSlot(estate);
        ExecSetSlotDescriptor(slot, tupDesc);
 
+       /* Prepare to catch AFTER triggers. */
+       AfterTriggerBeginQuery();
+
+       /*
+        * Check BEFORE STATEMENT insertion triggers. It's debateable whether we
+        * should do this for COPY, since it's not really an "INSERT" statement as
+        * such. However, executing these triggers maintains consistency with the
+        * EACH ROW triggers that we already fire on COPY.
+        */
+       ExecBSInsertTriggers(estate, resultRelInfo);
+
+       values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
+       nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
+
+       bistate = GetBulkInsertState();
        econtext = GetPerTupleExprContext(estate);
 
+       /* Set up callback to identify error line number */
+       errcontext.callback = CopyFromErrorCallback;
+       errcontext.arg = (void *) cstate;
+       errcontext.previous = error_context_stack;
+       error_context_stack = &errcontext;
+
+       for (;;)
+       {
+               bool            skip_tuple;
+               Oid                     loaded_oid = InvalidOid;
+
+               CHECK_FOR_INTERRUPTS();
+
+               /* Reset the per-tuple exprcontext */
+               ResetPerTupleExprContext(estate);
+
+               /* Switch into its memory context */
+               MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+               if (!NextCopyFrom(cstate, econtext, values, nulls, &loaded_oid))
+                       break;
+
+               /* And now we can form the input tuple. */
+               tuple = heap_form_tuple(tupDesc, values, nulls);
+
+               if (loaded_oid != InvalidOid)
+                       HeapTupleSetOid(tuple, loaded_oid);
+
+               /* Triggers and stuff need to be invoked in query context. */
+               MemoryContextSwitchTo(oldcontext);
+
+               skip_tuple = false;
+
+               /* BEFORE ROW INSERT Triggers */
+               if (resultRelInfo->ri_TrigDesc &&
+                       resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+               {
+                       HeapTuple       newtuple;
+
+                       newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);
+
+                       if (newtuple == NULL)           /* "do nothing" */
+                               skip_tuple = true;
+                       else if (newtuple != tuple) /* modified by Trigger(s) */
+                       {
+                               heap_freetuple(tuple);
+                               tuple = newtuple;
+                       }
+               }
+
+               if (!skip_tuple)
+               {
+                       List       *recheckIndexes = NIL;
+
+                       /* Place tuple in tuple slot */
+                       ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+
+                       /* Check the constraints of the tuple */
+                       if (cstate->rel->rd_att->constr)
+                               ExecConstraints(resultRelInfo, slot, estate);
+
+                       /* OK, store the tuple and create index entries for it */
+                       heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+
+                       if (resultRelInfo->ri_NumIndices > 0)
+                               recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+                                                                                                          estate);
+
+                       /* AFTER ROW INSERT Triggers */
+                       ExecARInsertTriggers(estate, resultRelInfo, tuple,
+                                                                recheckIndexes);
+
+                       list_free(recheckIndexes);
+
+                       /*
+                        * We count only tuples not suppressed by a BEFORE INSERT trigger;
+                        * this is the same definition used by execMain.c for counting
+                        * tuples inserted by an INSERT command.
+                        */
+                       processed++;
+               }
+       }
+
+       /* Done, clean up */
+       error_context_stack = errcontext.previous;
+
+       FreeBulkInsertState(bistate);
+
+       MemoryContextSwitchTo(oldcontext);
+
+       /* Execute AFTER STATEMENT insertion triggers */
+       ExecASInsertTriggers(estate, resultRelInfo);
+
+       /* Handle queued AFTER triggers */
+       AfterTriggerEndQuery(estate);
+
+       pfree(values);
+       pfree(nulls);
+
+       ExecResetTupleTable(estate->es_tupleTable, false);
+
+       ExecCloseIndices(resultRelInfo);
+
+       FreeExecutorState(estate);
+
+       /*
+        * If we skipped writing WAL, then we need to sync the heap (but not
+        * indexes since those use WAL anyway)
+        */
+       if (hi_options & HEAP_INSERT_SKIP_WAL)
+               heap_sync(cstate->rel);
+
+       return processed;
+}
+
+/*
+ * Setup to read tuples from a file for COPY FROM.
+ *
+ * 'rel': Used as a template for the tuples
+ * 'filename': Name of server-local file to read
+ * 'attnamelist': List of char *, columns to include. NIL selects all cols.
+ * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
+ *
+ * Returns a CopyState, to be passed to NextCopyFrom and related functions.
+ */
+CopyState
+BeginCopyFrom(Relation rel,
+                         const char *filename,
+                         List *attnamelist,
+                         List *options)
+{
+       CopyState       cstate;
+       bool            pipe = (filename == NULL);
+       TupleDesc       tupDesc;
+       Form_pg_attribute *attr;
+       AttrNumber      num_phys_attrs,
+                               num_defaults;
+       FmgrInfo   *in_functions;
+       Oid                *typioparams;
+       int                     attnum;
+       Oid                     in_func_oid;
+       int                *defmap;
+       ExprState **defexprs;
+       MemoryContext oldcontext;
+
+       cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
+       oldcontext = MemoryContextSwitchTo(cstate->copycontext);
+
+       /* Initialize state variables */
+       cstate->fe_eof = false;
+       cstate->eol_type = EOL_UNKNOWN;
+       cstate->cur_relname = RelationGetRelationName(cstate->rel);
+       cstate->cur_lineno = 0;
+       cstate->cur_attname = NULL;
+       cstate->cur_attval = NULL;
+
+       /* Set up variables to avoid per-attribute overhead. */
+       initStringInfo(&cstate->attribute_buf);
+       initStringInfo(&cstate->line_buf);
+       cstate->line_buf_converted = false;
+       cstate->raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
+       cstate->raw_buf_index = cstate->raw_buf_len = 0;
+
+       tupDesc = RelationGetDescr(cstate->rel);
+       attr = tupDesc->attrs;
+       num_phys_attrs = tupDesc->natts;
+       num_defaults = 0;
+
        /*
         * Pick up the required catalog information for each attribute in the
         * relation, including the input function, the element type (to pass to
@@ -1863,27 +2104,54 @@ CopyFrom(CopyState cstate)
 
                        if (defexpr != NULL)
                        {
-                               defexprs[num_defaults] = ExecPrepareExpr((Expr *) defexpr,
-                                                                                                                estate);
+                               /* Initialize expressions in copycontext. */
+                               defexprs[num_defaults] = ExecInitExpr(
+                                                               expression_planner((Expr *) defexpr), NULL);
                                defmap[num_defaults] = attnum - 1;
                                num_defaults++;
                        }
                }
        }
 
-       /* Prepare to catch AFTER triggers. */
-       AfterTriggerBeginQuery();
+       /* We keep those variables in cstate. */
+       cstate->in_functions = in_functions;
+       cstate->typioparams = typioparams;
+       cstate->defmap = defmap;
+       cstate->defexprs = defexprs;
+       cstate->num_defaults = num_defaults;
 
-       /*
-        * Check BEFORE STATEMENT insertion triggers. It's debateable whether we
-        * should do this for COPY, since it's not really an "INSERT" statement as
-        * such. However, executing these triggers maintains consistency with the
-        * EACH ROW triggers that we already fire on COPY.
-        */
-       ExecBSInsertTriggers(estate, resultRelInfo);
+       if (pipe)
+       {
+               if (whereToSendOutput == DestRemote)
+                       ReceiveCopyBegin(cstate);
+               else
+                       cstate->copy_file = stdin;
+       }
+       else
+       {
+               struct stat st;
+
+               cstate->filename = pstrdup(filename);
+               cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+
+               if (cstate->copy_file == NULL)
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not open file \"%s\" for reading: %m",
+                                                       cstate->filename)));
+
+               fstat(fileno(cstate->copy_file), &st);
+               if (S_ISDIR(st.st_mode))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                        errmsg("\"%s\" is a directory", cstate->filename)));
+       }
 
        if (!cstate->binary)
-               file_has_oids = cstate->oids;   /* must rely on user to tell us... */
+       {
+               /* must rely on user to tell us... */
+               cstate->file_has_oids = cstate->oids;
+       }
        else
        {
                /* Read and verify binary header */
@@ -1901,7 +2169,7 @@ CopyFrom(CopyState cstate)
                        ereport(ERROR,
                                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
                                         errmsg("invalid COPY file header (missing flags)")));
-               file_has_oids = (tmp & (1 << 16)) != 0;
+               cstate->file_has_oids = (tmp & (1 << 16)) != 0;
                tmp &= ~(1 << 16);
                if ((tmp >> 16) != 0)
                        ereport(ERROR,
@@ -1923,358 +2191,315 @@ CopyFrom(CopyState cstate)
                }
        }
 
-       if (file_has_oids && cstate->binary)
+       if (cstate->file_has_oids && cstate->binary)
        {
                getTypeBinaryInputInfo(OIDOID,
-                                                          &in_func_oid, &oid_typioparam);
-               fmgr_info(in_func_oid, &oid_in_function);
+                                                          &in_func_oid, &cstate->oid_typioparam);
+               fmgr_info(in_func_oid, &cstate->oid_in_function);
        }
 
-       values = (Datum *) palloc(num_phys_attrs * sizeof(Datum));
-       nulls = (bool *) palloc(num_phys_attrs * sizeof(bool));
-
        /* create workspace for CopyReadAttributes results */
-       nfields = file_has_oids ? (attr_count + 1) : attr_count;
-       if (! cstate->binary)
+       if (!cstate->binary)
        {
+               AttrNumber      attr_count = list_length(cstate->attnumlist);
+               int     nfields = cstate->file_has_oids ? (attr_count + 1) : attr_count;
+
                cstate->max_fields = nfields;
                cstate->raw_fields = (char **) palloc(nfields * sizeof(char *));
        }
 
-       /* Initialize state variables */
-       cstate->fe_eof = false;
-       cstate->eol_type = EOL_UNKNOWN;
-       cstate->cur_relname = RelationGetRelationName(cstate->rel);
-       cstate->cur_lineno = 0;
-       cstate->cur_attname = NULL;
-       cstate->cur_attval = NULL;
+       MemoryContextSwitchTo(oldcontext);
 
-       bistate = GetBulkInsertState();
+       return cstate;
+}
 
-       /* Set up callback to identify error line number */
-       errcontext.callback = copy_in_error_callback;
-       errcontext.arg = (void *) cstate;
-       errcontext.previous = error_context_stack;
-       error_context_stack = &errcontext;
+/*
+ * Read raw fields in the next line for COPY FROM in text or csv mode.
+ * Return false if no more lines.
+ *
+ * An internal temporary buffer is returned via 'fields'. It is valid until
+ * the next call of the function. Since the function returns all raw fields
+ * in the input file, 'nfields' could be different from the number of columns
+ * in the relation.
+ *
+ * NOTE: force_not_null option are not applied to the returned fields.
+ */
+bool
+NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
+{
+       int                     fldct;
+       bool            done;
+
+       /* only available for text or csv input */
+       Assert(!cstate->binary);
 
        /* on input just throw the header line away */
-       if (cstate->header_line)
+       if (cstate->cur_lineno == 0 && cstate->header_line)
        {
                cstate->cur_lineno++;
-               done = CopyReadLine(cstate);
+               if (CopyReadLine(cstate))
+                       return false;   /* done */
        }
 
-       while (!done)
-       {
-               bool            skip_tuple;
-               Oid                     loaded_oid = InvalidOid;
+       cstate->cur_lineno++;
 
-               CHECK_FOR_INTERRUPTS();
+       /* Actually read the line into memory here */
+       done = CopyReadLine(cstate);
 
-               cstate->cur_lineno++;
+       /*
+        * EOF at start of line means we're done.  If we see EOF after
+        * some characters, we act as though it was newline followed by
+        * EOF, ie, process the line and then exit loop on next iteration.
+        */
+       if (done && cstate->line_buf.len == 0)
+               return false;
 
-               /* Reset the per-tuple exprcontext */
-               ResetPerTupleExprContext(estate);
+       /* Parse the line into de-escaped field values */
+       if (cstate->csv_mode)
+               fldct = CopyReadAttributesCSV(cstate);
+       else
+               fldct = CopyReadAttributesText(cstate);
 
-               /* Switch into its memory context */
-               MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+       *fields = cstate->raw_fields;
+       *nfields = fldct;
+       return true;
+}
 
-               /* Initialize all values for row to NULL */
-               MemSet(values, 0, num_phys_attrs * sizeof(Datum));
-               MemSet(nulls, true, num_phys_attrs * sizeof(bool));
+/*
+ * Read next tuple from file for COPY FROM. Return false if no more tuples.
+ *
+ * 'econtext' is used to evaluate default expression for each columns not
+ * read from the file. It can be NULL when no default values are used, i.e.
+ * when all columns are read from the file.
+ *
+ * 'values' and 'nulls' arrays must be the same length as columns of the
+ * relation passed to BeginCopyFrom. This function fills the arrays.
+ * Oid of the tuple is returned with 'tupleOid' separately.
+ */
+bool
+NextCopyFrom(CopyState cstate, ExprContext *econtext,
+                        Datum *values, bool *nulls, Oid *tupleOid)
+{
+       TupleDesc       tupDesc;
+       Form_pg_attribute *attr;
+       AttrNumber      num_phys_attrs,
+                               attr_count,
+                               num_defaults = cstate->num_defaults;
+       FmgrInfo   *in_functions = cstate->in_functions;
+       Oid                *typioparams = cstate->typioparams;
+       int                     i;
+       int         nfields;
+       bool            isnull;
+       bool            file_has_oids = cstate->file_has_oids;
+       int                *defmap = cstate->defmap;
+       ExprState **defexprs = cstate->defexprs;
 
-               if (!cstate->binary)
-               {
-                       ListCell   *cur;
-                       int                     fldct;
-                       int                     fieldno;
-                       char       *string;
+       tupDesc = RelationGetDescr(cstate->rel);
+       attr = tupDesc->attrs;
+       num_phys_attrs = tupDesc->natts;
+       attr_count = list_length(cstate->attnumlist);
+       nfields = file_has_oids ? (attr_count + 1) : attr_count;
 
-                       /* Actually read the line into memory here */
-                       done = CopyReadLine(cstate);
+       /* Initialize all values for row to NULL */
+       MemSet(values, 0, num_phys_attrs * sizeof(Datum));
+       MemSet(nulls, true, num_phys_attrs * sizeof(bool));
 
-                       /*
-                        * EOF at start of line means we're done.  If we see EOF after
-                        * some characters, we act as though it was newline followed by
-                        * EOF, ie, process the line and then exit loop on next iteration.
-                        */
-                       if (done && cstate->line_buf.len == 0)
-                               break;
+       if (!cstate->binary)
+       {
+               char      **field_strings;
+               ListCell   *cur;
+               int                     fldct;
+               int                     fieldno;
+               char       *string;
 
-                       /* Parse the line into de-escaped field values */
-                       if (cstate->csv_mode)
-                               fldct = CopyReadAttributesCSV(cstate);
-                       else
-                               fldct = CopyReadAttributesText(cstate);
+               /* read raw fields in the next line */
+               if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
+                       return false;
+
+               /* check for overflowing fields */
+               if (nfields > 0 && fldct > nfields)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                        errmsg("extra data after last expected column")));
 
-                       /* check for overflowing fields */
-                       if (nfields > 0 && fldct > nfields)
+               fieldno = 0;
+
+               /* Read the OID field if present */
+               if (file_has_oids)
+               {
+                       if (fieldno >= fldct)
                                ereport(ERROR,
                                                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                                errmsg("extra data after last expected column")));
+                                                errmsg("missing data for OID column")));
+                       string = field_strings[fieldno++];
 
-                       fieldno = 0;
-                       field_strings = cstate->raw_fields;
-
-                       /* Read the OID field if present */
-                       if (file_has_oids)
+                       if (string == NULL)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                                errmsg("null OID in COPY data")));
+                       else if (cstate->oids && tupleOid != NULL)
                        {
-                               if (fieldno >= fldct)
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                                        errmsg("missing data for OID column")));
-                               string = field_strings[fieldno++];
-
-                               if (string == NULL)
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                                        errmsg("null OID in COPY data")));
-                               else
-                               {
-                                       cstate->cur_attname = "oid";
-                                       cstate->cur_attval = string;
-                                       loaded_oid = DatumGetObjectId(DirectFunctionCall1(oidin,
+                               cstate->cur_attname = "oid";
+                               cstate->cur_attval = string;
+                               *tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
                                                                                                   CStringGetDatum(string)));
-                                       if (loaded_oid == InvalidOid)
-                                               ereport(ERROR,
-                                                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                                                errmsg("invalid OID in COPY data")));
-                                       cstate->cur_attname = NULL;
-                                       cstate->cur_attval = NULL;
-                               }
-                       }
-
-                       /* Loop to read the user attributes on the line. */
-                       foreach(cur, cstate->attnumlist)
-                       {
-                               int                     attnum = lfirst_int(cur);
-                               int                     m = attnum - 1;
-
-                               if (fieldno >= fldct)
+                               if (*tupleOid == InvalidOid)
                                        ereport(ERROR,
                                                        (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                                        errmsg("missing data for column \"%s\"",
-                                                                       NameStr(attr[m]->attname))));
-                               string = field_strings[fieldno++];
-
-                               if (cstate->csv_mode && string == NULL &&
-                                       cstate->force_notnull_flags[m])
-                               {
-                                       /* Go ahead and read the NULL string */
-                                       string = cstate->null_print;
-                               }
-
-                               cstate->cur_attname = NameStr(attr[m]->attname);
-                               cstate->cur_attval = string;
-                               values[m] = InputFunctionCall(&in_functions[m],
-                                                                                         string,
-                                                                                         typioparams[m],
-                                                                                         attr[m]->atttypmod);
-                               if (string != NULL)
-                                       nulls[m] = false;
+                                                        errmsg("invalid OID in COPY data")));
                                cstate->cur_attname = NULL;
                                cstate->cur_attval = NULL;
                        }
-
-                       Assert(fieldno == nfields);
                }
-               else
-               {
-                       /* binary */
-                       int16           fld_count;
-                       ListCell   *cur;
 
-                       if (!CopyGetInt16(cstate, &fld_count))
-                       {
-                               /* EOF detected (end of file, or protocol-level EOF) */
-                               done = true;
-                               break;
-                       }
-
-                       if (fld_count == -1)
-                       {
-                               /*
-                                * Received EOF marker.  In a V3-protocol copy, wait for
-                                * the protocol-level EOF, and complain if it doesn't come
-                                * immediately.  This ensures that we correctly handle
-                                * CopyFail, if client chooses to send that now.
-                                *
-                                * Note that we MUST NOT try to read more data in an
-                                * old-protocol copy, since there is no protocol-level EOF
-                                * marker then.  We could go either way for copy from file,
-                                * but choose to throw error if there's data after the EOF
-                                * marker, for consistency with the new-protocol case.
-                                */
-                               char    dummy;
-
-                               if (cstate->copy_dest != COPY_OLD_FE &&
-                                       CopyGetData(cstate, &dummy, 1, 1) > 0)
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                                        errmsg("received copy data after EOF marker")));
-                               done = true;
-                               break;
-                       }
+               /* Loop to read the user attributes on the line. */
+               foreach(cur, cstate->attnumlist)
+               {
+                       int                     attnum = lfirst_int(cur);
+                       int                     m = attnum - 1;
 
-                       if (fld_count != attr_count)
+                       if (fieldno >= fldct)
                                ereport(ERROR,
                                                (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                                errmsg("row field count is %d, expected %d",
-                                                               (int) fld_count, attr_count)));
-
-                       if (file_has_oids)
-                       {
-                               cstate->cur_attname = "oid";
-                               loaded_oid =
-                                       DatumGetObjectId(CopyReadBinaryAttribute(cstate,
-                                                                                                                        0,
-                                                                                                                        &oid_in_function,
-                                                                                                                        oid_typioparam,
-                                                                                                                        -1,
-                                                                                                                        &isnull));
-                               if (isnull || loaded_oid == InvalidOid)
-                                       ereport(ERROR,
-                                                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-                                                        errmsg("invalid OID in COPY data")));
-                               cstate->cur_attname = NULL;
-                       }
+                                                errmsg("missing data for column \"%s\"",
+                                                               NameStr(attr[m]->attname))));
+                       string = field_strings[fieldno++];
 
-                       i = 0;
-                       foreach(cur, cstate->attnumlist)
+                       if (cstate->csv_mode && string == NULL &&
+                               cstate->force_notnull_flags[m])
                        {
-                               int                     attnum = lfirst_int(cur);
-                               int                     m = attnum - 1;
-
-                               cstate->cur_attname = NameStr(attr[m]->attname);
-                               i++;
-                               values[m] = CopyReadBinaryAttribute(cstate,
-                                                                                                       i,
-                                                                                                       &in_functions[m],
-                                                                                                       typioparams[m],
-                                                                                                       attr[m]->atttypmod,
-                                                                                                       &nulls[m]);
-                               cstate->cur_attname = NULL;
+                               /* Go ahead and read the NULL string */
+                               string = cstate->null_print;
                        }
-               }
 
-               /*
-                * Now compute and insert any defaults available for the columns not
-                * provided by the input data.  Anything not processed here or above
-                * will remain NULL.
-                */
-               for (i = 0; i < num_defaults; i++)
-               {
-                       values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
-                                                                                        &nulls[defmap[i]], NULL);
+                       cstate->cur_attname = NameStr(attr[m]->attname);
+                       cstate->cur_attval = string;
+                       values[m] = InputFunctionCall(&in_functions[m],
+                                                                                 string,
+                                                                                 typioparams[m],
+                                                                                 attr[m]->atttypmod);
+                       if (string != NULL)
+                               nulls[m] = false;
+                       cstate->cur_attname = NULL;
+                       cstate->cur_attval = NULL;
                }
 
-               /* And now we can form the input tuple. */
-               tuple = heap_form_tuple(tupDesc, values, nulls);
-
-               if (cstate->oids && file_has_oids)
-                       HeapTupleSetOid(tuple, loaded_oid);
-
-               /* Triggers and stuff need to be invoked in query context. */
-               MemoryContextSwitchTo(oldcontext);
+               Assert(fieldno == nfields);
+       }
+       else
+       {
+               /* binary */
+               int16           fld_count;
+               ListCell   *cur;
 
-               skip_tuple = false;
+               cstate->cur_lineno++;
 
-               /* BEFORE ROW INSERT Triggers */
-               if (resultRelInfo->ri_TrigDesc &&
-                       resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+               if (!CopyGetInt16(cstate, &fld_count))
                {
-                       HeapTuple       newtuple;
-
-                       newtuple = ExecBRInsertTriggers(estate, resultRelInfo, tuple);
-
-                       if (newtuple == NULL)           /* "do nothing" */
-                               skip_tuple = true;
-                       else if (newtuple != tuple) /* modified by Trigger(s) */
-                       {
-                               heap_freetuple(tuple);
-                               tuple = newtuple;
-                       }
+                       /* EOF detected (end of file, or protocol-level EOF) */
+                       return false;
                }
 
-               if (!skip_tuple)
+               if (fld_count == -1)
                {
-                       List       *recheckIndexes = NIL;
-
-                       /* Place tuple in tuple slot */
-                       ExecStoreTuple(tuple, slot, InvalidBuffer, false);
-
-                       /* Check the constraints of the tuple */
-                       if (cstate->rel->rd_att->constr)
-                               ExecConstraints(resultRelInfo, slot, estate);
-
-                       /* OK, store the tuple and create index entries for it */
-                       heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
-
-                       if (resultRelInfo->ri_NumIndices > 0)
-                               recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
-                                                                                                          estate);
-
-                       /* AFTER ROW INSERT Triggers */
-                       ExecARInsertTriggers(estate, resultRelInfo, tuple,
-                                                                recheckIndexes);
-
-                       list_free(recheckIndexes);
-
                        /*
-                        * We count only tuples not suppressed by a BEFORE INSERT trigger;
-                        * this is the same definition used by execMain.c for counting
-                        * tuples inserted by an INSERT command.
+                        * Received EOF marker.  In a V3-protocol copy, wait for
+                        * the protocol-level EOF, and complain if it doesn't come
+                        * immediately.  This ensures that we correctly handle
+                        * CopyFail, if client chooses to send that now.
+                        *
+                        * Note that we MUST NOT try to read more data in an
+                        * old-protocol copy, since there is no protocol-level EOF
+                        * marker then.  We could go either way for copy from file,
+                        * but choose to throw error if there's data after the EOF
+                        * marker, for consistency with the new-protocol case.
                         */
-                       cstate->processed++;
-               }
-       }
-
-       /* Done, clean up */
-       error_context_stack = errcontext.previous;
-
-       FreeBulkInsertState(bistate);
-
-       MemoryContextSwitchTo(oldcontext);
-
-       /* Execute AFTER STATEMENT insertion triggers */
-       ExecASInsertTriggers(estate, resultRelInfo);
-
-       /* Handle queued AFTER triggers */
-       AfterTriggerEndQuery(estate);
-
-       pfree(values);
-       pfree(nulls);
-       if (! cstate->binary)
-               pfree(cstate->raw_fields);
+                       char    dummy;
 
-       pfree(in_functions);
-       pfree(typioparams);
-       pfree(defmap);
-       pfree(defexprs);
-
-       ExecResetTupleTable(estate->es_tupleTable, false);
+                       if (cstate->copy_dest != COPY_OLD_FE &&
+                               CopyGetData(cstate, &dummy, 1, 1) > 0)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                                errmsg("received copy data after EOF marker")));
+                       return false;
+               }
 
-       ExecCloseIndices(resultRelInfo);
+               if (fld_count != attr_count)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                        errmsg("row field count is %d, expected %d",
+                                                       (int) fld_count, attr_count)));
 
-       FreeExecutorState(estate);
+               if (file_has_oids)
+               {
+                       Oid             loaded_oid;
+
+                       cstate->cur_attname = "oid";
+                       loaded_oid =
+                               DatumGetObjectId(CopyReadBinaryAttribute(cstate,
+                                                                                                                0,
+                                                                                                                &cstate->oid_in_function,
+                                                                                                                cstate->oid_typioparam,
+                                                                                                                -1,
+                                                                                                                &isnull));
+                       if (isnull || loaded_oid == InvalidOid)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                                errmsg("invalid OID in COPY data")));
+                       cstate->cur_attname = NULL;
+                       if (cstate->oids && tupleOid != NULL)
+                               *tupleOid = loaded_oid;
+               }
 
-       if (!pipe)
-       {
-               if (FreeFile(cstate->copy_file))
-                       ereport(ERROR,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not close file \"%s\": %m",
-                                                       cstate->filename)));
+               i = 0;
+               foreach(cur, cstate->attnumlist)
+               {
+                       int                     attnum = lfirst_int(cur);
+                       int                     m = attnum - 1;
+
+                       cstate->cur_attname = NameStr(attr[m]->attname);
+                       i++;
+                       values[m] = CopyReadBinaryAttribute(cstate,
+                                                                                               i,
+                                                                                               &in_functions[m],
+                                                                                               typioparams[m],
+                                                                                               attr[m]->atttypmod,
+                                                                                               &nulls[m]);
+                       cstate->cur_attname = NULL;
+               }
        }
 
        /*
-        * If we skipped writing WAL, then we need to sync the heap (but not
-        * indexes since those use WAL anyway)
+        * Now compute and insert any defaults available for the columns not
+        * provided by the input data.  Anything not processed here or above
+        * will remain NULL.
         */
-       if (hi_options & HEAP_INSERT_SKIP_WAL)
-               heap_sync(cstate->rel);
+       for (i = 0; i < num_defaults; i++)
+       {
+               /*
+                * The caller must supply econtext and have switched into the
+                * per-tuple memory context in it.
+                */
+               Assert(econtext != NULL);
+               Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
+
+               values[defmap[i]] = ExecEvalExpr(defexprs[i], econtext,
+                                                                                &nulls[defmap[i]], NULL);
+       }
+
+       return true;
 }
 
+/*
+ * Clean up storage and release resources for COPY FROM.
+ */
+void
+EndCopyFrom(CopyState cstate)
+{
+       /* No COPY FROM related resources except memory. */
+
+       EndCopy(cstate);
+}
 
 /*
  * Read the next input line and stash it in line_buf, with conversion to
@@ -3537,6 +3762,7 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 
        /* And send the data */
        CopyOneRowTo(cstate, InvalidOid, slot->tts_values, slot->tts_isnull);
+       myState->processed++;
 }
 
 /*
index 9e2bbe8..afe4b5e 100644 (file)
 #ifndef COPY_H
 #define COPY_H
 
+#include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
 #include "tcop/dest.h"
 
 
+typedef struct CopyStateData  *CopyState;
+
 extern uint64 DoCopy(const CopyStmt *stmt, const char *queryString);
 
+extern CopyState BeginCopyFrom(Relation rel, const char *filename,
+                                                          List *attnamelist, List *options);
+extern void EndCopyFrom(CopyState cstate);
+extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext,
+                                                Datum *values, bool *nulls, Oid *tupleOid);
+extern bool NextCopyFromRawFields(CopyState cstate,
+                                                                 char ***fields, int *nfields);
+extern void CopyFromErrorCallback(void *arg);
+
 extern DestReceiver *CreateCopyDestReceiver(void);
 
 #endif   /* COPY_H */