* Darko Prenosil <Darko.Prenosil@finteh.hr>
* Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
*
- * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.86 2010/01/02 16:57:32 momjian Exp $
+ * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.87 2010/01/24 22:19:38 joe Exp $
* Copyright (c) 2001-2010, PostgreSQL Global Development Group
* ALL RIGHTS RESERVED;
*
* Internal declarations
*/
static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
+static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn *rconn);
Datum
dblink_fetch(PG_FUNCTION_ARGS)
{
- FuncCallContext *funcctx;
- TupleDesc tupdesc = NULL;
- int call_cntr;
- int max_calls;
- AttInMetadata *attinmeta;
- PGresult *res = NULL;
- MemoryContext oldcontext;
- char *conname = NULL;
- remoteConn *rconn = NULL;
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ PGresult *res = NULL;
+ char *conname = NULL;
+ remoteConn *rconn = NULL;
+ PGconn *conn = NULL;
+ StringInfoData buf;
+ char *curname = NULL;
+ int howmany = 0;
+ bool fail = true; /* default to backward compatible */
DBLINK_INIT;
- /* stuff done only on the first call of the function */
- if (SRF_IS_FIRSTCALL())
+ if (PG_NARGS() == 4)
{
- PGconn *conn = NULL;
- StringInfoData buf;
- char *curname = NULL;
- int howmany = 0;
- bool fail = true; /* default to backward compatible */
+ /* text,text,int,bool */
+ conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ howmany = PG_GETARG_INT32(2);
+ fail = PG_GETARG_BOOL(3);
- if (PG_NARGS() == 4)
+ rconn = getConnectionByName(conname);
+ if (rconn)
+ conn = rconn->conn;
+ }
+ else if (PG_NARGS() == 3)
+ {
+ /* text,text,int or text,int,bool */
+ if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
+ {
+ curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ howmany = PG_GETARG_INT32(1);
+ fail = PG_GETARG_BOOL(2);
+ conn = pconn->conn;
+ }
+ else
{
- /* text,text,int,bool */
conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
howmany = PG_GETARG_INT32(2);
- fail = PG_GETARG_BOOL(3);
rconn = getConnectionByName(conname);
if (rconn)
conn = rconn->conn;
}
- else if (PG_NARGS() == 3)
- {
- /* text,text,int or text,int,bool */
- if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
- {
- curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
- howmany = PG_GETARG_INT32(1);
- fail = PG_GETARG_BOOL(2);
- conn = pconn->conn;
- }
- else
- {
- conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
- curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
- howmany = PG_GETARG_INT32(2);
-
- rconn = getConnectionByName(conname);
- if (rconn)
- conn = rconn->conn;
- }
- }
- else if (PG_NARGS() == 2)
- {
- /* text,int */
- curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
- howmany = PG_GETARG_INT32(1);
- conn = pconn->conn;
- }
-
- if (!conn)
- DBLINK_CONN_NOT_AVAIL;
-
- initStringInfo(&buf);
- appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
-
- /* create a function context for cross-call persistence */
- funcctx = SRF_FIRSTCALL_INIT();
-
- /*
- * Try to execute the query. Note that since libpq uses malloc, the
- * PGresult will be long-lived even though we are still in a
- * short-lived memory context.
- */
- res = PQexec(conn, buf.data);
- if (!res ||
- (PQresultStatus(res) != PGRES_COMMAND_OK &&
- PQresultStatus(res) != PGRES_TUPLES_OK))
- {
- dblink_res_error(conname, res, "could not fetch from cursor", fail);
- SRF_RETURN_DONE(funcctx);
- }
- else if (PQresultStatus(res) == PGRES_COMMAND_OK)
- {
- /* cursor does not exist - closed already or bad name */
- PQclear(res);
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_CURSOR_NAME),
- errmsg("cursor \"%s\" does not exist", curname)));
- }
-
- funcctx->max_calls = PQntuples(res);
-
- /* got results, keep track of them */
- funcctx->user_fctx = res;
-
- /* get a tuple descriptor for our result type */
- switch (get_call_result_type(fcinfo, NULL, &tupdesc))
- {
- case TYPEFUNC_COMPOSITE:
- /* success */
- break;
- case TYPEFUNC_RECORD:
- /* failed to determine actual type of RECORD */
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("function returning record called in context "
- "that cannot accept type record")));
- break;
- default:
- /* result type isn't composite */
- elog(ERROR, "return type must be a row type");
- break;
- }
-
- /* check result and tuple descriptor have the same number of columns */
- if (PQnfields(res) != tupdesc->natts)
- ereport(ERROR,
- (errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("remote query result rowtype does not match "
- "the specified FROM clause rowtype")));
-
- /*
- * fast track when no results. We could exit earlier, but then we'd
- * not report error if the result tuple type is wrong.
- */
- if (funcctx->max_calls < 1)
- {
- PQclear(res);
- SRF_RETURN_DONE(funcctx);
- }
-
- /*
- * switch to memory context appropriate for multiple function calls,
- * so we can make long-lived copy of tupdesc etc
- */
- oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
-
- /* make sure we have a persistent copy of the tupdesc */
- tupdesc = CreateTupleDescCopy(tupdesc);
+ }
+ else if (PG_NARGS() == 2)
+ {
+ /* text,int */
+ curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ howmany = PG_GETARG_INT32(1);
+ conn = pconn->conn;
+ }
- /* store needed metadata for subsequent calls */
- attinmeta = TupleDescGetAttInMetadata(tupdesc);
- funcctx->attinmeta = attinmeta;
+ if (!conn)
+ DBLINK_CONN_NOT_AVAIL;
- MemoryContextSwitchTo(oldcontext);
- }
+ /* let the caller know we're sending back a tuplestore */
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = NULL;
+ rsinfo->setDesc = NULL;
- /* stuff done on every call of the function */
- funcctx = SRF_PERCALL_SETUP();
+ initStringInfo(&buf);
+ appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
/*
- * initialize per-call variables
+ * Try to execute the query. Note that since libpq uses malloc, the
+ * PGresult will be long-lived even though we are still in a
+ * short-lived memory context.
*/
- call_cntr = funcctx->call_cntr;
- max_calls = funcctx->max_calls;
-
- res = (PGresult *) funcctx->user_fctx;
- attinmeta = funcctx->attinmeta;
- tupdesc = attinmeta->tupdesc;
-
- if (call_cntr < max_calls) /* do when there is more left to send */
+ res = PQexec(conn, buf.data);
+ if (!res ||
+ (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK))
{
- char **values;
- HeapTuple tuple;
- Datum result;
- int i;
- int nfields = PQnfields(res);
-
- values = (char **) palloc(nfields * sizeof(char *));
- for (i = 0; i < nfields; i++)
- {
- if (PQgetisnull(res, call_cntr, i) == 0)
- values[i] = PQgetvalue(res, call_cntr, i);
- else
- values[i] = NULL;
- }
-
- /* build the tuple */
- tuple = BuildTupleFromCStrings(attinmeta, values);
-
- /* make the tuple into a datum */
- result = HeapTupleGetDatum(tuple);
-
- SRF_RETURN_NEXT(funcctx, result);
+ dblink_res_error(conname, res, "could not fetch from cursor", fail);
+ return (Datum) 0;
}
- else
+ else if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
- /* do when there is no more left */
+ /* cursor does not exist - closed already or bad name */
PQclear(res);
- SRF_RETURN_DONE(funcctx);
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_CURSOR_NAME),
+ errmsg("cursor \"%s\" does not exist", curname)));
}
+
+ materializeResult(fcinfo, res);
+ return (Datum) 0;
}
/*
static Datum
dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
{
- FuncCallContext *funcctx;
- TupleDesc tupdesc = NULL;
- int call_cntr;
- int max_calls;
- AttInMetadata *attinmeta;
- char *msg;
- PGresult *res = NULL;
- bool is_sql_cmd = false;
- char *sql_cmd_status = NULL;
- MemoryContext oldcontext;
- bool freeconn = false;
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ char *msg;
+ PGresult *res = NULL;
+ PGconn *conn = NULL;
+ char *connstr = NULL;
+ char *sql = NULL;
+ char *conname = NULL;
+ remoteConn *rconn = NULL;
+ bool fail = true; /* default to backward compatible */
+ bool freeconn = false;
+
+ /* check to see if caller supports us returning a tuplestore */
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mode required, but it is not " \
+ "allowed in this context")));
DBLINK_INIT;
- /* stuff done only on the first call of the function */
- if (SRF_IS_FIRSTCALL())
+ if (!is_async)
{
- PGconn *conn = NULL;
- char *connstr = NULL;
- char *sql = NULL;
- char *conname = NULL;
- remoteConn *rconn = NULL;
- bool fail = true; /* default to backward compatible */
-
- /* create a function context for cross-call persistence */
- funcctx = SRF_FIRSTCALL_INIT();
-
- /*
- * switch to memory context appropriate for multiple function calls
- */
- oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
-
- if (!is_async)
+ if (PG_NARGS() == 3)
{
- if (PG_NARGS() == 3)
- {
- /* text,text,bool */
- DBLINK_GET_CONN;
- sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
- fail = PG_GETARG_BOOL(2);
- }
- else if (PG_NARGS() == 2)
- {
- /* text,text or text,bool */
- if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
- {
- conn = pconn->conn;
- sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
- fail = PG_GETARG_BOOL(1);
- }
- else
- {
- DBLINK_GET_CONN;
- sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
- }
- }
- else if (PG_NARGS() == 1)
- {
- /* text */
- conn = pconn->conn;
- sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
- }
- else
- /* shouldn't happen */
- elog(ERROR, "wrong number of arguments");
+ /* text,text,bool */
+ DBLINK_GET_CONN;
+ sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
+ fail = PG_GETARG_BOOL(2);
}
- else /* is_async */
+ else if (PG_NARGS() == 2)
{
- /* get async result */
- if (PG_NARGS() == 2)
+ /* text,text or text,bool */
+ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
{
- /* text,bool */
- DBLINK_GET_CONN;
+ conn = pconn->conn;
+ sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
fail = PG_GETARG_BOOL(1);
}
- else if (PG_NARGS() == 1)
+ else
{
- /* text */
DBLINK_GET_CONN;
+ sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
}
- else
- /* shouldn't happen */
- elog(ERROR, "wrong number of arguments");
}
-
- if (!conn)
- DBLINK_CONN_NOT_AVAIL;
-
- /* synchronous query, or async result retrieval */
- if (!is_async)
- res = PQexec(conn, sql);
+ else if (PG_NARGS() == 1)
+ {
+ /* text */
+ conn = pconn->conn;
+ sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ }
else
+ /* shouldn't happen */
+ elog(ERROR, "wrong number of arguments");
+ }
+ else /* is_async */
+ {
+ /* get async result */
+ if (PG_NARGS() == 2)
{
- res = PQgetResult(conn);
- /* NULL means we're all done with the async results */
- if (!res)
- {
- MemoryContextSwitchTo(oldcontext);
- SRF_RETURN_DONE(funcctx);
- }
+ /* text,bool */
+ DBLINK_GET_CONN;
+ fail = PG_GETARG_BOOL(1);
}
-
- if (!res ||
- (PQresultStatus(res) != PGRES_COMMAND_OK &&
- PQresultStatus(res) != PGRES_TUPLES_OK))
+ else if (PG_NARGS() == 1)
{
- if (freeconn)
- PQfinish(conn);
- dblink_res_error(conname, res, "could not execute query", fail);
- MemoryContextSwitchTo(oldcontext);
- SRF_RETURN_DONE(funcctx);
+ /* text */
+ DBLINK_GET_CONN;
}
+ else
+ /* shouldn't happen */
+ elog(ERROR, "wrong number of arguments");
+ }
+
+ if (!conn)
+ DBLINK_CONN_NOT_AVAIL;
+
+ /* let the caller know we're sending back a tuplestore */
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = NULL;
+ rsinfo->setDesc = NULL;
+
+ /* synchronous query, or async result retrieval */
+ if (!is_async)
+ res = PQexec(conn, sql);
+ else
+ {
+ res = PQgetResult(conn);
+ /* NULL means we're all done with the async results */
+ if (!res)
+ return (Datum) 0;
+ }
+
+ /* if needed, close the connection to the database and cleanup */
+ if (freeconn)
+ PQfinish(conn);
+
+ if (!res ||
+ (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK))
+ {
+ dblink_res_error(conname, res, "could not execute query", fail);
+ return (Datum) 0;
+ }
+
+ materializeResult(fcinfo, res);
+ return (Datum) 0;
+}
+
+/*
+ * Materialize the PGresult to return them as the function result.
+ * The res will be released in this function.
+ */
+static void
+materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+{
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+
+ Assert(rsinfo->returnMode == SFRM_Materialize);
+
+ PG_TRY();
+ {
+ TupleDesc tupdesc;
+ bool is_sql_cmd = false;
+ int ntuples;
+ int nfields;
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
is_sql_cmd = true;
- /* need a tuple descriptor representing one TEXT column */
+ /*
+ * need a tuple descriptor representing one TEXT column to
+ * return the command status string as our result tuple
+ */
tupdesc = CreateTemplateTupleDesc(1, false);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
TEXTOID, -1, 0);
-
- /*
- * and save a copy of the command status string to return as our
- * result tuple
- */
- sql_cmd_status = PQcmdStatus(res);
- funcctx->max_calls = 1;
+ ntuples = 1;
+ nfields = 1;
}
else
- funcctx->max_calls = PQntuples(res);
-
- /* got results, keep track of them */
- funcctx->user_fctx = res;
+ {
+ Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
- /* if needed, close the connection to the database and cleanup */
- if (freeconn)
- PQfinish(conn);
+ is_sql_cmd = false;
- if (!is_sql_cmd)
- {
/* get a tuple descriptor for our result type */
switch (get_call_result_type(fcinfo, NULL, &tupdesc))
{
/* make sure we have a persistent copy of the tupdesc */
tupdesc = CreateTupleDescCopy(tupdesc);
+ ntuples = PQntuples(res);
+ nfields = PQnfields(res);
}
/*
* check result and tuple descriptor have the same number of columns
*/
- if (PQnfields(res) != tupdesc->natts)
+ if (nfields != tupdesc->natts)
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("remote query result rowtype does not match "
"the specified FROM clause rowtype")));
- /* fast track when no results */
- if (funcctx->max_calls < 1)
+ if (ntuples > 0)
{
- if (res)
- PQclear(res);
+ AttInMetadata *attinmeta;
+ Tuplestorestate *tupstore;
+ MemoryContext oldcontext;
+ int row;
+ char **values;
+
+ attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+ oldcontext = MemoryContextSwitchTo(
+ rsinfo->econtext->ecxt_per_query_memory);
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
MemoryContextSwitchTo(oldcontext);
- SRF_RETURN_DONE(funcctx);
- }
-
- /* store needed metadata for subsequent calls */
- attinmeta = TupleDescGetAttInMetadata(tupdesc);
- funcctx->attinmeta = attinmeta;
-
- MemoryContextSwitchTo(oldcontext);
-
- }
-
- /* stuff done on every call of the function */
- funcctx = SRF_PERCALL_SETUP();
-
- /*
- * initialize per-call variables
- */
- call_cntr = funcctx->call_cntr;
- max_calls = funcctx->max_calls;
- res = (PGresult *) funcctx->user_fctx;
- attinmeta = funcctx->attinmeta;
- tupdesc = attinmeta->tupdesc;
+ values = (char **) palloc(nfields * sizeof(char *));
- if (call_cntr < max_calls) /* do when there is more left to send */
- {
- char **values;
- HeapTuple tuple;
- Datum result;
+ /* put all tuples into the tuplestore */
+ for (row = 0; row < ntuples; row++)
+ {
+ HeapTuple tuple;
- if (!is_sql_cmd)
- {
- int i;
- int nfields = PQnfields(res);
+ if (!is_sql_cmd)
+ {
+ int i;
- values = (char **) palloc(nfields * sizeof(char *));
- for (i = 0; i < nfields; i++)
- {
- if (PQgetisnull(res, call_cntr, i) == 0)
- values[i] = PQgetvalue(res, call_cntr, i);
+ for (i = 0; i < nfields; i++)
+ {
+ if (PQgetisnull(res, row, i))
+ values[i] = NULL;
+ else
+ values[i] = PQgetvalue(res, row, i);
+ }
+ }
else
- values[i] = NULL;
- }
- }
- else
- {
- values = (char **) palloc(1 * sizeof(char *));
- values[0] = sql_cmd_status;
- }
+ {
+ values[0] = PQcmdStatus(res);
+ }
- /* build the tuple */
- tuple = BuildTupleFromCStrings(attinmeta, values);
+ /* build the tuple and put it into the tuplestore. */
+ tuple = BuildTupleFromCStrings(attinmeta, values);
+ tuplestore_puttuple(tupstore, tuple);
+ }
- /* make the tuple into a datum */
- result = HeapTupleGetDatum(tuple);
+ /* clean up and return the tuplestore */
+ tuplestore_donestoring(tupstore);
+ }
- SRF_RETURN_NEXT(funcctx, result);
+ PQclear(res);
}
- else
+ PG_CATCH();
{
- /* do when there is no more left */
+ /* be sure to release the libpq result */
PQclear(res);
- SRF_RETURN_DONE(funcctx);
+ PG_RE_THROW();
}
+ PG_END_TRY();
}
/*