1 /****************************************************************************
3 * $Id: pending.c,v 1.1 2002/06/23 21:58:08 momjian Exp $
5 * This file contains a trigger for Postgresql-7.x to record changes to tables
6 * to a pending table for mirroring.
7 * All tables that should be mirrored should have this trigger hooked up to it.
9 * Written by Steven Singer (ssinger@navtechinc.com)
10 * (c) 2001-2002 Navtech Systems Support Inc.
11 * Released under the GNU Public License version 2. See COPYING.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
20 ***************************************************************************/
21 #include <executor/spi.h>
22 #include <commands/trigger.h>
/*
 * Column-selection modes handed to packageData(): PRIMARY, NONPRIMARY and
 * ALL. NUM_FIELDUSAGE is the last enumerator, so its value (3) equals the
 * number of modes.
 */
25 enum FieldUsage {PRIMARY=0,NONPRIMARY,ALL,NUM_FIELDUSAGE};
/*
 * Records one change in the "Pending" table and then stores the key and/or
 * row data for it.
 * NOTE(review): this listing skips inner line 29. recordchange() calls
 * storePending() with six arguments (a tuple descriptor sits between the
 * tuples and the trigger data), so a TupleDesc parameter is presumably
 * missing from the prototype as shown -- confirm against the pristine file.
 */
27 int storePending(char * cpTableName, HeapTuple tBeforeTuple,
28 HeapTuple tAfterTuple,
30 TriggerData * tpTrigdata,char cOp);
/* Stores the primary-key columns of a tuple in "PendingData" (IsKey = 't'). */
32 int storeKeyInfo(char * cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc,
33 TriggerData * tpTrigdata);
/*
 * Stores row data in "PendingData" (IsKey = 'f'). iIncludeKeyData == 0 means
 * only non-key columns are packaged; any other value packages every column.
 */
34 int storeData(char * cpTableName,HeapTuple tTupleData,TupleDesc tTupleDesc,
35 TriggerData * tpTrigData,int iIncludeKeyData);
/* Fetches the indkey vector of the primary-key index of table tblOid. */
37 int2vector * getPrimaryKey(Oid tblOid);
/* Builds the text form of the selected columns of a tuple. */
39 char * packageData(HeapTuple tTupleData, TupleDesc tTupleDecs,
40 TriggerData * tTrigData,
41 enum FieldUsage eKeyUsage );
/* Initial size and growth step (in bytes) of the buffer built by packageData(). */
43 #define BUFFER_SIZE 256
/* Space reserved for the decimal text of an OID in getPrimaryKey()'s query buffer. */
44 #define MAX_OID_LEN 10
/* Declaration of the trigger entry point plus its PG_FUNCTION_INFO_V1 record. */
47 extern Datum recordchange(PG_FUNCTION_ARGS);
48 PG_FUNCTION_INFO_V1(recordchange);
51 /*****************************************************************************
52 * The entry point for the trigger function.
53 * The Trigger takes a single SQL 'text' argument indicating the name of the
54 * table the trigger was applied to. If this name is incorrect so will the
56 ****************************************************************************/
/*
 * NOTE(review): this listing carries its own line numbers and skips many of
 * them (59, 63-64, 79-81, 85-86, 90-92, ...). The declarations of tblname,
 * tupdesc and op, whatever assigns op, and the closing braces and #endif
 * lines are therefore not visible here.
 *
 * What the visible lines do:
 *  - A NULL fcinfo->context is treated as "not called as a trigger"; the
 *    final return hands back a NULL datum.
 *  - Otherwise the function connects to SPI, takes the table name from
 *    SPI_getrelname(trigdata->tg_relation) (not from a trigger argument, as
 *    the header above suggests) and the tuple descriptor from rd_att.
 *  - The before and after tuples and the tuple to return depend on the event:
 *      UPDATE: before = tg_trigtuple, after = tg_newtuple, returns tg_newtuple
 *      INSERT: after  = tg_trigtuple,                      returns tg_trigtuple
 *      DELETE: before = tg_trigtuple,                      returns tg_trigtuple
 *  - storePending() is called with those values; a non-zero result is
 *    reported with elog(ERROR, ...).
 */
