OSDN Git Service

a9027dbe502c8ffc8ef67d2762ab0f8f02c04152
[pg-rex/syncrep.git] / contrib / dbmirror / pending.c
1 /****************************************************************************
2  * pending.c
3  * $Id: pending.c,v 1.1 2002/06/23 21:58:08 momjian Exp $ 
4  *
5  * This file contains a trigger for Postgresql-7.x to record changes to tables
6  * to a pending table for mirroring.
7  * All tables that should be mirrored should have this trigger hooked up to it.
8  *
9  *   Written by Steven Singer (ssinger@navtechinc.com)
10  *   (c) 2001-2002 Navtech Systems Support Inc.
11  *   Released under the GNU Public License version 2. See COPYING.
12  *
13  *
14  *   This program is distributed in the hope that it will be useful,
15  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
16  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  *   GNU General Public License for more details.
18  *
19  *
20  ***************************************************************************/
21 #include <executor/spi.h>
22 #include <commands/trigger.h>
23 #include <postgres.h>
24
25 enum FieldUsage  {PRIMARY=0,NONPRIMARY,ALL,NUM_FIELDUSAGE};
26
27 int storePending(char * cpTableName, HeapTuple  tBeforeTuple, 
28                  HeapTuple tAfterTuple,
29                   TupleDesc tTupdesc,
30                  TriggerData * tpTrigdata,char cOp);
31
32 int storeKeyInfo(char * cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc,
33                  TriggerData * tpTrigdata);
34 int storeData(char * cpTableName,HeapTuple tTupleData,TupleDesc tTupleDesc,
35               TriggerData * tpTrigData,int iIncludeKeyData);
36      
37 int2vector * getPrimaryKey(Oid tblOid);
38
39 char * packageData(HeapTuple tTupleData, TupleDesc tTupleDecs,
40                    TriggerData * tTrigData,
41                    enum FieldUsage eKeyUsage );
42
43 #define BUFFER_SIZE 256
44 #define MAX_OID_LEN 10
45
46
47 extern Datum recordchange(PG_FUNCTION_ARGS);
48 PG_FUNCTION_INFO_V1(recordchange);
49
50
51 /*****************************************************************************
52  * The entry point for the trigger function.
53  * The Trigger takes a single SQL 'text' argument indicating the name of the
54  * table the trigger was applied to.  If this name is incorrect so will the
55  * mirroring.
56  ****************************************************************************/
57 Datum recordchange(PG_FUNCTION_ARGS) {
58   TriggerData * trigdata; 
59   TupleDesc tupdesc;
60   HeapTuple beforeTuple=NULL;
61   HeapTuple afterTuple=NULL;
62   HeapTuple retTuple=NULL;
63   char * tblname;
64   char op;
65   if(fcinfo->context!=NULL) {
66     
67     if(SPI_connect() < 0) {
68       elog(NOTICE,"storePending could not connect to SPI");
69       return -1;
70     }
71     trigdata = (TriggerData*)fcinfo->context;
72     /* Extract the table name */
73     tblname = SPI_getrelname(trigdata->tg_relation);
74     tupdesc = trigdata->tg_relation->rd_att;
75     if(TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) {
76       retTuple = trigdata->tg_newtuple;
77       beforeTuple = trigdata->tg_trigtuple;
78       afterTuple = trigdata->tg_newtuple;
79       op='u';
80
81     }
82     else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) {
83       retTuple = trigdata->tg_trigtuple;
84       afterTuple = trigdata->tg_trigtuple;
85       op = 'i';
86     }
87     else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) {
88       retTuple = trigdata->tg_trigtuple;
89       beforeTuple = trigdata->tg_trigtuple;
90       op = 'd';
91     }
92
93     if(storePending(tblname,beforeTuple,afterTuple,tupdesc,trigdata,op)) {
94       /* An error occoured. Skip the operation. */
95       elog(ERROR,"Operation could not be mirrored");
96       return PointerGetDatum(NULL);
97          
98     }
99 #if defined DEBUG_OUTPUT
100     elog(NOTICE,"Returning on success");
101 #endif
102     SPI_finish();
103     return PointerGetDatum(retTuple);
104   }
105   else {
106     /*
107      * Not being called as a trigger.
108      */
109     return PointerGetDatum(NULL);
110   }
111 }
112
113
114 /*****************************************************************************
115  * Constructs and executes an SQL query to write a record of this tuple change
116  * to the pending table. 
117  *****************************************************************************/
118 int storePending(char * cpTableName, HeapTuple  tBeforeTuple, 
119                  HeapTuple tAfterTuple,
120                   TupleDesc tTupDesc,
121                   TriggerData * tpTrigData,char cOp) {
122   char * cpQueryBase = "INSERT INTO \"Pending\" (\"TableName\",\"Op\",\"XID\") VALUES ($1,$2,$3)";
123
124   int iResult=0;
125   HeapTuple tCurTuple; // Points the current tuple(before or after)
126   Datum saPlanData[4];
127   Oid taPlanArgTypes[3] = {NAMEOID,CHAROID,INT4OID};
128   void * vpPlan;
129
130   tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
131
132   
133
134   
135   vpPlan = SPI_prepare(cpQueryBase,3,taPlanArgTypes);
136   if(vpPlan==NULL) {
137     elog(NOTICE,"Error creating plan");
138   }
139   //  SPI_saveplan(vpPlan);
140
141   saPlanData[0] = PointerGetDatum(cpTableName);
142   saPlanData[1] = CharGetDatum(cOp);
143   saPlanData[2] = Int32GetDatum(GetCurrentTransactionId());
144   
145   
146   iResult = SPI_execp(vpPlan,saPlanData,NULL,1);
147   if(iResult < 0) {
148     elog(NOTICE,"storedPending fired (%s) returned %d",cpQueryBase,iResult);
149   }
150   
151
152 #if defined DEBUG_OUTPUT
153  elog(NOTICE,"row successfully stored in pending table");
154 #endif
155
156   if(cOp=='d') {
157     /**
158      * This is a record of a delete operation.
159      * Just store the key data.
160      */
161     iResult = storeKeyInfo(cpTableName,tBeforeTuple,tTupDesc,tpTrigData);
162   }
163   else if (cOp=='i') {
164     /**
165      * An Insert operation.
166      * Store all data
167      */
168     iResult = storeData(cpTableName,tAfterTuple,tTupDesc,tpTrigData,TRUE);
169
170   }
171   else {
172     /* op must be an update. */
173     iResult = storeKeyInfo(cpTableName,tBeforeTuple,tTupDesc,tpTrigData);
174     iResult = iResult ? iResult : storeData(cpTableName,tAfterTuple,tTupDesc,
175                                             tpTrigData,TRUE);
176   }
177
178 #if defined DEBUG_OUTPUT
179   elog(NOTICE,"DOne storing keyinfo");
180 #endif
181  
182   return iResult;
183
184 }
185
186 int storeKeyInfo(char * cpTableName, HeapTuple tTupleData, 
187                  TupleDesc tTupleDesc,
188                  TriggerData * tpTrigData) {
189  
190   Oid saPlanArgTypes[1] = {NAMEOID};
191   char * insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'t',$1)";
192   void * pplan;
193   Datum saPlanData[1];
194   char * cpKeyData;
195   int iRetCode;
196
197   pplan = SPI_prepare(insQuery,1,saPlanArgTypes);
198     if(pplan==NULL) {
199       elog(NOTICE,"Could not prepare INSERT plan");
200       return -1;
201     }
202     
203     //    pplan = SPI_saveplan(pplan);
204     cpKeyData = packageData(tTupleData, tTupleDesc,tpTrigData,PRIMARY);
205 #if defined DEBUG_OUTPUT
206     elog(NOTICE,cpKeyData);
207 #endif
208     saPlanData[0] = PointerGetDatum(cpKeyData);
209     
210     iRetCode = SPI_execp(pplan,saPlanData,NULL,1);    
211
212     if(cpKeyData!=NULL) {
213       SPI_pfree(cpKeyData);
214     }
215
216     if(iRetCode != SPI_OK_INSERT ) {
217       elog(NOTICE,"Error inserting row in pendingDelete");
218       return -1;
219     }
220 #if defined DEBUG_OUTPUT
221     elog(NOTICE,"INSERT SUCCESFULL");
222 #endif
223
224     return 0;
225     
226 }
227
228
229
230
231 int2vector * getPrimaryKey(Oid tblOid) {
232   char * queryBase;
233   char * query;
234   bool isNull;
235   int2vector * resultKey;
236   int2vector * tpResultKey;
237   HeapTuple resTuple;
238   Datum resDatum;
239   int ret;
240   queryBase = "SELECT indkey FROM pg_index WHERE indisprimary='t' AND indrelid=";
241   query = SPI_palloc(strlen(queryBase) + MAX_OID_LEN+1);
242   sprintf(query,"%s%d",queryBase,tblOid);
243   ret = SPI_exec(query,1);
244   if(ret != SPI_OK_SELECT || SPI_processed != 1 ) {
245     elog(NOTICE,"Could not select primary index key");
246     return NULL;
247   }
248
249   resTuple = SPI_tuptable->vals[0];
250   resDatum = SPI_getbinval(resTuple,SPI_tuptable->tupdesc,1,&isNull);
251
252   tpResultKey = (int2vector*) DatumGetPointer(resDatum);
253   resultKey = SPI_palloc(sizeof(int2vector));
254   memcpy(resultKey,tpResultKey,sizeof(int2vector));
255
256   SPI_pfree(query);
257   return resultKey;
258 }
259
260 /******************************************************************************
261  * Stores a copy of the non-key data for the row.
262  *****************************************************************************/
263 int storeData(char * cpTableName,HeapTuple tTupleData,TupleDesc tTupleDesc,
264                      TriggerData * tpTrigData,int iIncludeKeyData) {
265
266   Oid planArgTypes[1] = {NAMEOID};
267   char * insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'f',$1)";
268   void * pplan;
269   Datum planData[1];
270   char * cpKeyData;
271   int iRetValue;
272
273   pplan = SPI_prepare(insQuery,1,planArgTypes);
274     if(pplan==NULL) {
275       elog(NOTICE,"Could not prepare INSERT plan");
276       return -1;
277     }
278     
279     //    pplan = SPI_saveplan(pplan);
280     if(iIncludeKeyData==0) {
281       cpKeyData = packageData(tTupleData, tTupleDesc,tpTrigData,NONPRIMARY); 
282     }
283     else {
284       cpKeyData = packageData(tTupleData,tTupleDesc,tpTrigData,ALL);
285     }
286     
287     planData[0] = PointerGetDatum(cpKeyData);
288     iRetValue = SPI_execp(pplan,planData,NULL,1);
289     
290     if(cpKeyData!=0) {
291       SPI_pfree(cpKeyData);
292     }
293
294     if(iRetValue != SPI_OK_INSERT ) {
295       elog(NOTICE,"Error inserting row in pendingDelete");
296       return -1;
297     }
298 #if defined DEBUG_OUTPUT
299     elog(NOTICE,"INSERT SUCCESFULL");
300 #endif
301
302     return 0;  
303     
304 }
305
306 /**
307  * Packages the data in tTupleData into a string of the format 
308  * FieldName='value text'  where any quotes inside of value text
309  * are escaped with a backslash and any backslashes in value text
310  * are esacped by a second back slash.
311  *
312  * tTupleDesc should be a description of the tuple stored in 
313  * tTupleData.  
314  *
315  * eFieldUsage specifies which fields to use.
316  *  PRIMARY implies include only primary key fields.
317  *  NONPRIMARY implies include only non-primary key fields.
318  *  ALL implies include all fields.
319  */
320 char * packageData(HeapTuple tTupleData, TupleDesc tTupleDesc,
321                    TriggerData *  tpTrigData,
322                    enum FieldUsage eKeyUsage ) {
323   int iNumCols;
324   int2vector * tpPKeys=NULL;
325   int iColumnCounter;
326   char * cpDataBlock;
327   int iDataBlockSize;
328   int  iUsedDataBlock;
329   
330   iNumCols = tTupleDesc->natts;
331
332   if(eKeyUsage!=ALL) {
333     tpPKeys = getPrimaryKey(tpTrigData->tg_relation->rd_id);
334     if(tpPKeys==NULL) {
335       return NULL;
336     }
337   }
338 #if defined DEBUG_OUTPUT
339   if(tpPKeys!=NULL) {
340     elog(NOTICE,"Have primary keys");
341   }
342 #endif 
343   cpDataBlock = SPI_palloc(BUFFER_SIZE);
344   iDataBlockSize = BUFFER_SIZE;
345   iUsedDataBlock = 0;                   /* To account for the null */
346
347   for(iColumnCounter=1; iColumnCounter <=iNumCols; iColumnCounter++) {
348     int iIsPrimaryKey;
349     int iPrimaryKeyIndex;
350     char * cpUnFormatedPtr;
351     char * cpFormatedPtr;
352
353     char * cpFieldName;
354     char * cpFieldData;
355     if(eKeyUsage!=ALL) {
356       //Determine if this is a primary key or not.
357       iIsPrimaryKey=0;
358       for(iPrimaryKeyIndex=0; (*tpPKeys)[iPrimaryKeyIndex]!=0;
359           iPrimaryKeyIndex++) {
360         if((*tpPKeys)[iPrimaryKeyIndex]==iColumnCounter) {      
361           iIsPrimaryKey=1;
362           break;
363         }
364       }
365       if( iIsPrimaryKey ? (eKeyUsage!=PRIMARY) : (eKeyUsage!=NONPRIMARY)) {
366         /**
367          * Don't use.
368          */
369 #if defined DEBUG_OUTPUT
370         elog(NOTICE,"Skipping column");
371 #endif
372         continue;
373       }
374     }                                   /* KeyUsage!=ALL */
375     cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs
376                                                [iColumnCounter-1]->attname));
377 #if defined DEBUG_OUTPUT
378     elog(NOTICE,cpFieldName);
379 #endif
380     while(iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) +4) {
381       cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize + BUFFER_SIZE);
382       iDataBlockSize = iDataBlockSize + BUFFER_SIZE;      
383     }
384     sprintf(cpDataBlock+iUsedDataBlock,"\"%s\"=",cpFieldName);
385     iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName)+3;
386     cpFieldData=SPI_getvalue(tTupleData,tTupleDesc,iColumnCounter);
387     
388     cpUnFormatedPtr = cpFieldData;
389     cpFormatedPtr = cpDataBlock + iUsedDataBlock;
390     if(cpFieldData!=NULL) {
391       *cpFormatedPtr='\'';
392       iUsedDataBlock++;
393       cpFormatedPtr++;
394     }
395     else {
396       *cpFormatedPtr=' ';
397       iUsedDataBlock++;
398       cpFormatedPtr++;
399       continue;
400       
401     }
402 #if defined DEBUG_OUTPUT
403     elog(NOTICE,cpFieldData);
404     elog(NOTICE,"Starting format loop");
405 #endif
406     while(*cpUnFormatedPtr!=0) {
407       while(iDataBlockSize - iUsedDataBlock < 2) {
408         cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize+BUFFER_SIZE);
409         iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
410         cpFormatedPtr = cpDataBlock + iUsedDataBlock;
411       }
412       if(*cpUnFormatedPtr=='\\' || *cpUnFormatedPtr=='\'') {
413         *cpFormatedPtr='\\';
414         cpFormatedPtr++;
415         iUsedDataBlock++;
416       }
417       *cpFormatedPtr=*cpUnFormatedPtr;
418       cpFormatedPtr++;
419       cpUnFormatedPtr++;
420       iUsedDataBlock++;
421     }
422
423     SPI_pfree(cpFieldData);
424
425     while(iDataBlockSize - iUsedDataBlock < 3) {
426       cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize+BUFFER_SIZE);
427       iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
428       cpFormatedPtr = cpDataBlock + iUsedDataBlock;
429     }
430     sprintf(cpFormatedPtr,"' ");
431     iUsedDataBlock = iUsedDataBlock +2;
432 #if defined DEBUG_OUTPUT
433     elog(NOTICE,cpDataBlock);
434 #endif
435     
436   }                                     /*  for iColumnCounter  */
437   if(tpPKeys!=NULL) {
438     SPI_pfree(tpPKeys);    
439   }
440 #if defined DEBUG_OUTPUT
441   elog(NOTICE,"Returning");
442 #endif
443   memset(cpDataBlock + iUsedDataBlock,0,iDataBlockSize - iUsedDataBlock);
444
445   return cpDataBlock;
446   
447 }