/*-------------------------------------------------------------------------
 *
 * pruneheap.c
 *        heap page pruning and HOT-chain management code
 *
 * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *        $PostgreSQL: pgsql/src/backend/access/heap/pruneheap.c,v 1.25 2010/07/06 19:18:55 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam.h"
#include "access/htup.h"
#include "access/transam.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "storage/off.h"
#include "utils/rel.h"
#include "utils/tqual.h"


/* Working data for heap_page_prune and subroutines */
typedef struct
{
        TransactionId new_prune_xid;    /* new prune hint value for page */
        TransactionId latestRemovedXid; /* latest xid to be removed by this
                                         * prune */
        int             nredirected;    /* numbers of entries in arrays below */
        int             ndead;
        int             nunused;
        /* arrays that accumulate indexes of items to be changed */
        OffsetNumber redirected[MaxHeapTuplesPerPage * 2];
        OffsetNumber nowdead[MaxHeapTuplesPerPage];
        OffsetNumber nowunused[MaxHeapTuplesPerPage];
        /* marked[i] is TRUE if item i is entered in one of the above arrays */
        bool            marked[MaxHeapTuplesPerPage + 1];
} PruneState;
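
/*
 * Note on redirected[] above: each planned redirection occupies two
 * consecutive slots, the "from" offset followed by the "to" offset, which
 * is why that array is sized at MaxHeapTuplesPerPage * 2 (see
 * heap_prune_record_redirect below).
 */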

/* Local functions */
static int heap_prune_chain(Relation relation, Buffer buffer,
                                OffsetNumber rootoffnum,
                                TransactionId OldestXmin,
                                PruneState *prstate);
static void heap_prune_record_prunable(PruneState *prstate, TransactionId xid);
static void heap_prune_record_redirect(PruneState *prstate,
                                                OffsetNumber offnum, OffsetNumber rdoffnum);
static void heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum);
static void heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum);


/*
 * Optionally prune and repair fragmentation in the specified page.
 *
 * This is an opportunistic function.  It will perform housekeeping
 * only if the page heuristically looks like a candidate for pruning and we
 * can acquire a buffer cleanup lock without blocking.
 *
 * Note: this is called quite often.  It's important that it fall out quickly
 * if there's not any use in pruning.
 *
 * Caller must have a pin on the buffer, and must *not* have a lock on it.
 *
 * OldestXmin is the cutoff XID used to distinguish whether tuples are DEAD
 * or RECENTLY_DEAD (see HeapTupleSatisfiesVacuum).
 */
void
heap_page_prune_opt(Relation relation, Buffer buffer, TransactionId OldestXmin)
{
        Page            page = BufferGetPage(buffer);
        Size            minfree;

        /*
         * Let's see if we really need pruning.
         *
         * Forget it if the page is not hinted to contain something prunable
         * that's older than OldestXmin.
         */
        if (!PageIsPrunable(page, OldestXmin))
                return;

        /*
         * We can't write WAL in recovery mode, so there's no point trying to
         * clean the page. The master will likely issue a cleaning WAL record
         * soon anyway, so this is no particular loss.
         */
        if (RecoveryInProgress())
                return;

        /*
         * We prune when a previous UPDATE failed to find enough space on the page
         * for a new tuple version, or when free space falls below the relation's
         * fill-factor target (but not less than 10%).
         *
         * Checking free space here is questionable since we aren't holding any
         * lock on the buffer; in the worst case we could get a bogus answer. It's
         * unlikely to be *seriously* wrong, though, since reading either pd_lower
         * or pd_upper is probably atomic.  Avoiding taking a lock seems more
         * important than sometimes getting a wrong answer in what is after all
         * just a heuristic estimate.
         */
        minfree = RelationGetTargetPageFreeSpace(relation,
                                                 HEAP_DEFAULT_FILLFACTOR);
        minfree = Max(minfree, BLCKSZ / 10);

        if (PageIsFull(page) || PageGetHeapFreeSpace(page) < minfree)
        {
                /* OK, try to get exclusive buffer lock */
                if (!ConditionalLockBufferForCleanup(buffer))
                        return;

                /*
                 * Now that we have buffer lock, get accurate information about the
                 * page's free space, and recheck the heuristic about whether to
                 * prune. (We needn't recheck PageIsPrunable, since no one else could
                 * have pruned while we hold pin.)
                 */
                if (PageIsFull(page) || PageGetHeapFreeSpace(page) < minfree)
                {
                        TransactionId ignore = InvalidTransactionId;    /* return value not
                                                                         * needed */

                        /* OK to prune */
                        (void) heap_page_prune(relation, buffer, OldestXmin, true, &ignore);
                }

                /* And release buffer lock */
                LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        }
}
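
/*
 * Illustrative caller sketch (not part of the original file): heapam.c's
 * page-at-a-time scan code invokes this opportunistically right after
 * pinning a page and before locking its contents, roughly as follows
 * (the actual call site is heapgetpage(), which passes RecentGlobalXmin
 * as the OldestXmin horizon):
 *
 *              buffer = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM, page,
 *                                          RBM_NORMAL, scan->rs_strategy);
 *              heap_page_prune_opt(scan->rs_rd, buffer, RecentGlobalXmin);
 *              LockBuffer(buffer, BUFFER_LOCK_SHARE);
 */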


/*
 * Prune and repair fragmentation in the specified page.
 *
 * Caller must have a pin and buffer cleanup lock on the page.
 *
 * OldestXmin is the cutoff XID used to distinguish whether tuples are DEAD
 * or RECENTLY_DEAD (see HeapTupleSatisfiesVacuum).
 *
 * If report_stats is true then we send the number of reclaimed heap-only
 * tuples to pgstats.  (This must be FALSE during vacuum, since vacuum will
 * send its own new total to pgstats, and we don't want this delta applied
 * on top of that.)
 *
 * Returns the number of tuples deleted from the page and sets
 * latestRemovedXid.
 */
int
heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
                                bool report_stats, TransactionId *latestRemovedXid)
{
        int             ndeleted = 0;
        Page            page = BufferGetPage(buffer);
        OffsetNumber offnum,
                                maxoff;
        PruneState      prstate;

        /*
         * Our strategy is to scan the page and make lists of items to change,
         * then apply the changes within a critical section.  This keeps as much
         * logic as possible out of the critical section, and also ensures that
         * WAL replay will work the same as the normal case.
         *
         * First, initialize the new pd_prune_xid value to zero (indicating no
         * prunable tuples).  If we find any tuples which may soon become
         * prunable, we will save the lowest relevant XID in new_prune_xid. Also
         * initialize the rest of our working state.
         */
        prstate.new_prune_xid = InvalidTransactionId;
        prstate.latestRemovedXid = InvalidTransactionId;
        prstate.nredirected = prstate.ndead = prstate.nunused = 0;
        memset(prstate.marked, 0, sizeof(prstate.marked));

        /* Scan the page */
        maxoff = PageGetMaxOffsetNumber(page);
        for (offnum = FirstOffsetNumber;
                 offnum <= maxoff;
                 offnum = OffsetNumberNext(offnum))
        {
                ItemId          itemid;

                /* Ignore items already processed as part of an earlier chain */
                if (prstate.marked[offnum])
                        continue;

                /* Nothing to do if slot is empty or already dead */
                itemid = PageGetItemId(page, offnum);
                if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid))
                        continue;

                /* Process this item or chain of items */
                ndeleted += heap_prune_chain(relation, buffer, offnum,
                                                                         OldestXmin,
                                                                         &prstate);
        }

        /* Any error while applying the changes is critical */
        START_CRIT_SECTION();

        /* Have we found any prunable items? */
        if (prstate.nredirected > 0 || prstate.ndead > 0 || prstate.nunused > 0)
        {
                /*
                 * Apply the planned item changes, then repair page fragmentation, and
                 * update the page's hint bit about whether it has free line pointers.
                 */
                heap_page_prune_execute(buffer,
                                                                prstate.redirected, prstate.nredirected,
                                                                prstate.nowdead, prstate.ndead,
                                                                prstate.nowunused, prstate.nunused);

                /*
                 * Update the page's pd_prune_xid field to either zero, or the lowest
                 * XID of any soon-prunable tuple.
                 */
                ((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;

                /*
                 * Also clear the "page is full" flag, since there's no point in
                 * repeating the prune/defrag process until something else happens to
                 * the page.
                 */
                PageClearFull(page);

                MarkBufferDirty(buffer);

                /*
                 * Emit a WAL HEAP_CLEAN record showing what we did
                 */
                if (!relation->rd_istemp)
                {
                        XLogRecPtr      recptr;

                        Assert(TransactionIdIsValid(prstate.latestRemovedXid));
                        recptr = log_heap_clean(relation, buffer,
                                                                        prstate.redirected, prstate.nredirected,
                                                                        prstate.nowdead, prstate.ndead,
                                                                        prstate.nowunused, prstate.nunused,
                                                                        prstate.latestRemovedXid);

                        PageSetLSN(BufferGetPage(buffer), recptr);
                        PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
                }
        }
        else
        {
                /*
                 * If we didn't prune anything, but have found a new value for the
                 * pd_prune_xid field, update it and mark the buffer dirty. This is
                 * treated as a non-WAL-logged hint.
                 *
                 * Also clear the "page is full" flag if it is set, since there's no
                 * point in repeating the prune/defrag process until something else
                 * happens to the page.
                 */
                if (((PageHeader) page)->pd_prune_xid != prstate.new_prune_xid ||
                        PageIsFull(page))
                {
                        ((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
                        PageClearFull(page);
                        SetBufferCommitInfoNeedsSave(buffer);
                }
        }

        END_CRIT_SECTION();

        /*
         * If requested, report the number of tuples reclaimed to pgstats. This is
         * ndeleted minus ndead, because we don't want to count a now-DEAD root
         * item as a deletion for this purpose.
         */
        if (report_stats && ndeleted > prstate.ndead)
                pgstat_update_heap_dead_tuples(relation, ndeleted - prstate.ndead);

        *latestRemovedXid = prstate.latestRemovedXid;

        /*
         * XXX Should we update the FSM information of this page?
         *
         * There are two schools of thought here. We may not want to update FSM
         * information so that the page is not used for unrelated UPDATEs/INSERTs
         * and any free space in this page will remain available for further
         * UPDATEs in *this* page, thus improving chances for doing HOT updates.
         *
         * But for a large table, where a page does not receive further UPDATEs
         * for a long time, we might waste this space by not updating the FSM
         * information. The relation may get extended and fragmented further.
         *
         * One possibility is to leave "fillfactor" worth of space in this page
         * and update FSM with the remaining space.
         *
         * In any case, the current FSM implementation doesn't accept
         * one-page-at-a-time updates, so this is all academic for now.
         */

        return ndeleted;
}
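
/*
 * Illustrative caller sketch (not part of the original file): VACUUM's
 * lazy_scan_heap() calls this with report_stats = false, accumulating the
 * return value in its own counters and collecting latestRemovedXid for its
 * own WAL record, approximately:
 *
 *              tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
 *                                               false,
 *                                               &vacrelstats->latestRemovedXid);
 */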


/*
 * Prune specified item pointer or a HOT chain originating at that item.
 *
 * If the item is an index-referenced tuple (i.e. not a heap-only tuple),
 * the HOT chain is pruned by removing all DEAD tuples at the start of the HOT
 * chain.  We also prune any RECENTLY_DEAD tuples preceding a DEAD tuple.
 * This is OK because a RECENTLY_DEAD tuple preceding a DEAD tuple is really
 * DEAD; the OldestXmin test is just too coarse to detect it.
 *
 * The root line pointer is redirected to the tuple immediately after the
 * latest DEAD tuple.  If all tuples in the chain are DEAD, the root line
 * pointer is marked LP_DEAD.  (This includes the case of a DEAD simple
 * tuple, which we treat as a chain of length 1.)
 *
 * OldestXmin is the cutoff XID used to identify dead tuples.
 *
 * We don't actually change the page here, except perhaps for hint-bit updates
 * caused by HeapTupleSatisfiesVacuum.  We just add entries to the arrays in
 * prstate showing the changes to be made.  Items to be redirected are added
 * to the redirected[] array (two entries per redirection); items to be set to
 * LP_DEAD state are added to nowdead[]; and items to be set to LP_UNUSED
 * state are added to nowunused[].
 *
 * Returns the number of tuples (to be) deleted from the page.
 */
static int
heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
                                 TransactionId OldestXmin,
                                 PruneState *prstate)
{
        int             ndeleted = 0;
        Page            dp = (Page) BufferGetPage(buffer);
        TransactionId priorXmax = InvalidTransactionId;
        ItemId          rootlp;
        HeapTupleHeader htup;
        OffsetNumber latestdead = InvalidOffsetNumber,
                                maxoff = PageGetMaxOffsetNumber(dp),
                                offnum;
        OffsetNumber chainitems[MaxHeapTuplesPerPage];
        int             nchain = 0,
                                i;

        rootlp = PageGetItemId(dp, rootoffnum);

        /*
         * If it's a heap-only tuple, then it is not the start of a HOT chain.
         */
        if (ItemIdIsNormal(rootlp))
        {
                htup = (HeapTupleHeader) PageGetItem(dp, rootlp);
                if (HeapTupleHeaderIsHeapOnly(htup))
                {
                        /*
                         * If the tuple is DEAD and doesn't chain to anything else, mark
                         * it unused immediately.  (If it does chain, we can only remove
                         * it as part of pruning its chain.)
                         *
                         * We need this primarily to handle aborted HOT updates, that is,
                         * XMIN_INVALID heap-only tuples.  Those might not be linked to by
                         * any chain, since the parent tuple might be re-updated before
                         * any pruning occurs.  So we have to be able to reap them
                         * separately from chain-pruning.  (Note that
                         * HeapTupleHeaderIsHotUpdated will never return true for an
                         * XMIN_INVALID tuple, so this code will work even when there were
                         * sequential updates within the aborted transaction.)
                         *
                         * Note that we might first arrive at a dead heap-only tuple
                         * either here or while following a chain below.  Whichever path
                         * gets there first will mark the tuple unused.
                         */
                        if (HeapTupleSatisfiesVacuum(htup, OldestXmin, buffer)
                                == HEAPTUPLE_DEAD && !HeapTupleHeaderIsHotUpdated(htup))
                        {
                                heap_prune_record_unused(prstate, rootoffnum);
                                HeapTupleHeaderAdvanceLatestRemovedXid(htup,
                                                                                                 &prstate->latestRemovedXid);
                                ndeleted++;
                        }

                        /* Nothing more to do */
                        return ndeleted;
                }
        }

        /* Start from the root tuple */
        offnum = rootoffnum;

        /* while not end of the chain */
        for (;;)
        {
                ItemId          lp;
                bool            tupdead,
                                        recent_dead;

                /* Some sanity checks */
                if (offnum < FirstOffsetNumber || offnum > maxoff)
                        break;

                /* If item is already processed, stop --- it must not be the same chain */
                if (prstate->marked[offnum])
                        break;

                lp = PageGetItemId(dp, offnum);

                /* Unused item obviously isn't part of the chain */
                if (!ItemIdIsUsed(lp))
                        break;

                /*
                 * If we are looking at the redirected root line pointer, jump to the
                 * first normal tuple in the chain.  If we find a redirect somewhere
                 * else, stop --- it must not be the same chain.
                 */
                if (ItemIdIsRedirected(lp))
                {
                        if (nchain > 0)
                                break;                  /* not at start of chain */
                        chainitems[nchain++] = offnum;
                        offnum = ItemIdGetRedirect(rootlp);
                        continue;
                }

                /*
                 * Likewise, a dead item pointer can't be part of the chain. (We
                 * already eliminated the case of a dead root tuple outside this
                 * function.)
                 */
                if (ItemIdIsDead(lp))
                        break;

                Assert(ItemIdIsNormal(lp));
                htup = (HeapTupleHeader) PageGetItem(dp, lp);

                /*
                 * Check the tuple XMIN against prior XMAX, if any
                 */
                if (TransactionIdIsValid(priorXmax) &&
                        !TransactionIdEquals(HeapTupleHeaderGetXmin(htup), priorXmax))
                        break;

                /*
                 * OK, this tuple is indeed a member of the chain.
                 */
                chainitems[nchain++] = offnum;

                /*
                 * Check tuple's visibility status.
                 */
                tupdead = recent_dead = false;

                switch (HeapTupleSatisfiesVacuum(htup, OldestXmin, buffer))
                {
                        case HEAPTUPLE_DEAD:
                                tupdead = true;
                                break;

                        case HEAPTUPLE_RECENTLY_DEAD:
                                recent_dead = true;

                                /*
                                 * This tuple may soon become DEAD.  Update the hint field so
                                 * that the page is reconsidered for pruning in future.
                                 */
                                heap_prune_record_prunable(prstate,
                                                                                   HeapTupleHeaderGetXmax(htup));
                                break;

                        case HEAPTUPLE_DELETE_IN_PROGRESS:

                                /*
                                 * This tuple may soon become DEAD.  Update the hint field so
                                 * that the page is reconsidered for pruning in future.
                                 */
                                heap_prune_record_prunable(prstate,
                                                                                   HeapTupleHeaderGetXmax(htup));
                                break;

                        case HEAPTUPLE_LIVE:
                        case HEAPTUPLE_INSERT_IN_PROGRESS:

                                /*
                                 * If we wanted to optimize for aborts, we might consider
                                 * marking the page prunable when we see INSERT_IN_PROGRESS.
                                 * But we don't.  See related decisions about when to mark the
                                 * page prunable in heapam.c.
                                 */
                                break;

                        default:
                                elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
                                break;
                }

                /*
                 * Remember the last DEAD tuple seen.  We will advance past
                 * RECENTLY_DEAD tuples just in case there's a DEAD one after them;
                 * but we can't advance past anything else.  (XXX is it really worth
                 * continuing to scan beyond RECENTLY_DEAD?  The case where we will
                 * find another DEAD tuple is a fairly unusual corner case.)
                 */
                if (tupdead)
                {
                        latestdead = offnum;
                        HeapTupleHeaderAdvanceLatestRemovedXid(htup,
                                                                                                 &prstate->latestRemovedXid);
                }
                else if (!recent_dead)
                        break;

                /*
                 * If the tuple is not HOT-updated, then we are at the end of this
                 * HOT-update chain.
                 */
                if (!HeapTupleHeaderIsHotUpdated(htup))
                        break;

                /*
                 * Advance to next chain member.
                 */
                Assert(ItemPointerGetBlockNumber(&htup->t_ctid) ==
                           BufferGetBlockNumber(buffer));
                offnum = ItemPointerGetOffsetNumber(&htup->t_ctid);
                priorXmax = HeapTupleHeaderGetXmax(htup);
        }

        /*
         * If we found a DEAD tuple in the chain, adjust the HOT chain so that all
         * the DEAD tuples at the start of the chain are removed and the root line
         * pointer is appropriately redirected.
         */
        if (OffsetNumberIsValid(latestdead))
        {
                /*
                 * Mark as unused each intermediate item that we are able to remove
                 * from the chain.
                 *
                 * When the previous item is the last dead tuple seen, we are at the
                 * right candidate for redirection.
                 */
                for (i = 1; (i < nchain) && (chainitems[i - 1] != latestdead); i++)
                {
                        heap_prune_record_unused(prstate, chainitems[i]);
                        ndeleted++;
                }

                /*
                 * If the root entry had been a normal tuple, we are deleting it, so
                 * count it in the result.  But changing a redirect (even to DEAD
                 * state) doesn't count.
                 */
                if (ItemIdIsNormal(rootlp))
                        ndeleted++;

                /*
                 * If the DEAD tuple is at the end of the chain, the entire chain is
                 * dead and the root line pointer can be marked dead.  Otherwise just
                 * redirect the root to the correct chain member.
                 */
                if (i >= nchain)
                        heap_prune_record_dead(prstate, rootoffnum);
                else
                        heap_prune_record_redirect(prstate, rootoffnum, chainitems[i]);
        }
        else if (nchain < 2 && ItemIdIsRedirected(rootlp))
        {
                /*
                 * We found a redirect item that doesn't point to a valid follow-on
                 * item.  This can happen if the loop in heap_page_prune caused us to
                 * visit the dead successor of a redirect item before visiting the
                 * redirect item.  We can clean up by setting the redirect item to
                 * DEAD state.
                 */
                heap_prune_record_dead(prstate, rootoffnum);
        }

        return ndeleted;
}
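
/*
 * Worked example (illustrative, not upstream text).  Suppose a 3-member HOT
 * chain rooted at line pointer 1, where the first two versions are DEAD and
 * the last is live:
 *
 *              before:  lp1 (normal, DEAD) -> lp2 (normal, DEAD) -> lp3 (live)
 *              after:   lp1 (redirect -> lp3), lp2 (unused), lp3 (live)
 *
 * heap_prune_chain records lp2 in nowunused[], the (lp1, lp3) pair in
 * redirected[], and returns 2, since the tuples at lp1 and lp2 are deleted.
 */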

/* Record lowest soon-prunable XID */
static void
heap_prune_record_prunable(PruneState *prstate, TransactionId xid)
{
        /*
         * This should exactly match the PageSetPrunable macro.  We can't store
         * directly into the page header yet, so we update working state.
         */
        Assert(TransactionIdIsNormal(xid));
        if (!TransactionIdIsValid(prstate->new_prune_xid) ||
                TransactionIdPrecedes(xid, prstate->new_prune_xid))
                prstate->new_prune_xid = xid;
}

/* Record item pointer to be redirected */
static void
heap_prune_record_redirect(PruneState *prstate,
                                                   OffsetNumber offnum, OffsetNumber rdoffnum)
{
        Assert(prstate->nredirected < MaxHeapTuplesPerPage);
        prstate->redirected[prstate->nredirected * 2] = offnum;
        prstate->redirected[prstate->nredirected * 2 + 1] = rdoffnum;
        prstate->nredirected++;
        Assert(!prstate->marked[offnum]);
        prstate->marked[offnum] = true;
        Assert(!prstate->marked[rdoffnum]);
        prstate->marked[rdoffnum] = true;
}

/* Record item pointer to be marked dead */
static void
heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum)
{
        Assert(prstate->ndead < MaxHeapTuplesPerPage);
        prstate->nowdead[prstate->ndead] = offnum;
        prstate->ndead++;
        Assert(!prstate->marked[offnum]);
        prstate->marked[offnum] = true;
}

/* Record item pointer to be marked unused */
static void
heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum)
{
        Assert(prstate->nunused < MaxHeapTuplesPerPage);
        prstate->nowunused[prstate->nunused] = offnum;
        prstate->nunused++;
        Assert(!prstate->marked[offnum]);
        prstate->marked[offnum] = true;
}


/*
 * Perform the actual page changes needed by heap_page_prune.
 * It is expected that the caller has suitable pin and lock on the
 * buffer, and is inside a critical section.
 *
 * This is split out because it is also used by heap_xlog_clean()
 * to replay the WAL record when needed after a crash.  Note that the
 * arguments are identical to those of log_heap_clean().
 */
void
heap_page_prune_execute(Buffer buffer,
                                                OffsetNumber *redirected, int nredirected,
                                                OffsetNumber *nowdead, int ndead,
                                                OffsetNumber *nowunused, int nunused)
{
        Page            page = (Page) BufferGetPage(buffer);
        OffsetNumber *offnum;
        int             i;

        /* Update all redirected line pointers */
        offnum = redirected;
        for (i = 0; i < nredirected; i++)
        {
                OffsetNumber fromoff = *offnum++;
                OffsetNumber tooff = *offnum++;
                ItemId          fromlp = PageGetItemId(page, fromoff);

                ItemIdSetRedirect(fromlp, tooff);
        }

        /* Update all now-dead line pointers */
        offnum = nowdead;
        for (i = 0; i < ndead; i++)
        {
                OffsetNumber off = *offnum++;
                ItemId          lp = PageGetItemId(page, off);

                ItemIdSetDead(lp);
        }

        /* Update all now-unused line pointers */
        offnum = nowunused;
        for (i = 0; i < nunused; i++)
        {
                OffsetNumber off = *offnum++;
                ItemId          lp = PageGetItemId(page, off);

                ItemIdSetUnused(lp);
        }

        /*
         * Finally, repair any fragmentation, and update the page's hint bit about
         * whether it has free pointers.
         */
        PageRepairFragmentation(page);
}
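
/*
 * Note (editorial): all three line-pointer updates above happen before the
 * single PageRepairFragmentation() call, since the item states must be
 * final before the repair step compacts the page's tuple storage.  During
 * crash recovery, heap_xlog_clean() feeds this same function the arrays it
 * decodes from the record written by log_heap_clean(), so replay applies
 * exactly the changes planned by heap_page_prune.
 */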


/*
 * For all items in this page, find their respective root line pointers.
 * If item k is part of a HOT-chain with root at item j, then we set
 * root_offsets[k - 1] = j.
 *
 * The passed-in root_offsets array must have MaxHeapTuplesPerPage entries.
 * We zero out all unused entries.
 *
 * The function must be called with at least share lock on the buffer, to
 * prevent concurrent prune operations.
 *
 * Note: The information collected here is valid only as long as the caller
 * holds a pin on the buffer. Once pin is released, a tuple might be pruned
 * and reused by a completely unrelated tuple.
 */
void
heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
{
        OffsetNumber offnum,
                                maxoff;

        MemSet(root_offsets, 0, MaxHeapTuplesPerPage * sizeof(OffsetNumber));

        maxoff = PageGetMaxOffsetNumber(page);
        for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum))
        {
                ItemId          lp = PageGetItemId(page, offnum);
                HeapTupleHeader htup;
                OffsetNumber nextoffnum;
                TransactionId priorXmax;

                /* skip unused and dead items */
                if (!ItemIdIsUsed(lp) || ItemIdIsDead(lp))
                        continue;

                if (ItemIdIsNormal(lp))
                {
                        htup = (HeapTupleHeader) PageGetItem(page, lp);

                        /*
                         * Check if this tuple is part of a HOT-chain rooted at some other
                         * tuple. If so, skip it for now; we'll process it when we find
                         * its root.
                         */
                        if (HeapTupleHeaderIsHeapOnly(htup))
                                continue;

                        /*
                         * This is either a plain tuple or the root of a HOT-chain.
                         * Remember it in the mapping.
                         */
                        root_offsets[offnum - 1] = offnum;

                        /* If it's not the start of a HOT-chain, we're done with it */
                        if (!HeapTupleHeaderIsHotUpdated(htup))
                                continue;

                        /* Set up to scan the HOT-chain */
                        nextoffnum = ItemPointerGetOffsetNumber(&htup->t_ctid);
                        priorXmax = HeapTupleHeaderGetXmax(htup);
                }
                else
                {
                        /* Must be a redirect item. We do not set its root_offsets entry */
                        Assert(ItemIdIsRedirected(lp));
                        /* Set up to scan the HOT-chain */
                        nextoffnum = ItemIdGetRedirect(lp);
                        priorXmax = InvalidTransactionId;
                }

                /*
                 * Now follow the HOT-chain and collect other tuples in the chain.
                 *
                 * Note: Even though this is a nested loop, the complexity of the
                 * function is O(N) because a tuple in the page should be visited not
                 * more than twice, once in the outer loop and once in HOT-chain
                 * chases.
                 */
                for (;;)
                {
                        lp = PageGetItemId(page, nextoffnum);

                        /* Check for broken chains */
                        if (!ItemIdIsNormal(lp))
                                break;

                        htup = (HeapTupleHeader) PageGetItem(page, lp);

                        if (TransactionIdIsValid(priorXmax) &&
                                !TransactionIdEquals(priorXmax, HeapTupleHeaderGetXmin(htup)))
                                break;

                        /* Remember the root line pointer for this item */
                        root_offsets[nextoffnum - 1] = offnum;

                        /* Advance to next chain member, if any */
                        if (!HeapTupleHeaderIsHotUpdated(htup))
                                break;

                        nextoffnum = ItemPointerGetOffsetNumber(&htup->t_ctid);
                        priorXmax = HeapTupleHeaderGetXmax(htup);
                }
        }
}
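
/*
 * Illustrative caller sketch (not part of the original file): CREATE INDEX
 * builds this map once per heap page so that heap-only tuples can be
 * indexed under their root line pointer's TID, roughly as in
 * IndexBuildHeapScan():
 *
 *              OffsetNumber root_offsets[MaxHeapTuplesPerPage];
 *
 *              LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
 *              heap_get_root_tuples(BufferGetPage(scan->rs_cbuf), root_offsets);
 *              LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK);
 *              ...
 *              root_offnum = root_offsets[offnum - 1];
 */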