OSDN Git Service

d71278ebd729f682f6b1220cd0f43a2334c66635
[pg-rex/syncrep.git] / src / backend / executor / nodeLockRows.c
1 /*-------------------------------------------------------------------------
2  *
3  * nodeLockRows.c
4  *        Routines to handle FOR UPDATE/FOR SHARE row locking
5  *
6  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/executor/nodeLockRows.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *              ExecLockRows            - fetch locked rows
18  *              ExecInitLockRows        - initialize node and subnodes
19  *              ExecEndLockRows         - shutdown node and subnodes
20  */
21
22 #include "postgres.h"
23
24 #include "access/xact.h"
25 #include "executor/executor.h"
26 #include "executor/nodeLockRows.h"
27 #include "storage/bufmgr.h"
28 #include "utils/tqual.h"
29
30
31 /* ----------------------------------------------------------------
32  *              ExecLockRows
33  * ----------------------------------------------------------------
34  */
35 TupleTableSlot *                                /* return: a tuple or NULL */
36 ExecLockRows(LockRowsState *node)
37 {
38         TupleTableSlot *slot;
39         EState     *estate;
40         PlanState  *outerPlan;
41         bool            epq_started;
42         ListCell   *lc;
43
44         /*
45          * get information from the node
46          */
47         estate = node->ps.state;
48         outerPlan = outerPlanState(node);
49
50         /*
51          * Get next tuple from subplan, if any.
52          */
53 lnext:
54         slot = ExecProcNode(outerPlan);
55
56         if (TupIsNull(slot))
57                 return NULL;
58
59         /*
60          * Attempt to lock the source tuple(s).  (Note we only have locking
61          * rowmarks in lr_arowMarks.)
62          */
63         epq_started = false;
64         foreach(lc, node->lr_arowMarks)
65         {
66                 ExecAuxRowMark *aerm = (ExecAuxRowMark *) lfirst(lc);
67                 ExecRowMark *erm = aerm->rowmark;
68                 Datum           datum;
69                 bool            isNull;
70                 HeapTupleData tuple;
71                 Buffer          buffer;
72                 ItemPointerData update_ctid;
73                 TransactionId update_xmax;
74                 LockTupleMode lockmode;
75                 HTSU_Result test;
76                 HeapTuple       copyTuple;
77
78                 /* clear any leftover test tuple for this rel */
79                 if (node->lr_epqstate.estate != NULL)
80                         EvalPlanQualSetTuple(&node->lr_epqstate, erm->rti, NULL);
81
82                 /* if child rel, must check whether it produced this row */
83                 if (erm->rti != erm->prti)
84                 {
85                         Oid                     tableoid;
86
87                         datum = ExecGetJunkAttribute(slot,
88                                                                                  aerm->toidAttNo,
89                                                                                  &isNull);
90                         /* shouldn't ever get a null result... */
91                         if (isNull)
92                                 elog(ERROR, "tableoid is NULL");
93                         tableoid = DatumGetObjectId(datum);
94
95                         if (tableoid != RelationGetRelid(erm->relation))
96                         {
97                                 /* this child is inactive right now */
98                                 ItemPointerSetInvalid(&(erm->curCtid));
99                                 continue;
100                         }
101                 }
102
103                 /* fetch the tuple's ctid */
104                 datum = ExecGetJunkAttribute(slot,
105                                                                          aerm->ctidAttNo,
106                                                                          &isNull);
107                 /* shouldn't ever get a null result... */
108                 if (isNull)
109                         elog(ERROR, "ctid is NULL");
110                 tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
111
112                 /* okay, try to lock the tuple */
113                 if (erm->markType == ROW_MARK_EXCLUSIVE)
114                         lockmode = LockTupleExclusive;
115                 else
116                         lockmode = LockTupleShared;
117
118                 test = heap_lock_tuple(erm->relation, &tuple, &buffer,
119                                                            &update_ctid, &update_xmax,
120                                                            estate->es_output_cid,
121                                                            lockmode, erm->noWait);
122                 ReleaseBuffer(buffer);
123                 switch (test)
124                 {
125                         case HeapTupleSelfUpdated:
126                                 /* treat it as deleted; do not process */
127                                 goto lnext;
128
129                         case HeapTupleMayBeUpdated:
130                                 /* got the lock successfully */
131                                 break;
132
133                         case HeapTupleUpdated:
134                                 if (IsolationUsesXactSnapshot())
135                                         ereport(ERROR,
136                                                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
137                                                          errmsg("could not serialize access due to concurrent update")));
138                                 if (ItemPointerEquals(&update_ctid,
139                                                                           &tuple.t_self))
140                                 {
141                                         /* Tuple was deleted, so don't return it */
142                                         goto lnext;
143                                 }
144
145                                 /* updated, so fetch and lock the updated version */
146                                 copyTuple = EvalPlanQualFetch(estate, erm->relation, lockmode,
147                                                                                           &update_ctid, update_xmax);
148
149                                 if (copyTuple == NULL)
150                                 {
151                                         /* Tuple was deleted, so don't return it */
152                                         goto lnext;
153                                 }
154                                 /* remember the actually locked tuple's TID */
155                                 tuple.t_self = copyTuple->t_self;
156
157                                 /*
158                                  * Need to run a recheck subquery.      Initialize EPQ state if we
159                                  * didn't do so already.
160                                  */
161                                 if (!epq_started)
162                                 {
163                                         EvalPlanQualBegin(&node->lr_epqstate, estate);
164                                         epq_started = true;
165                                 }
166
167                                 /* Store target tuple for relation's scan node */
168                                 EvalPlanQualSetTuple(&node->lr_epqstate, erm->rti, copyTuple);
169
170                                 /* Continue loop until we have all target tuples */
171                                 break;
172
173                         default:
174                                 elog(ERROR, "unrecognized heap_lock_tuple status: %u",
175                                          test);
176                 }
177
178                 /* Remember locked tuple's TID for WHERE CURRENT OF */
179                 erm->curCtid = tuple.t_self;
180         }
181
182         /*
183          * If we need to do EvalPlanQual testing, do so.
184          */
185         if (epq_started)
186         {
187                 /*
188                  * First, fetch a copy of any rows that were successfully locked
189                  * without any update having occurred.  (We do this in a separate pass
190                  * so as to avoid overhead in the common case where there are no
191                  * concurrent updates.)
192                  */
193                 foreach(lc, node->lr_arowMarks)
194                 {
195                         ExecAuxRowMark *aerm = (ExecAuxRowMark *) lfirst(lc);
196                         ExecRowMark *erm = aerm->rowmark;
197                         HeapTupleData tuple;
198                         Buffer          buffer;
199
200                         /* ignore non-active child tables */
201                         if (!ItemPointerIsValid(&(erm->curCtid)))
202                         {
203                                 Assert(erm->rti != erm->prti);  /* check it's child table */
204                                 continue;
205                         }
206
207                         if (EvalPlanQualGetTuple(&node->lr_epqstate, erm->rti) != NULL)
208                                 continue;               /* it was updated and fetched above */
209
210                         /* okay, fetch the tuple */
211                         tuple.t_self = erm->curCtid;
212                         if (!heap_fetch(erm->relation, SnapshotAny, &tuple, &buffer,
213                                                         false, NULL))
214                                 elog(ERROR, "failed to fetch tuple for EvalPlanQual recheck");
215
216                         /* successful, copy and store tuple */
217                         EvalPlanQualSetTuple(&node->lr_epqstate, erm->rti,
218                                                                  heap_copytuple(&tuple));
219                         ReleaseBuffer(buffer);
220                 }
221
222                 /*
223                  * Now fetch any non-locked source rows --- the EPQ logic knows how to
224                  * do that.
225                  */
226                 EvalPlanQualSetSlot(&node->lr_epqstate, slot);
227                 EvalPlanQualFetchRowMarks(&node->lr_epqstate);
228
229                 /*
230                  * And finally we can re-evaluate the tuple.
231                  */
232                 slot = EvalPlanQualNext(&node->lr_epqstate);
233                 if (TupIsNull(slot))
234                 {
235                         /* Updated tuple fails qual, so ignore it and go on */
236                         goto lnext;
237                 }
238         }
239
240         /* Got all locks, so return the current tuple */
241         return slot;
242 }
243
244 /* ----------------------------------------------------------------
245  *              ExecInitLockRows
246  *
247  *              This initializes the LockRows node state structures and
248  *              the node's subplan.
249  * ----------------------------------------------------------------
250  */
251 LockRowsState *
252 ExecInitLockRows(LockRows *node, EState *estate, int eflags)
253 {
254         LockRowsState *lrstate;
255         Plan       *outerPlan = outerPlan(node);
256         List       *epq_arowmarks;
257         ListCell   *lc;
258
259         /* check for unsupported flags */
260         Assert(!(eflags & EXEC_FLAG_MARK));
261
262         /*
263          * create state structure
264          */
265         lrstate = makeNode(LockRowsState);
266         lrstate->ps.plan = (Plan *) node;
267         lrstate->ps.state = estate;
268
269         /*
270          * Miscellaneous initialization
271          *
272          * LockRows nodes never call ExecQual or ExecProject.
273          */
274
275         /*
276          * Tuple table initialization (XXX not actually used...)
277          */
278         ExecInitResultTupleSlot(estate, &lrstate->ps);
279
280         /*
281          * then initialize outer plan
282          */
283         outerPlanState(lrstate) = ExecInitNode(outerPlan, estate, eflags);
284
285         /*
286          * LockRows nodes do no projections, so initialize projection info for
287          * this node appropriately
288          */
289         ExecAssignResultTypeFromTL(&lrstate->ps);
290         lrstate->ps.ps_ProjInfo = NULL;
291
292         /*
293          * Locate the ExecRowMark(s) that this node is responsible for, and
294          * construct ExecAuxRowMarks for them.  (InitPlan should already have
295          * built the global list of ExecRowMarks.)
296          */
297         lrstate->lr_arowMarks = NIL;
298         epq_arowmarks = NIL;
299         foreach(lc, node->rowMarks)
300         {
301                 PlanRowMark *rc = (PlanRowMark *) lfirst(lc);
302                 ExecRowMark *erm;
303                 ExecAuxRowMark *aerm;
304
305                 Assert(IsA(rc, PlanRowMark));
306
307                 /* ignore "parent" rowmarks; they are irrelevant at runtime */
308                 if (rc->isParent)
309                         continue;
310
311                 /* find ExecRowMark and build ExecAuxRowMark */
312                 erm = ExecFindRowMark(estate, rc->rti);
313                 aerm = ExecBuildAuxRowMark(erm, outerPlan->targetlist);
314
315                 /*
316                  * Only locking rowmarks go into our own list.  Non-locking marks are
317                  * passed off to the EvalPlanQual machinery.  This is because we don't
318                  * want to bother fetching non-locked rows unless we actually have to
319                  * do an EPQ recheck.
320                  */
321                 if (RowMarkRequiresRowShareLock(erm->markType))
322                         lrstate->lr_arowMarks = lappend(lrstate->lr_arowMarks, aerm);
323                 else
324                         epq_arowmarks = lappend(epq_arowmarks, aerm);
325         }
326
327         /* Now we have the info needed to set up EPQ state */
328         EvalPlanQualInit(&lrstate->lr_epqstate, estate,
329                                          outerPlan, epq_arowmarks, node->epqParam);
330
331         return lrstate;
332 }
333
334 /* ----------------------------------------------------------------
335  *              ExecEndLockRows
336  *
337  *              This shuts down the subplan and frees resources allocated
338  *              to this node.
339  * ----------------------------------------------------------------
340  */
341 void
342 ExecEndLockRows(LockRowsState *node)
343 {
344         EvalPlanQualEnd(&node->lr_epqstate);
345         ExecEndNode(outerPlanState(node));
346 }
347
348
349 void
350 ExecReScanLockRows(LockRowsState *node)
351 {
352         /*
353          * if chgParam of subnode is not null then plan will be re-scanned by
354          * first ExecProcNode.
355          */
356         if (node->ps.lefttree->chgParam == NULL)
357                 ExecReScan(node->ps.lefttree);
358 }