From: Joe Conway Date: Sun, 24 Jan 2010 22:19:38 +0000 (+0000) Subject: Rewrite dblink_record_internal() and dblink_fetch() to use a tuplestore X-Git-Tag: REL9_0_0~1070 X-Git-Url: http://git.osdn.net/view?a=commitdiff_plain;h=56cbb611ec749ba867a4cfc09c8b7df0f4446620;p=pg-rex%2Fsyncrep.git Rewrite dblink_record_internal() and dblink_fetch() to use a tuplestore (SFRM_Materialize mode) to return tuples. Since we don't return from the dblink function in tuplestore mode, release the PGresult with a PG_CATCH block on error. Also rearrange to share the same code to materialize the tuplestore. Patch by Takahiro Itagaki. --- diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 3fc6b60c2a..ded7832f33 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -8,7 +8,7 @@ * Darko Prenosil * Shridhar Daithankar * - * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.86 2010/01/02 16:57:32 momjian Exp $ + * $PostgreSQL: pgsql/contrib/dblink/dblink.c,v 1.87 2010/01/24 22:19:38 joe Exp $ * Copyright (c) 2001-2010, PostgreSQL Global Development Group * ALL RIGHTS RESERVED; * @@ -80,6 +80,7 @@ typedef struct remoteConn * Internal declarations */ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); +static void materializeResult(FunctionCallInfo fcinfo, PGresult *res); static remoteConn *getConnectionByName(const char *name); static HTAB *createConnHash(void); static void createNewConnection(const char *name, remoteConn *rconn); @@ -504,200 +505,94 @@ PG_FUNCTION_INFO_V1(dblink_fetch); Datum dblink_fetch(PG_FUNCTION_ARGS) { - FuncCallContext *funcctx; - TupleDesc tupdesc = NULL; - int call_cntr; - int max_calls; - AttInMetadata *attinmeta; - PGresult *res = NULL; - MemoryContext oldcontext; - char *conname = NULL; - remoteConn *rconn = NULL; + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + PGresult *res = NULL; + char *conname = NULL; + remoteConn *rconn = NULL; + PGconn *conn = NULL; + StringInfoData buf; + char *curname = NULL; + int howmany = 0; + bool fail = true; /* default to backward compatible */ DBLINK_INIT; - /* stuff done only on the first call of the function */ - if (SRF_IS_FIRSTCALL()) + if (PG_NARGS() == 4) { - PGconn *conn = NULL; - StringInfoData buf; - char *curname = NULL; - int howmany = 0; - bool fail = true; /* default to backward compatible */ + /* text,text,int,bool */ + conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); + curname = text_to_cstring(PG_GETARG_TEXT_PP(1)); + howmany = PG_GETARG_INT32(2); + fail = PG_GETARG_BOOL(3); - if (PG_NARGS() == 4) + rconn = getConnectionByName(conname); + if (rconn) + conn = rconn->conn; + } + else if (PG_NARGS() == 3) + { + /* text,text,int or text,int,bool */ + if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID) + { + curname = text_to_cstring(PG_GETARG_TEXT_PP(0)); + howmany = PG_GETARG_INT32(1); + fail = PG_GETARG_BOOL(2); + conn = pconn->conn; + } + else { - /* text,text,int,bool */ conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); curname = text_to_cstring(PG_GETARG_TEXT_PP(1)); howmany = PG_GETARG_INT32(2); - fail = PG_GETARG_BOOL(3); rconn = getConnectionByName(conname); if (rconn) conn = rconn->conn; } - else if (PG_NARGS() == 3) - { - /* text,text,int or text,int,bool */ - if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID) - { - curname = text_to_cstring(PG_GETARG_TEXT_PP(0)); - howmany = PG_GETARG_INT32(1); - fail = PG_GETARG_BOOL(2); - conn = pconn->conn; - } - else - { - conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); - curname = text_to_cstring(PG_GETARG_TEXT_PP(1)); - howmany = PG_GETARG_INT32(2); - - rconn = getConnectionByName(conname); - if (rconn) - conn = rconn->conn; - } - } - else if (PG_NARGS() == 2) - { - /* text,int */ - curname = text_to_cstring(PG_GETARG_TEXT_PP(0)); - howmany = PG_GETARG_INT32(1); - conn = pconn->conn; - } - - if (!conn) - DBLINK_CONN_NOT_AVAIL; - - initStringInfo(&buf); - appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); - - /* create a function context for cross-call persistence */ - funcctx = SRF_FIRSTCALL_INIT(); - - /* - * Try to execute the query. Note that since libpq uses malloc, the - * PGresult will be long-lived even though we are still in a - * short-lived memory context. - */ - res = PQexec(conn, buf.data); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) - { - dblink_res_error(conname, res, "could not fetch from cursor", fail); - SRF_RETURN_DONE(funcctx); - } - else if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - /* cursor does not exist - closed already or bad name */ - PQclear(res); - ereport(ERROR, - (errcode(ERRCODE_INVALID_CURSOR_NAME), - errmsg("cursor \"%s\" does not exist", curname))); - } - - funcctx->max_calls = PQntuples(res); - - /* got results, keep track of them */ - funcctx->user_fctx = res; - - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupdesc)) - { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; - } - - /* check result and tuple descriptor have the same number of columns */ - if (PQnfields(res) != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); - - /* - * fast track when no results. We could exit earlier, but then we'd - * not report error if the result tuple type is wrong. - */ - if (funcctx->max_calls < 1) - { - PQclear(res); - SRF_RETURN_DONE(funcctx); - } - - /* - * switch to memory context appropriate for multiple function calls, - * so we can make long-lived copy of tupdesc etc - */ - oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); + } + else if (PG_NARGS() == 2) + { + /* text,int */ + curname = text_to_cstring(PG_GETARG_TEXT_PP(0)); + howmany = PG_GETARG_INT32(1); + conn = pconn->conn; + } - /* store needed metadata for subsequent calls */ - attinmeta = TupleDescGetAttInMetadata(tupdesc); - funcctx->attinmeta = attinmeta; + if (!conn) + DBLINK_CONN_NOT_AVAIL; - MemoryContextSwitchTo(oldcontext); - } + /* let the caller know we're sending back a tuplestore */ + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = NULL; + rsinfo->setDesc = NULL; - /* stuff done on every call of the function */ - funcctx = SRF_PERCALL_SETUP(); + initStringInfo(&buf); + appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); /* - * initialize per-call variables + * Try to execute the query. Note that since libpq uses malloc, the + * PGresult will be long-lived even though we are still in a + * short-lived memory context. */ - call_cntr = funcctx->call_cntr; - max_calls = funcctx->max_calls; - - res = (PGresult *) funcctx->user_fctx; - attinmeta = funcctx->attinmeta; - tupdesc = attinmeta->tupdesc; - - if (call_cntr < max_calls) /* do when there is more left to send */ + res = PQexec(conn, buf.data); + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) { - char **values; - HeapTuple tuple; - Datum result; - int i; - int nfields = PQnfields(res); - - values = (char **) palloc(nfields * sizeof(char *)); - for (i = 0; i < nfields; i++) - { - if (PQgetisnull(res, call_cntr, i) == 0) - values[i] = PQgetvalue(res, call_cntr, i); - else - values[i] = NULL; - } - - /* build the tuple */ - tuple = BuildTupleFromCStrings(attinmeta, values); - - /* make the tuple into a datum */ - result = HeapTupleGetDatum(tuple); - - SRF_RETURN_NEXT(funcctx, result); + dblink_res_error(conname, res, "could not fetch from cursor", fail); + return (Datum) 0; } - else + else if (PQresultStatus(res) == PGRES_COMMAND_OK) { - /* do when there is no more left */ + /* cursor does not exist - closed already or bad name */ PQclear(res); - SRF_RETURN_DONE(funcctx); + ereport(ERROR, + (errcode(ERRCODE_INVALID_CURSOR_NAME), + errmsg("cursor \"%s\" does not exist", curname))); } + + materializeResult(fcinfo, res); + return (Datum) 0; } /* @@ -749,147 +644,156 @@ dblink_get_result(PG_FUNCTION_ARGS) static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) { - FuncCallContext *funcctx; - TupleDesc tupdesc = NULL; - int call_cntr; - int max_calls; - AttInMetadata *attinmeta; - char *msg; - PGresult *res = NULL; - bool is_sql_cmd = false; - char *sql_cmd_status = NULL; - MemoryContext oldcontext; - bool freeconn = false; + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + char *msg; + PGresult *res = NULL; + PGconn *conn = NULL; + char *connstr = NULL; + char *sql = NULL; + char *conname = NULL; + remoteConn *rconn = NULL; + bool fail = true; /* default to backward compatible */ + bool freeconn = false; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not " \ + "allowed in this context"))); DBLINK_INIT; - /* stuff done only on the first call of the function */ - if (SRF_IS_FIRSTCALL()) + if (!is_async) { - PGconn *conn = NULL; - char *connstr = NULL; - char *sql = NULL; - char *conname = NULL; - remoteConn *rconn = NULL; - bool fail = true; /* default to backward compatible */ - - /* create a function context for cross-call persistence */ - funcctx = SRF_FIRSTCALL_INIT(); - - /* - * switch to memory context appropriate for multiple function calls - */ - oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - - if (!is_async) + if (PG_NARGS() == 3) { - if (PG_NARGS() == 3) - { - /* text,text,bool */ - DBLINK_GET_CONN; - sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); - fail = PG_GETARG_BOOL(2); - } - else if (PG_NARGS() == 2) - { - /* text,text or text,bool */ - if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) - { - conn = pconn->conn; - sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); - fail = PG_GETARG_BOOL(1); - } - else - { - DBLINK_GET_CONN; - sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); - } - } - else if (PG_NARGS() == 1) - { - /* text */ - conn = pconn->conn; - sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); - } - else - /* shouldn't happen */ - elog(ERROR, "wrong number of arguments"); + /* text,text,bool */ + DBLINK_GET_CONN; + sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); + fail = PG_GETARG_BOOL(2); } - else /* is_async */ + else if (PG_NARGS() == 2) { - /* get async result */ - if (PG_NARGS() == 2) + /* text,text or text,bool */ + if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { - /* text,bool */ - DBLINK_GET_CONN; + conn = pconn->conn; + sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); fail = PG_GETARG_BOOL(1); } - else if (PG_NARGS() == 1) + else { - /* text */ DBLINK_GET_CONN; + sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); } - else - /* shouldn't happen */ - elog(ERROR, "wrong number of arguments"); } - - if (!conn) - DBLINK_CONN_NOT_AVAIL; - - /* synchronous query, or async result retrieval */ - if (!is_async) - res = PQexec(conn, sql); + else if (PG_NARGS() == 1) + { + /* text */ + conn = pconn->conn; + sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); + } else + /* shouldn't happen */ + elog(ERROR, "wrong number of arguments"); + } + else /* is_async */ + { + /* get async result */ + if (PG_NARGS() == 2) { - res = PQgetResult(conn); - /* NULL means we're all done with the async results */ - if (!res) - { - MemoryContextSwitchTo(oldcontext); - SRF_RETURN_DONE(funcctx); - } + /* text,bool */ + DBLINK_GET_CONN; + fail = PG_GETARG_BOOL(1); } - - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) + else if (PG_NARGS() == 1) { - if (freeconn) - PQfinish(conn); - dblink_res_error(conname, res, "could not execute query", fail); - MemoryContextSwitchTo(oldcontext); - SRF_RETURN_DONE(funcctx); + /* text */ + DBLINK_GET_CONN; } + else + /* shouldn't happen */ + elog(ERROR, "wrong number of arguments"); + } + + if (!conn) + DBLINK_CONN_NOT_AVAIL; + + /* let the caller know we're sending back a tuplestore */ + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = NULL; + rsinfo->setDesc = NULL; + + /* synchronous query, or async result retrieval */ + if (!is_async) + res = PQexec(conn, sql); + else + { + res = PQgetResult(conn); + /* NULL means we're all done with the async results */ + if (!res) + return (Datum) 0; + } + + /* if needed, close the connection to the database and cleanup */ + if (freeconn) + PQfinish(conn); + + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + dblink_res_error(conname, res, "could not execute query", fail); + return (Datum) 0; + } + + materializeResult(fcinfo, res); + return (Datum) 0; +} + +/* + * Materialize the PGresult to return them as the function result. + * The res will be released in this function. + */ +static void +materializeResult(FunctionCallInfo fcinfo, PGresult *res) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + + Assert(rsinfo->returnMode == SFRM_Materialize); + + PG_TRY(); + { + TupleDesc tupdesc; + bool is_sql_cmd = false; + int ntuples; + int nfields; if (PQresultStatus(res) == PGRES_COMMAND_OK) { is_sql_cmd = true; - /* need a tuple descriptor representing one TEXT column */ + /* + * need a tuple descriptor representing one TEXT column to + * return the command status string as our result tuple + */ tupdesc = CreateTemplateTupleDesc(1, false); TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", TEXTOID, -1, 0); - - /* - * and save a copy of the command status string to return as our - * result tuple - */ - sql_cmd_status = PQcmdStatus(res); - funcctx->max_calls = 1; + ntuples = 1; + nfields = 1; } else - funcctx->max_calls = PQntuples(res); - - /* got results, keep track of them */ - funcctx->user_fctx = res; + { + Assert(PQresultStatus(res) == PGRES_TUPLES_OK); - /* if needed, close the connection to the database and cleanup */ - if (freeconn) - PQfinish(conn); + is_sql_cmd = false; - if (!is_sql_cmd) - { /* get a tuple descriptor for our result type */ switch (get_call_result_type(fcinfo, NULL, &tupdesc)) { @@ -911,87 +815,78 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) /* make sure we have a persistent copy of the tupdesc */ tupdesc = CreateTupleDescCopy(tupdesc); + ntuples = PQntuples(res); + nfields = PQnfields(res); } /* * check result and tuple descriptor have the same number of columns */ - if (PQnfields(res) != tupdesc->natts) + if (nfields != tupdesc->natts) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("remote query result rowtype does not match " "the specified FROM clause rowtype"))); - /* fast track when no results */ - if (funcctx->max_calls < 1) + if (ntuples > 0) { - if (res) - PQclear(res); + AttInMetadata *attinmeta; + Tuplestorestate *tupstore; + MemoryContext oldcontext; + int row; + char **values; + + attinmeta = TupleDescGetAttInMetadata(tupdesc); + + oldcontext = MemoryContextSwitchTo( + rsinfo->econtext->ecxt_per_query_memory); + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; MemoryContextSwitchTo(oldcontext); - SRF_RETURN_DONE(funcctx); - } - - /* store needed metadata for subsequent calls */ - attinmeta = TupleDescGetAttInMetadata(tupdesc); - funcctx->attinmeta = attinmeta; - - MemoryContextSwitchTo(oldcontext); - - } - - /* stuff done on every call of the function */ - funcctx = SRF_PERCALL_SETUP(); - - /* - * initialize per-call variables - */ - call_cntr = funcctx->call_cntr; - max_calls = funcctx->max_calls; - res = (PGresult *) funcctx->user_fctx; - attinmeta = funcctx->attinmeta; - tupdesc = attinmeta->tupdesc; + values = (char **) palloc(nfields * sizeof(char *)); - if (call_cntr < max_calls) /* do when there is more left to send */ - { - char **values; - HeapTuple tuple; - Datum result; + /* put all tuples into the tuplestore */ + for (row = 0; row < ntuples; row++) + { + HeapTuple tuple; - if (!is_sql_cmd) - { - int i; - int nfields = PQnfields(res); + if (!is_sql_cmd) + { + int i; - values = (char **) palloc(nfields * sizeof(char *)); - for (i = 0; i < nfields; i++) - { - if (PQgetisnull(res, call_cntr, i) == 0) - values[i] = PQgetvalue(res, call_cntr, i); + for (i = 0; i < nfields; i++) + { + if (PQgetisnull(res, row, i)) + values[i] = NULL; + else + values[i] = PQgetvalue(res, row, i); + } + } else - values[i] = NULL; - } - } - else - { - values = (char **) palloc(1 * sizeof(char *)); - values[0] = sql_cmd_status; - } + { + values[0] = PQcmdStatus(res); + } - /* build the tuple */ - tuple = BuildTupleFromCStrings(attinmeta, values); + /* build the tuple and put it into the tuplestore. */ + tuple = BuildTupleFromCStrings(attinmeta, values); + tuplestore_puttuple(tupstore, tuple); + } - /* make the tuple into a datum */ - result = HeapTupleGetDatum(tuple); + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + } - SRF_RETURN_NEXT(funcctx, result); + PQclear(res); } - else + PG_CATCH(); { - /* do when there is no more left */ + /* be sure to release the libpq result */ PQclear(res); - SRF_RETURN_DONE(funcctx); + PG_RE_THROW(); } + PG_END_TRY(); } /*