57 Datum recordchange(PG_FUNCTION_ARGS) {
58 TriggerData * trigdata;
60 HeapTuple beforeTuple=NULL;
61 HeapTuple afterTuple=NULL;
62 HeapTuple retTuple=NULL;
/* fcinfo->context carries the TriggerData when fired as a trigger. */
65 if(fcinfo->context!=NULL) {
67 if(SPI_connect() < 0) {
/* The message names storePending although it is logged from recordchange. */
68 elog(NOTICE,"storePending could not connect to SPI");
71 trigdata = (TriggerData*)fcinfo->context;
72 /* Extract the table name */
73 tblname = SPI_getrelname(trigdata->tg_relation);
74 tupdesc = trigdata->tg_relation->rd_att;
75 if(TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) {
/* An update has both an old row (tg_trigtuple) and a new row (tg_newtuple). */
76 retTuple = trigdata->tg_newtuple;
77 beforeTuple = trigdata->tg_trigtuple;
78 afterTuple = trigdata->tg_newtuple;
82 else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) {
/* An insert only has an "after" row; beforeTuple stays NULL. */
83 retTuple = trigdata->tg_trigtuple;
84 afterTuple = trigdata->tg_trigtuple;
87 else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) {
/* A delete only has a "before" row; afterTuple stays NULL. */
88 retTuple = trigdata->tg_trigtuple;
89 beforeTuple = trigdata->tg_trigtuple;
93 if(storePending(tblname,beforeTuple,afterTuple,tupdesc,trigdata,op)) {
94 /* An error occurred. Skip the operation. */
/*
 * NOTE(review): elog(ERROR) presumably does not return to its caller in
 * PostgreSQL, which would make the return below unreachable -- confirm.
 */
95 elog(ERROR,"Operation could not be mirrored");
96 return PointerGetDatum(NULL);
99 #if defined DEBUG_OUTPUT
100 elog(NOTICE,"Returning on success");
/* Hand the row chosen above back to the executor. */
103 return PointerGetDatum(retTuple);
107 * Not being called as a trigger.
109 return PointerGetDatum(NULL);
114 /*****************************************************************************
115 * Constructs and executes an SQL query to write a record of this tuple change
116 * to the pending table.
117 *****************************************************************************/
/*
 * NOTE(review): several lines of this function are missing from the listing.
 * That includes the tuple-descriptor parameter (inner line 120), the local
 * declarations, the NULL check on the plan, the conditions that select the
 * delete/insert/update branches, and the return statements. The notes below
 * cover only what is visible.
 *
 * Visible behaviour:
 *  - Prepares a three-parameter INSERT into "Pending" and executes it with
 *    the table name, the operation character and the current transaction id.
 *  - Delete: stores only the key columns of the before tuple.
 *  - Insert: stores the after tuple with key data included (TRUE).
 *  - Update: stores the key columns of the before tuple; storeData() is only
 *    called when storeKeyInfo() returned 0.
 */
118 int storePending(char * cpTableName, HeapTuple tBeforeTuple,
119 HeapTuple tAfterTuple,
121 TriggerData * tpTrigData,char cOp) {
122 char * cpQueryBase = "INSERT INTO \"Pending\" (\"TableName\",\"Op\",\"XID\") VALUES ($1,$2,$3)";
125 HeapTuple tCurTuple; // Points to the current tuple (before or after)
/* Parameter types for $1, $2 and $3 of cpQueryBase, in that order. */
127 Oid taPlanArgTypes[3] = {NAMEOID,CHAROID,INT4OID};
/* Prefer the before tuple; fall back to the after tuple when there is none. */
130 tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
135 vpPlan = SPI_prepare(cpQueryBase,3,taPlanArgTypes);
137 elog(NOTICE,"Error creating plan");
139 // SPI_saveplan(vpPlan);
141 saPlanData[0] = PointerGetDatum(cpTableName);
142 saPlanData[1] = CharGetDatum(cOp);
/*
 * NOTE(review): the transaction id is passed as a signed 32-bit datum
 * (INT4OID) -- confirm how ids above INT_MAX are meant to be stored.
 */
143 saPlanData[2] = Int32GetDatum(GetCurrentTransactionId());
146 iResult = SPI_execp(vpPlan,saPlanData,NULL,1);
148 elog(NOTICE,"storedPending fired (%s) returned %d",cpQueryBase,iResult);
152 #if defined DEBUG_OUTPUT
153 elog(NOTICE,"row successfully stored in pending table");
158 * This is a record of a delete operation.
159 * Just store the key data.
161 iResult = storeKeyInfo(cpTableName,tBeforeTuple,tTupDesc,tpTrigData);
165 * An Insert operation.
168 iResult = storeData(cpTableName,tAfterTuple,tTupDesc,tpTrigData,TRUE);
172 /* op must be an update. */
173 iResult = storeKeyInfo(cpTableName,tBeforeTuple,tTupDesc,tpTrigData);
/*
 * Keep a non-zero storeKeyInfo() result; only store the row data when the
 * key info went in cleanly. The rest of this call's argument list is on a
 * line missing from the listing.
 */
174 iResult = iResult ? iResult : storeData(cpTableName,tAfterTuple,tTupDesc,
178 #if defined DEBUG_OUTPUT
179 elog(NOTICE,"DOne storing keyinfo");
/*
 * Stores the primary-key columns of tTupleData in "PendingData".
 *
 * The key columns are turned into text by packageData(..., PRIMARY) and
 * inserted with IsKey = 't' and SeqId = currval of "Pending_SeqId_seq".
 * cpTableName is not referenced by any visible line.
 *
 * NOTE(review): the local declarations, the NULL check on the plan and the
 * return statements are on lines missing from this listing. The visible code
 * only shows that a result other than SPI_OK_INSERT is logged as a NOTICE.
 */
186 int storeKeyInfo(char * cpTableName, HeapTuple tTupleData,
187 TupleDesc tTupleDesc,
188 TriggerData * tpTrigData) {
/*
 * NOTE(review): $1 is declared as NAMEOID although the value bound to it is
 * the packageData() string, whose length is not bounded here -- confirm this
 * is intended.
 */
190 Oid saPlanArgTypes[1] = {NAMEOID};
191 char * insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'t',$1)";
197 pplan = SPI_prepare(insQuery,1,saPlanArgTypes);
199 elog(NOTICE,"Could not prepare INSERT plan");
203 // pplan = SPI_saveplan(pplan);
204 cpKeyData = packageData(tTupleData, tTupleDesc,tpTrigData,PRIMARY);
205 #if defined DEBUG_OUTPUT
/*
 * NOTE(review): the packaged data is used as the elog format string, so any
 * '%' in column data would be read as a conversion; passing it through "%s"
 * would avoid that.
 */
206 elog(NOTICE,cpKeyData);
208 saPlanData[0] = PointerGetDatum(cpKeyData);
210 iRetCode = SPI_execp(pplan,saPlanData,NULL,1);
/* The packaged string is no longer needed once the insert has run. */
212 if(cpKeyData!=NULL) {
213 SPI_pfree(cpKeyData);
216 if(iRetCode != SPI_OK_INSERT ) {
/* The message says "pendingDelete" although the query targets "PendingData". */
217 elog(NOTICE,"Error inserting row in pendingDelete");
220 #if defined DEBUG_OUTPUT
221 elog(NOTICE,"INSERT SUCCESFULL");
/*
 * Looks up the primary-key index of the table with OID tblOid.
 *
 * Runs "SELECT indkey FROM pg_index WHERE indisprimary='t' AND indrelid=<oid>"
 * through SPI, requires exactly one result row, and copies the indkey value
 * into a block obtained from SPI_palloc.
 *
 * NOTE(review): the local declarations, the error-path returns and the final
 * return are on lines missing from this listing; resultKey is presumably what
 * gets returned -- confirm.
 */
231 int2vector * getPrimaryKey(Oid tblOid) {
235 int2vector * resultKey;
236 int2vector * tpResultKey;
240 queryBase = "SELECT indkey FROM pg_index WHERE indisprimary='t' AND indrelid=";
/* Room for the query text, up to MAX_OID_LEN characters of OID, and the NUL. */
241 query = SPI_palloc(strlen(queryBase) + MAX_OID_LEN+1);
/*
 * NOTE(review): "%d" formats tblOid as a signed int. If Oid is an unsigned
 * type, a large value would print with a minus sign and need more than
 * MAX_OID_LEN characters; "%u" would presumably be the matching conversion.
 */
242 sprintf(query,"%s%d",queryBase,tblOid);
243 ret = SPI_exec(query,1);
/* Anything other than exactly one row means no usable primary key was found. */
244 if(ret != SPI_OK_SELECT || SPI_processed != 1 ) {
245 elog(NOTICE,"Could not select primary index key");
249 resTuple = SPI_tuptable->vals[0];
250 resDatum = SPI_getbinval(resTuple,SPI_tuptable->tupdesc,1,&isNull);
/*
 * Copy the vector out of the SPI result into separately allocated memory.
 * NOTE(review): exactly sizeof(int2vector) bytes are copied, which assumes
 * int2vector is a fixed-size type in this PostgreSQL version -- confirm.
 */
252 tpResultKey = (int2vector*) DatumGetPointer(resDatum);
253 resultKey = SPI_palloc(sizeof(int2vector));
254 memcpy(resultKey,tpResultKey,sizeof(int2vector));
260 /******************************************************************************
261 * Stores a copy of the non-key data for the row.
262 *****************************************************************************/
/*
 * The row text comes from packageData() and is inserted into "PendingData"
 * with IsKey = 'f' and SeqId = currval of "Pending_SeqId_seq".
 *
 * iIncludeKeyData == 0 packages only the non-primary-key columns; any other
 * value packages every column. cpTableName is not referenced by any visible
 * line.
 *
 * NOTE(review): the local declarations, the NULL check on the plan, the else
 * line between the two packageData() calls, any guard around SPI_pfree() and
 * the return statements are on lines missing from this listing.
 */
263 int storeData(char * cpTableName,HeapTuple tTupleData,TupleDesc tTupleDesc,
264 TriggerData * tpTrigData,int iIncludeKeyData) {
/*
 * NOTE(review): $1 is declared as NAMEOID although the value bound to it is
 * the packageData() string, whose length is not bounded here -- confirm.
 */
266 Oid planArgTypes[1] = {NAMEOID};
267 char * insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'f',$1)";
273 pplan = SPI_prepare(insQuery,1,planArgTypes);
275 elog(NOTICE,"Could not prepare INSERT plan");
279 // pplan = SPI_saveplan(pplan);
/* Choose which columns to package based on the caller's flag. */
280 if(iIncludeKeyData==0) {
281 cpKeyData = packageData(tTupleData, tTupleDesc,tpTrigData,NONPRIMARY);
284 cpKeyData = packageData(tTupleData,tTupleDesc,tpTrigData,ALL);
287 planData[0] = PointerGetDatum(cpKeyData);
288 iRetValue = SPI_execp(pplan,planData,NULL,1);
/* The packaged string is released once the insert has run. */
291 SPI_pfree(cpKeyData);
294 if(iRetValue != SPI_OK_INSERT ) {
/* The message says "pendingDelete" although the query targets "PendingData". */
295 elog(NOTICE,"Error inserting row in pendingDelete");
298 #if defined DEBUG_OUTPUT
299 elog(NOTICE,"INSERT SUCCESFULL");
307 * Packages the data in tTupleData into a string of the format
308 * FieldName='value text' where any quotes inside of value text
309 * are escaped with a backslash and any backslashes in value text
310 * are esacped by a second back slash.
312 * tTupleDesc should be a description of the tuple stored in
315 * eFieldUsage specifies which fields to use.
316 * PRIMARY implies include only primary key fields.
317 * NONPRIMARY implies include only non-primary key fields.
318 * ALL implies include all fields.
/*
 * NOTE(review): the description lines above have lost their comment opener
 * and closer in this listing. Inside the function, the local declarations,
 * the condition guarding the primary-key lookup, the lines that write the
 * opening quote and the escape character, the pointer/counter increments of
 * the copy loop, most closing braces and the final return are missing too.
 * The notes below describe only what the visible lines establish.
 *
 * Visible behaviour:
 *  - Columns are visited with a 1-based counter running up to
 *    tTupleDesc->natts.
 *  - The primary-key column numbers come from getPrimaryKey() for the
 *    trigger's relation; the list is scanned until a 0 entry.
 *  - The output buffer comes from SPI_palloc, starts at BUFFER_SIZE bytes and
 *    grows by BUFFER_SIZE through SPI_repalloc whenever the remaining space
 *    is too small for the next write.
 *  - Each emitted column starts with "name"= and ends with a quote and a
 *    space; backslashes and single quotes in the value are singled out for
 *    escaping.
 *  - Before returning, the unused tail of the buffer is zero-filled, which
 *    also terminates the string.
 */
320 char * packageData(HeapTuple tTupleData, TupleDesc tTupleDesc,
321 TriggerData * tpTrigData,
322 enum FieldUsage eKeyUsage ) {
324 int2vector * tpPKeys=NULL;
330 iNumCols = tTupleDesc->natts;
333 tpPKeys = getPrimaryKey(tpTrigData->tg_relation->rd_id);
338 #if defined DEBUG_OUTPUT
340 elog(NOTICE,"Have primary keys");
343 cpDataBlock = SPI_palloc(BUFFER_SIZE);
344 iDataBlockSize = BUFFER_SIZE;
345 iUsedDataBlock = 0; /* To account for the null */
347 for(iColumnCounter=1; iColumnCounter <=iNumCols; iColumnCounter++) {
349 int iPrimaryKeyIndex;
350 char * cpUnFormatedPtr;
351 char * cpFormatedPtr;
356 //Determine if this is a primary key or not.
/* Walk the key column numbers (terminated by 0) looking for this column. */
358 for(iPrimaryKeyIndex=0; (*tpPKeys)[iPrimaryKeyIndex]!=0;
359 iPrimaryKeyIndex++) {
360 if((*tpPKeys)[iPrimaryKeyIndex]==iColumnCounter) {
/*
 * True when the column's key status does not match the requested usage:
 * a key column unless PRIMARY was asked for, a non-key column unless
 * NONPRIMARY was asked for.
 */
365 if( iIsPrimaryKey ? (eKeyUsage!=PRIMARY) : (eKeyUsage!=NONPRIMARY)) {
369 #if defined DEBUG_OUTPUT
370 elog(NOTICE,"Skipping column");
374 } /* KeyUsage!=ALL */
/* Attribute descriptors are 0-based while iColumnCounter is 1-based. */
375 cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs
376 [iColumnCounter-1]->attname));
377 #if defined DEBUG_OUTPUT
/* NOTE(review): the column name is used as the elog format string here. */
378 elog(NOTICE,cpFieldName);
/*
 * Grow until the name, its two double quotes, the '=' and a terminating NUL
 * fit (strlen + 4); the sprintf below then advances by strlen + 3.
 */
380 while(iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) +4) {
381 cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize + BUFFER_SIZE);
382 iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
384 sprintf(cpDataBlock+iUsedDataBlock,"\"%s\"=",cpFieldName);
385 iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName)+3;
386 cpFieldData=SPI_getvalue(tTupleData,tTupleDesc,iColumnCounter);
/* Source cursor over the value text, destination cursor at the write position. */
388 cpUnFormatedPtr = cpFieldData;
389 cpFormatedPtr = cpDataBlock + iUsedDataBlock;
/* The copy loop below is presumably skipped when SPI_getvalue returned NULL. */
390 if(cpFieldData!=NULL) {
402 #if defined DEBUG_OUTPUT
/* NOTE(review): column data is used as the elog format string here. */
403 elog(NOTICE,cpFieldData);
404 elog(NOTICE,"Starting format loop");
406 while(*cpUnFormatedPtr!=0) {
/*
 * Keep at least two bytes free before copying a character, presumably one
 * for an escape character and one for the character itself.
 */
407 while(iDataBlockSize - iUsedDataBlock < 2) {
408 cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize+BUFFER_SIZE);
409 iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
/* Re-derive the write cursor from the reallocated block. */
410 cpFormatedPtr = cpDataBlock + iUsedDataBlock;
/* Backslash and single quote are the two characters that get escaped. */
412 if(*cpUnFormatedPtr=='\\' || *cpUnFormatedPtr=='\'') {
417 *cpFormatedPtr=*cpUnFormatedPtr;
423 SPI_pfree(cpFieldData);
/* Room for the closing quote, the separating space and the NUL sprintf writes. */
425 while(iDataBlockSize - iUsedDataBlock < 3) {
426 cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize+BUFFER_SIZE);
427 iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
428 cpFormatedPtr = cpDataBlock + iUsedDataBlock;
430 sprintf(cpFormatedPtr,"' ");
431 iUsedDataBlock = iUsedDataBlock +2;
432 #if defined DEBUG_OUTPUT
/* NOTE(review): the packaged buffer is used as the elog format string here. */
433 elog(NOTICE,cpDataBlock);
436 } /* for iColumnCounter */
440 #if defined DEBUG_OUTPUT
441 elog(NOTICE,"Returning");
/* Zero everything past the used portion so the result is NUL-terminated. */
443 memset(cpDataBlock + iUsedDataBlock,0,iDataBlockSize - iUsedDataBlock);