/*-------------------------------------------------------------------------
 *
 * pruneheap.c
 *    heap page pruning and HOT-chain management code
 *
 * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    $PostgreSQL: pgsql/src/backend/access/heap/pruneheap.c,v 1.25 2010/07/06 19:18:55 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/heapam.h"
#include "access/htup.h"
#include "access/transam.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "storage/off.h"
#include "utils/rel.h"
#include "utils/tqual.h"


/* Working data for heap_page_prune and subroutines */
typedef struct
{
    TransactionId new_prune_xid;    /* new prune hint value for page */
    TransactionId latestRemovedXid; /* latest xid to be removed by this
                                     * prune */
    int         nredirected;        /* numbers of entries in arrays below */
    int         ndead;
    int         nunused;
    /* arrays that accumulate indexes of items to be changed */
    OffsetNumber redirected[MaxHeapTuplesPerPage * 2];
    OffsetNumber nowdead[MaxHeapTuplesPerPage];
    OffsetNumber nowunused[MaxHeapTuplesPerPage];
    /* marked[i] is TRUE if item i is entered in one of the above arrays */
    bool        marked[MaxHeapTuplesPerPage + 1];
} PruneState;

/* Local functions */
static int heap_prune_chain(Relation relation, Buffer buffer,
                 OffsetNumber rootoffnum,
                 TransactionId OldestXmin,
                 PruneState *prstate);
static void heap_prune_record_prunable(PruneState *prstate, TransactionId xid);
static void heap_prune_record_redirect(PruneState *prstate,
                           OffsetNumber offnum, OffsetNumber rdoffnum);
static void heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum);
static void heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum);

/*
 * Optionally prune and repair fragmentation in the specified page.
 *
 * This is an opportunistic function.  It will perform housekeeping only if
 * the page heuristically looks like a candidate for pruning and we can
 * acquire a buffer cleanup lock without blocking.
 *
 * Note: this is called quite often.  It's important that it fall out quickly
 * if there's no use in pruning.
 *
 * Caller must have pin on the buffer, and must *not* have a lock on it.
 *
 * OldestXmin is the cutoff XID used to distinguish whether tuples are DEAD
 * or RECENTLY_DEAD (see HeapTupleSatisfiesVacuum).
 */
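/*
 * Usage sketch (illustrative only; the call site shown is an assumption,
 * not code from this file): a scan that has just pinned a heap page can
 * give it a chance at pruning like this:
 *
 *		Buffer	buf = ReadBuffer(relation, blkno);
 *
 *		heap_page_prune_opt(relation, buf, RecentGlobalXmin);
 *
 * No lock is taken before the call; this function acquires (and releases)
 * the cleanup lock itself, and simply returns if it can't get it.
 */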
void
heap_page_prune_opt(Relation relation, Buffer buffer, TransactionId OldestXmin)
{
    Page        page = BufferGetPage(buffer);
    Size        minfree;

    /*
     * Let's see if we really need pruning.
     *
     * Forget it if page is not hinted to contain something prunable that's
     * older than OldestXmin.
     */
    if (!PageIsPrunable(page, OldestXmin))
        return;

    /*
     * We can't write WAL in recovery mode, so there's no point trying to
     * clean the page. The master will likely issue a cleaning WAL record soon
     * anyway, so this is no particular loss.
     */
    if (RecoveryInProgress())
        return;

    /*
     * We prune when a previous UPDATE failed to find enough space on the page
     * for a new tuple version, or when free space falls below the relation's
     * fill-factor target (but not less than 10%).
     *
     * Checking free space here is questionable since we aren't holding any
     * lock on the buffer; in the worst case we could get a bogus answer. It's
     * unlikely to be *seriously* wrong, though, since reading either pd_lower
     * or pd_upper is probably atomic.  Avoiding taking a lock seems more
     * important than sometimes getting a wrong answer in what is after all
     * just a heuristic estimate.
     */
    minfree = RelationGetTargetPageFreeSpace(relation,
                                             HEAP_DEFAULT_FILLFACTOR);
    minfree = Max(minfree, BLCKSZ / 10);
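
    /*
     * Worked numbers (a sketch, assuming the defaults): with BLCKSZ = 8192
     * and the default heap fillfactor of 100, the fillfactor target is 0
     * bytes, so minfree = Max(0, 8192 / 10) = 819.  With fillfactor = 70,
     * the target is 8192 * 30 / 100 = 2457 bytes, which exceeds the 10%
     * floor and becomes the threshold instead.
     */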

    if (PageIsFull(page) || PageGetHeapFreeSpace(page) < minfree)
    {
        /* OK, try to get exclusive buffer lock */
        if (!ConditionalLockBufferForCleanup(buffer))
            return;

        /*
         * Now that we have buffer lock, get accurate information about the
         * page's free space, and recheck the heuristic about whether to
         * prune. (We needn't recheck PageIsPrunable, since no one else could
         * have pruned while we hold pin.)
         */
        if (PageIsFull(page) || PageGetHeapFreeSpace(page) < minfree)
        {
            TransactionId ignore = InvalidTransactionId;    /* return value not
                                                             * needed */

            /* OK to prune */
            (void) heap_page_prune(relation, buffer, OldestXmin, true, &ignore);
        }

        /* And release buffer lock */
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
    }
}


/*
 * Prune and repair fragmentation in the specified page.
 *
 * Caller must have pin and buffer cleanup lock on the page.
 *
 * OldestXmin is the cutoff XID used to distinguish whether tuples are DEAD
 * or RECENTLY_DEAD (see HeapTupleSatisfiesVacuum).
 *
 * If report_stats is true then we send the number of reclaimed heap-only
 * tuples to pgstats.  (This must be FALSE during vacuum, since vacuum will
 * send its own new total to pgstats, and we don't want this delta applied
 * on top of that.)
 *
 * Returns the number of tuples deleted from the page and sets
 * latestRemovedXid.
 */
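/*
 * Caller sketch (an assumption for illustration, not code from this file):
 * per the report_stats rule above, VACUUM would pass report_stats = false
 * and fold the result into its own counters, along the lines of
 *
 *		TransactionId latestRemovedXid = InvalidTransactionId;
 *
 *		tups_vacuumed += heap_page_prune(onerel, buf, OldestXmin,
 *										 false, &latestRemovedXid);
 *
 * whereas opportunistic pruning (above) passes report_stats = true.
 */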
int
heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
                bool report_stats, TransactionId *latestRemovedXid)
{
    int         ndeleted = 0;
    Page        page = BufferGetPage(buffer);
    OffsetNumber offnum,
                maxoff;
    PruneState  prstate;

    /*
     * Our strategy is to scan the page and make lists of items to change,
     * then apply the changes within a critical section.  This keeps as much
     * logic as possible out of the critical section, and also ensures that
     * WAL replay will work the same as the normal case.
     *
     * First, initialize the new pd_prune_xid value to zero (indicating no
     * prunable tuples).  If we find any tuples which may soon become
     * prunable, we will save the lowest relevant XID in new_prune_xid. Also
     * initialize the rest of our working state.
     */
    prstate.new_prune_xid = InvalidTransactionId;
    prstate.latestRemovedXid = InvalidTransactionId;
    prstate.nredirected = prstate.ndead = prstate.nunused = 0;
    memset(prstate.marked, 0, sizeof(prstate.marked));

    /* Scan the page */
    maxoff = PageGetMaxOffsetNumber(page);
    for (offnum = FirstOffsetNumber;
         offnum <= maxoff;
         offnum = OffsetNumberNext(offnum))
    {
        ItemId      itemid;

        /* Ignore items already processed as part of an earlier chain */
        if (prstate.marked[offnum])
            continue;

        /* Nothing to do if slot is empty or already dead */
        itemid = PageGetItemId(page, offnum);
        if (!ItemIdIsUsed(itemid) || ItemIdIsDead(itemid))
            continue;

        /* Process this item or chain of items */
        ndeleted += heap_prune_chain(relation, buffer, offnum,
                                     OldestXmin,
                                     &prstate);
    }

    /* Any error while applying the changes is critical */
    START_CRIT_SECTION();

    /* Have we found any prunable items? */
    if (prstate.nredirected > 0 || prstate.ndead > 0 || prstate.nunused > 0)
    {
        /*
         * Apply the planned item changes, then repair page fragmentation, and
         * update the page's hint bit about whether it has free line pointers.
         */
        heap_page_prune_execute(buffer,
                                prstate.redirected, prstate.nredirected,
                                prstate.nowdead, prstate.ndead,
                                prstate.nowunused, prstate.nunused);

        /*
         * Update the page's pd_prune_xid field to either zero, or the lowest
         * XID of any soon-prunable tuple.
         */
        ((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;

        /*
         * Also clear the "page is full" flag, since there's no point in
         * repeating the prune/defrag process until something else happens to
         * the page.
         */
        PageClearFull(page);

        MarkBufferDirty(buffer);

        /*
         * Emit a WAL HEAP_CLEAN record showing what we did
         */
        if (!relation->rd_istemp)
        {
            XLogRecPtr  recptr;

            recptr = log_heap_clean(relation, buffer,
                                    prstate.redirected, prstate.nredirected,
                                    prstate.nowdead, prstate.ndead,
                                    prstate.nowunused, prstate.nunused,
                                    prstate.latestRemovedXid);

            PageSetLSN(BufferGetPage(buffer), recptr);
            PageSetTLI(BufferGetPage(buffer), ThisTimeLineID);
        }
    }
    else
    {
        /*
         * If we didn't prune anything, but have found a new value for the
         * pd_prune_xid field, update it and mark the buffer dirty. This is
         * treated as a non-WAL-logged hint.
         *
         * Also clear the "page is full" flag if it is set, since there's no
         * point in repeating the prune/defrag process until something else
         * happens to the page.
         */
        if (((PageHeader) page)->pd_prune_xid != prstate.new_prune_xid ||
            PageIsFull(page))
        {
            ((PageHeader) page)->pd_prune_xid = prstate.new_prune_xid;
            PageClearFull(page);
            SetBufferCommitInfoNeedsSave(buffer);
        }
    }

    END_CRIT_SECTION();

    /*
     * If requested, report the number of tuples reclaimed to pgstats. This is
     * ndeleted minus ndead, because we don't want to count a now-DEAD root
     * item as a deletion for this purpose.
     */
    if (report_stats && ndeleted > prstate.ndead)
        pgstat_update_heap_dead_tuples(relation, ndeleted - prstate.ndead);

    *latestRemovedXid = prstate.latestRemovedXid;

    /*
     * XXX Should we update the FSM information of this page?
     *
     * There are two schools of thought here. We may not want to update FSM
     * information so that the page is not used for unrelated UPDATEs/INSERTs
     * and any free space in this page will remain available for further
     * UPDATEs in *this* page, thus improving chances for doing HOT updates.
     *
     * But for a large table and where a page does not receive further UPDATEs
     * for a long time, we might waste this space by not updating the FSM
     * information. The relation may get extended and fragmented further.
     *
     * One possibility is to leave "fillfactor" worth of space in this page
     * and update FSM with the remaining space.
     *
     * In any case, the current FSM implementation doesn't accept
     * one-page-at-a-time updates, so this is all academic for now.
     */

    return ndeleted;
}


/*
 * Prune specified item pointer or a HOT chain originating at that item.
 *
 * If the item is an index-referenced tuple (i.e. not a heap-only tuple),
 * the HOT chain is pruned by removing all DEAD tuples at the start of the HOT
 * chain.  We also prune any RECENTLY_DEAD tuples preceding a DEAD tuple.
 * This is OK because a RECENTLY_DEAD tuple preceding a DEAD tuple is really
 * DEAD; the OldestXmin test is just too coarse to detect it.
 *
 * The root line pointer is redirected to the tuple immediately after the
 * latest DEAD tuple.  If all tuples in the chain are DEAD, the root line
 * pointer is marked LP_DEAD.  (This includes the case of a DEAD simple
 * tuple, which we treat as a chain of length 1.)
 *
 * OldestXmin is the cutoff XID used to identify dead tuples.
 *
 * We don't actually change the page here, except perhaps for hint-bit updates
 * caused by HeapTupleSatisfiesVacuum.  We just add entries to the arrays in
 * prstate showing the changes to be made.  Items to be redirected are added
 * to the redirected[] array (two entries per redirection); items to be set to
 * LP_DEAD state are added to nowdead[]; and items to be set to LP_UNUSED
 * state are added to nowunused[].
 *
 * Returns the number of tuples (to be) deleted from the page.
 */
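/*
 * Worked example (illustrative, not from the surrounding code): consider a
 * HOT chain occupying line pointers 1 -> 2 -> 3, where the tuples at
 * offsets 1 and 2 are DEAD and the tuple at offset 3 is live.  This
 * function records a redirect of the root, redirected[] = {1, 3}, and marks
 * the intermediate item unused, nowunused[] = {2}; it returns 2, since two
 * tuples (at offsets 1 and 2) are slated for removal.
 */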
static int
heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum,
                 TransactionId OldestXmin,
                 PruneState *prstate)
{
    int         ndeleted = 0;
    Page        dp = (Page) BufferGetPage(buffer);
    TransactionId priorXmax = InvalidTransactionId;
    ItemId      rootlp;
    HeapTupleHeader htup;
    OffsetNumber latestdead = InvalidOffsetNumber,
                maxoff = PageGetMaxOffsetNumber(dp),
                offnum;
    OffsetNumber chainitems[MaxHeapTuplesPerPage];
    int         nchain = 0,
                i;

    rootlp = PageGetItemId(dp, rootoffnum);

    /*
     * If it's a heap-only tuple, then it is not the start of a HOT chain.
     */
    if (ItemIdIsNormal(rootlp))
    {
        htup = (HeapTupleHeader) PageGetItem(dp, rootlp);
        if (HeapTupleHeaderIsHeapOnly(htup))
        {
            /*
             * If the tuple is DEAD and doesn't chain to anything else, mark
             * it unused immediately.  (If it does chain, we can only remove
             * it as part of pruning its chain.)
             *
             * We need this primarily to handle aborted HOT updates, that is,
             * XMIN_INVALID heap-only tuples.  Those might not be linked to by
             * any chain, since the parent tuple might be re-updated before
             * any pruning occurs.  So we have to be able to reap them
             * separately from chain-pruning.  (Note that
             * HeapTupleHeaderIsHotUpdated will never return true for an
             * XMIN_INVALID tuple, so this code will work even when there were
             * sequential updates within the aborted transaction.)
             *
             * Note that we might first arrive at a dead heap-only tuple
             * either here or while following a chain below.  Whichever path
             * gets there first will mark the tuple unused.
             */
            if (HeapTupleSatisfiesVacuum(htup, OldestXmin, buffer)
                == HEAPTUPLE_DEAD && !HeapTupleHeaderIsHotUpdated(htup))
            {
                heap_prune_record_unused(prstate, rootoffnum);
                HeapTupleHeaderAdvanceLatestRemovedXid(htup,
                                                 &prstate->latestRemovedXid);
                ndeleted++;
            }

            /* Nothing more to do */
            return ndeleted;
        }
    }

    /* Start from the root tuple */
    offnum = rootoffnum;

    /* while not end of the chain */
    for (;;)
    {
        ItemId      lp;
        bool        tupdead,
                    recent_dead;

        /* Some sanity checks */
        if (offnum < FirstOffsetNumber || offnum > maxoff)
            break;

        /* If item is already processed, stop --- it must not be the same chain */
        if (prstate->marked[offnum])
            break;

        lp = PageGetItemId(dp, offnum);

        /* Unused item obviously isn't part of the chain */
        if (!ItemIdIsUsed(lp))
            break;

        /*
         * If we are looking at the redirected root line pointer, jump to the
         * first normal tuple in the chain.  If we find a redirect somewhere
         * else, stop --- it must not be the same chain.
         */
        if (ItemIdIsRedirected(lp))
        {
            if (nchain > 0)
                break;          /* not at start of chain */
            chainitems[nchain++] = offnum;
            offnum = ItemIdGetRedirect(rootlp);
            continue;
        }

        /*
         * Likewise, a dead item pointer can't be part of the chain. (We
         * already eliminated the case of a dead root tuple outside this
         * function.)
         */
        if (ItemIdIsDead(lp))
            break;

        Assert(ItemIdIsNormal(lp));
        htup = (HeapTupleHeader) PageGetItem(dp, lp);

        /*
         * Check the tuple XMIN against prior XMAX, if any
         */
        if (TransactionIdIsValid(priorXmax) &&
            !TransactionIdEquals(HeapTupleHeaderGetXmin(htup), priorXmax))
            break;

        /*
         * OK, this tuple is indeed a member of the chain.
         */
        chainitems[nchain++] = offnum;

        /*
         * Check tuple's visibility status.
         */
        tupdead = recent_dead = false;

        switch (HeapTupleSatisfiesVacuum(htup, OldestXmin, buffer))
        {
            case HEAPTUPLE_DEAD:
                tupdead = true;
                break;

            case HEAPTUPLE_RECENTLY_DEAD:
                recent_dead = true;

                /*
                 * This tuple may soon become DEAD.  Update the hint field so
                 * that the page is reconsidered for pruning in future.
                 */
                heap_prune_record_prunable(prstate,
                                           HeapTupleHeaderGetXmax(htup));
                break;

            case HEAPTUPLE_DELETE_IN_PROGRESS:

                /*
                 * This tuple may soon become DEAD.  Update the hint field so
                 * that the page is reconsidered for pruning in future.
                 */
                heap_prune_record_prunable(prstate,
                                           HeapTupleHeaderGetXmax(htup));
                break;

            case HEAPTUPLE_LIVE:
            case HEAPTUPLE_INSERT_IN_PROGRESS:

                /*
                 * If we wanted to optimize for aborts, we might consider
                 * marking the page prunable when we see INSERT_IN_PROGRESS.
                 * But we don't.  See related decisions about when to mark the
                 * page prunable in heapam.c.
                 */
                break;

            default:
                elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
                break;
        }

        /*
         * Remember the last DEAD tuple seen.  We will advance past
         * RECENTLY_DEAD tuples just in case there's a DEAD one after them;
         * but we can't advance past anything else.  (XXX is it really worth
         * continuing to scan beyond RECENTLY_DEAD?  The case where we will
         * find another DEAD tuple is a fairly unusual corner case.)
         */
        if (tupdead)
        {
            latestdead = offnum;
            HeapTupleHeaderAdvanceLatestRemovedXid(htup,
                                                 &prstate->latestRemovedXid);
        }
        else if (!recent_dead)
            break;

        /*
         * If the tuple is not HOT-updated, then we are at the end of this
         * HOT-update chain.
         */
        if (!HeapTupleHeaderIsHotUpdated(htup))
            break;

        /*
         * Advance to next chain member.
         */
        Assert(ItemPointerGetBlockNumber(&htup->t_ctid) ==
               BufferGetBlockNumber(buffer));
        offnum = ItemPointerGetOffsetNumber(&htup->t_ctid);
        priorXmax = HeapTupleHeaderGetXmax(htup);
    }

    /*
     * If we found a DEAD tuple in the chain, adjust the HOT chain so that all
     * the DEAD tuples at the start of the chain are removed and the root line
     * pointer is appropriately redirected.
     */
    if (OffsetNumberIsValid(latestdead))
    {
        /*
         * Mark as unused each intermediate item that we are able to remove
         * from the chain.
         *
         * When the previous item is the last dead tuple seen, we are at the
         * right candidate for redirection.
         */
        for (i = 1; (i < nchain) && (chainitems[i - 1] != latestdead); i++)
        {
            heap_prune_record_unused(prstate, chainitems[i]);
            ndeleted++;
        }

        /*
         * If the root entry had been a normal tuple, we are deleting it, so
         * count it in the result.  But changing a redirect (even to DEAD
         * state) doesn't count.
         */
        if (ItemIdIsNormal(rootlp))
            ndeleted++;

        /*
         * If the DEAD tuple is at the end of the chain, the entire chain is
         * dead and the root line pointer can be marked dead.  Otherwise just
         * redirect the root to the correct chain member.
         */
        if (i >= nchain)
            heap_prune_record_dead(prstate, rootoffnum);
        else
            heap_prune_record_redirect(prstate, rootoffnum, chainitems[i]);
    }
    else if (nchain < 2 && ItemIdIsRedirected(rootlp))
    {
        /*
         * We found a redirect item that doesn't point to a valid follow-on
         * item.  This can happen if the loop in heap_page_prune caused us to
         * visit the dead successor of a redirect item before visiting the
         * redirect item.  We can clean up by setting the redirect item to
         * DEAD state.
         */
        heap_prune_record_dead(prstate, rootoffnum);
    }

    return ndeleted;
}

/* Record lowest soon-prunable XID */
static void
heap_prune_record_prunable(PruneState *prstate, TransactionId xid)
{
    /*
     * This should exactly match the PageSetPrunable macro.  We can't store
     * directly into the page header yet, so we update working state.
     */
    Assert(TransactionIdIsNormal(xid));
    if (!TransactionIdIsValid(prstate->new_prune_xid) ||
        TransactionIdPrecedes(xid, prstate->new_prune_xid))
        prstate->new_prune_xid = xid;
}

/* Record item pointer to be redirected */
static void
heap_prune_record_redirect(PruneState *prstate,
                           OffsetNumber offnum, OffsetNumber rdoffnum)
{
    Assert(prstate->nredirected < MaxHeapTuplesPerPage);
    prstate->redirected[prstate->nredirected * 2] = offnum;
    prstate->redirected[prstate->nredirected * 2 + 1] = rdoffnum;
    prstate->nredirected++;
    Assert(!prstate->marked[offnum]);
    prstate->marked[offnum] = true;
    Assert(!prstate->marked[rdoffnum]);
    prstate->marked[rdoffnum] = true;
}

/* Record item pointer to be marked dead */
static void
heap_prune_record_dead(PruneState *prstate, OffsetNumber offnum)
{
    Assert(prstate->ndead < MaxHeapTuplesPerPage);
    prstate->nowdead[prstate->ndead] = offnum;
    prstate->ndead++;
    Assert(!prstate->marked[offnum]);
    prstate->marked[offnum] = true;
}

/* Record item pointer to be marked unused */
static void
heap_prune_record_unused(PruneState *prstate, OffsetNumber offnum)
{
    Assert(prstate->nunused < MaxHeapTuplesPerPage);
    prstate->nowunused[prstate->nunused] = offnum;
    prstate->nunused++;
    Assert(!prstate->marked[offnum]);
    prstate->marked[offnum] = true;
}


/*
 * Perform the actual page changes needed by heap_page_prune.
 * It is expected that the caller has suitable pin and lock on the
 * buffer, and is inside a critical section.
 *
 * This is split out because it is also used by heap_xlog_clean()
 * to replay the WAL record when needed after a crash.  Note that the
 * arguments are identical to those of log_heap_clean().
 */
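/*
 * For example (illustrative only): given redirected[] = {1, 3} with
 * nredirected = 1, line pointer 1 becomes LP_REDIRECT pointing at offset 3.
 * The nowdead[] and nowunused[] arrays hold single offsets, which the loops
 * below set to LP_DEAD and LP_UNUSED respectively.
 */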
void
heap_page_prune_execute(Buffer buffer,
                        OffsetNumber *redirected, int nredirected,
                        OffsetNumber *nowdead, int ndead,
                        OffsetNumber *nowunused, int nunused)
{
    Page        page = (Page) BufferGetPage(buffer);
    OffsetNumber *offnum;
    int         i;

    /* Update all redirected line pointers */
    offnum = redirected;
    for (i = 0; i < nredirected; i++)
    {
        OffsetNumber fromoff = *offnum++;
        OffsetNumber tooff = *offnum++;
        ItemId      fromlp = PageGetItemId(page, fromoff);

        ItemIdSetRedirect(fromlp, tooff);
    }

    /* Update all now-dead line pointers */
    offnum = nowdead;
    for (i = 0; i < ndead; i++)
    {
        OffsetNumber off = *offnum++;
        ItemId      lp = PageGetItemId(page, off);

        ItemIdSetDead(lp);
    }

    /* Update all now-unused line pointers */
    offnum = nowunused;
    for (i = 0; i < nunused; i++)
    {
        OffsetNumber off = *offnum++;
        ItemId      lp = PageGetItemId(page, off);

        ItemIdSetUnused(lp);
    }

    /*
     * Finally, repair any fragmentation, and update the page's hint bit about
     * whether it has free line pointers.
     */
    PageRepairFragmentation(page);
}


/*
 * For all items in this page, find their respective root line pointers.
 * If item k is part of a HOT-chain with root at item j, then we set
 * root_offsets[k - 1] = j.
 *
 * The passed-in root_offsets array must have MaxHeapTuplesPerPage entries.
 * We zero out all unused entries.
 *
 * The function must be called with at least share lock on the buffer, to
 * prevent concurrent prune operations.
 *
 * Note: The information collected here is valid only as long as the caller
 * holds a pin on the buffer.  Once the pin is released, a tuple might be
 * pruned and reused by a completely unrelated tuple.
 */
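/*
 * Example of the mapping (a sketch): if the tuple at offset 5 is a heap-only
 * member of a HOT chain rooted at offset 2, then on return
 * root_offsets[4] == 2, and the root's own entry root_offsets[1] == 2.
 * Offsets are 1-based, hence the "- 1" in the indexing described above.
 */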
void
heap_get_root_tuples(Page page, OffsetNumber *root_offsets)
{
    OffsetNumber offnum,
                maxoff;

    MemSet(root_offsets, 0, MaxHeapTuplesPerPage * sizeof(OffsetNumber));

    maxoff = PageGetMaxOffsetNumber(page);
    for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum))
    {
        ItemId      lp = PageGetItemId(page, offnum);
        HeapTupleHeader htup;
        OffsetNumber nextoffnum;
        TransactionId priorXmax;

        /* skip unused and dead items */
        if (!ItemIdIsUsed(lp) || ItemIdIsDead(lp))
            continue;

        if (ItemIdIsNormal(lp))
        {
            htup = (HeapTupleHeader) PageGetItem(page, lp);

            /*
             * Check if this tuple is part of a HOT-chain rooted at some other
             * tuple. If so, skip it for now; we'll process it when we find
             * its root.
             */
            if (HeapTupleHeaderIsHeapOnly(htup))
                continue;

            /*
             * This is either a plain tuple or the root of a HOT-chain.
             * Remember it in the mapping.
             */
            root_offsets[offnum - 1] = offnum;

            /* If it's not the start of a HOT-chain, we're done with it */
            if (!HeapTupleHeaderIsHotUpdated(htup))
                continue;

            /* Set up to scan the HOT-chain */
            nextoffnum = ItemPointerGetOffsetNumber(&htup->t_ctid);
            priorXmax = HeapTupleHeaderGetXmax(htup);
        }
        else
        {
            /* Must be a redirect item. We do not set its root_offsets entry */
            Assert(ItemIdIsRedirected(lp));
            /* Set up to scan the HOT-chain */
            nextoffnum = ItemIdGetRedirect(lp);
            priorXmax = InvalidTransactionId;
        }

        /*
         * Now follow the HOT-chain and collect other tuples in the chain.
         *
         * Note: Even though this is a nested loop, the complexity of the
         * function is O(N) because each tuple in the page is visited at most
         * twice: once in the outer loop and once in the HOT-chain chases.
         */
        for (;;)
        {
            lp = PageGetItemId(page, nextoffnum);

            /* Check for broken chains */
            if (!ItemIdIsNormal(lp))
                break;

            htup = (HeapTupleHeader) PageGetItem(page, lp);

            if (TransactionIdIsValid(priorXmax) &&
                !TransactionIdEquals(priorXmax, HeapTupleHeaderGetXmin(htup)))
                break;

            /* Remember the root line pointer for this item */
            root_offsets[nextoffnum - 1] = offnum;

            /* Advance to next chain member, if any */
            if (!HeapTupleHeaderIsHotUpdated(htup))
                break;

            nextoffnum = ItemPointerGetOffsetNumber(&htup->t_ctid);
            priorXmax = HeapTupleHeaderGetXmax(htup);
        }
    }
}