OSDN Git Service

Move call to GetTopTransactionId() earlier in LockAcquire(),
[pg-rex/syncrep.git] / src / backend / storage / ipc / standby.c
1 /*-------------------------------------------------------------------------
2  *
3  * standby.c
4  *        Misc functions used in Hot Standby mode.
5  *
6  *      All functions for handling RM_STANDBY_ID, which relate to
7  *      AccessExclusiveLocks and starting snapshots for Hot Standby mode.
8  *      Plus conflict recovery processing.
9  *
10  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  * IDENTIFICATION
14  *        $PostgreSQL: pgsql/src/backend/storage/ipc/standby.c,v 1.27.2.2 2010/08/19 22:55:10 tgl Exp $
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19 #include "access/transam.h"
20 #include "access/twophase.h"
21 #include "access/xact.h"
22 #include "access/xlog.h"
23 #include "miscadmin.h"
24 #include "pgstat.h"
25 #include "storage/bufmgr.h"
26 #include "storage/lmgr.h"
27 #include "storage/proc.h"
28 #include "storage/procarray.h"
29 #include "storage/sinvaladt.h"
30 #include "storage/standby.h"
31 #include "utils/ps_status.h"
32
33 /* User-settable GUC parameters */
34 int                     vacuum_defer_cleanup_age;
35 int                     max_standby_archive_delay = 30 * 1000;
36 int                     max_standby_streaming_delay = 30 * 1000;
37
38 static List *RecoveryLockList;
39
40 static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
41                                                                            ProcSignalReason reason);
42 static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
43 static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
44 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
45
46
47 /*
48  * InitRecoveryTransactionEnvironment
49  *              Initialize tracking of in-progress transactions in master
50  *
51  * We need to issue shared invalidations and hold locks. Holding locks
52  * means others may want to wait on us, so we need to make a lock table
53  * vxact entry like a real transaction. We could create and delete
54  * lock table entries for each transaction but its simpler just to create
55  * one permanent entry and leave it there all the time. Locks are then
56  * acquired and released as needed. Yes, this means you can see the
57  * Startup process in pg_locks once we have run this.
58  */
59 void
60 InitRecoveryTransactionEnvironment(void)
61 {
62         VirtualTransactionId vxid;
63
64         /*
65          * Initialize shared invalidation management for Startup process, being
66          * careful to register ourselves as a sendOnly process so we don't need to
67          * read messages, nor will we get signalled when the queue starts filling
68          * up.
69          */
70         SharedInvalBackendInit(true);
71
72         /*
73          * Record the PID and PGPROC structure of the startup process.
74          */
75         PublishStartupProcessInformation();
76
77         /*
78          * Lock a virtual transaction id for Startup process.
79          *
80          * We need to do GetNextLocalTransactionId() because
81          * SharedInvalBackendInit() leaves localTransactionid invalid and the lock
82          * manager doesn't like that at all.
83          *
84          * Note that we don't need to run XactLockTableInsert() because nobody
85          * needs to wait on xids. That sounds a little strange, but table locks
86          * are held by vxids and row level locks are held by xids. All queries
87          * hold AccessShareLocks so never block while we write or lock new rows.
88          */
89         vxid.backendId = MyBackendId;
90         vxid.localTransactionId = GetNextLocalTransactionId();
91         VirtualXactLockTableInsert(vxid);
92
93         standbyState = STANDBY_INITIALIZED;
94 }
95
96 /*
97  * ShutdownRecoveryTransactionEnvironment
98  *              Shut down transaction tracking
99  *
100  * Prepare to switch from hot standby mode to normal operation. Shut down
101  * recovery-time transaction tracking.
102  */
103 void
104 ShutdownRecoveryTransactionEnvironment(void)
105 {
106         /* Mark all tracked in-progress transactions as finished. */
107         ExpireAllKnownAssignedTransactionIds();
108
109         /* Release all locks the tracked transactions were holding */
110         StandbyReleaseAllLocks();
111 }
112
113
114 /*
115  * -----------------------------------------------------
116  *              Standby wait timers and backend cancel logic
117  * -----------------------------------------------------
118  */
119
120 /*
121  * Determine the cutoff time at which we want to start canceling conflicting
122  * transactions.  Returns zero (a time safely in the past) if we are willing
123  * to wait forever.
124  */
125 static TimestampTz
126 GetStandbyLimitTime(void)
127 {
128         TimestampTz rtime;
129         bool            fromStream;
130
131         /*
132          * The cutoff time is the last WAL data receipt time plus the appropriate
133          * delay variable.      Delay of -1 means wait forever.
134          */
135         GetXLogReceiptTime(&rtime, &fromStream);
136         if (fromStream)
137         {
138                 if (max_standby_streaming_delay < 0)
139                         return 0;                       /* wait forever */
140                 return TimestampTzPlusMilliseconds(rtime, max_standby_streaming_delay);
141         }
142         else
143         {
144                 if (max_standby_archive_delay < 0)
145                         return 0;                       /* wait forever */
146                 return TimestampTzPlusMilliseconds(rtime, max_standby_archive_delay);
147         }
148 }
149
150 #define STANDBY_INITIAL_WAIT_US  1000
151 static int      standbyWait_us = STANDBY_INITIAL_WAIT_US;
152
153 /*
154  * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
155  * We wait here for a while then return. If we decide we can't wait any
156  * more then we return true, if we can wait some more return false.
157  */
158 static bool
159 WaitExceedsMaxStandbyDelay(void)
160 {
161         TimestampTz ltime;
162
163         /* Are we past the limit time? */
164         ltime = GetStandbyLimitTime();
165         if (ltime && GetCurrentTimestamp() >= ltime)
166                 return true;
167
168         /*
169          * Sleep a bit (this is essential to avoid busy-waiting).
170          */
171         pg_usleep(standbyWait_us);
172
173         /*
174          * Progressively increase the sleep times, but not to more than 1s, since
175          * pg_usleep isn't interruptable on some platforms.
176          */
177         standbyWait_us *= 2;
178         if (standbyWait_us > 1000000)
179                 standbyWait_us = 1000000;
180
181         return false;
182 }
183
184 /*
185  * This is the main executioner for any query backend that conflicts with
186  * recovery processing. Judgement has already been passed on it within
187  * a specific rmgr. Here we just issue the orders to the procs. The procs
188  * then throw the required error as instructed.
189  */
190 static void
191 ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
192                                                                            ProcSignalReason reason)
193 {
194         while (VirtualTransactionIdIsValid(*waitlist))
195         {
196                 TimestampTz waitStart;
197                 char       *new_status;
198
199                 pgstat_report_waiting(true);
200
201                 waitStart = GetCurrentTimestamp();
202                 new_status = NULL;              /* we haven't changed the ps display */
203
204                 /* reset standbyWait_us for each xact we wait for */
205                 standbyWait_us = STANDBY_INITIAL_WAIT_US;
206
207                 /* wait until the virtual xid is gone */
208                 while (!ConditionalVirtualXactLockTableWait(*waitlist))
209                 {
210                         /*
211                          * Report via ps if we have been waiting for more than 500 msec
212                          * (should that be configurable?)
213                          */
214                         if (update_process_title && new_status == NULL &&
215                                 TimestampDifferenceExceeds(waitStart, GetCurrentTimestamp(),
216                                                                                    500))
217                         {
218                                 const char *old_status;
219                                 int                     len;
220
221                                 old_status = get_ps_display(&len);
222                                 new_status = (char *) palloc(len + 8 + 1);
223                                 memcpy(new_status, old_status, len);
224                                 strcpy(new_status + len, " waiting");
225                                 set_ps_display(new_status, false);
226                                 new_status[len] = '\0'; /* truncate off " waiting" */
227                         }
228
229                         /* Is it time to kill it? */
230                         if (WaitExceedsMaxStandbyDelay())
231                         {
232                                 pid_t           pid;
233
234                                 /*
235                                  * Now find out who to throw out of the balloon.
236                                  */
237                                 Assert(VirtualTransactionIdIsValid(*waitlist));
238                                 pid = CancelVirtualTransaction(*waitlist, reason);
239
240                                 /*
241                                  * Wait a little bit for it to die so that we avoid flooding
242                                  * an unresponsive backend when system is heavily loaded.
243                                  */
244                                 if (pid != 0)
245                                         pg_usleep(5000L);
246                         }
247                 }
248
249                 /* Reset ps display if we changed it */
250                 if (new_status)
251                 {
252                         set_ps_display(new_status, false);
253                         pfree(new_status);
254                 }
255                 pgstat_report_waiting(false);
256
257                 /* The virtual transaction is gone now, wait for the next one */
258                 waitlist++;
259         }
260 }
261
262 void
263 ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
264 {
265         VirtualTransactionId *backends;
266
267         /*
268          * If we get passed InvalidTransactionId then we are a little surprised,
269          * but it is theoretically possible in normal running. It also happens
270          * when replaying already applied WAL records after a standby crash or
271          * restart. If latestRemovedXid is invalid then there is no conflict. That
272          * rule applies across all record types that suffer from this conflict.
273          */
274         if (!TransactionIdIsValid(latestRemovedXid))
275                 return;
276
277         backends = GetConflictingVirtualXIDs(latestRemovedXid,
278                                                                                  node.dbNode);
279
280         ResolveRecoveryConflictWithVirtualXIDs(backends,
281                                                                                  PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
282 }
283
284 void
285 ResolveRecoveryConflictWithTablespace(Oid tsid)
286 {
287         VirtualTransactionId *temp_file_users;
288
289         /*
290          * Standby users may be currently using this tablespace for for their
291          * temporary files. We only care about current users because
292          * temp_tablespace parameter will just ignore tablespaces that no longer
293          * exist.
294          *
295          * Ask everybody to cancel their queries immediately so we can ensure no
296          * temp files remain and we can remove the tablespace. Nuke the entire
297          * site from orbit, it's the only way to be sure.
298          *
299          * XXX: We could work out the pids of active backends using this
300          * tablespace by examining the temp filenames in the directory. We would
301          * then convert the pids into VirtualXIDs before attempting to cancel
302          * them.
303          *
304          * We don't wait for commit because drop tablespace is non-transactional.
305          */
306         temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
307                                                                                                 InvalidOid);
308         ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
309                                                                            PROCSIG_RECOVERY_CONFLICT_TABLESPACE);
310 }
311
312 void
313 ResolveRecoveryConflictWithDatabase(Oid dbid)
314 {
315         /*
316          * We don't do ResolveRecoveryConflictWithVirtualXIDs() here since that
317          * only waits for transactions and completely idle sessions would block
318          * us. This is rare enough that we do this as simply as possible: no wait,
319          * just force them off immediately.
320          *
321          * No locking is required here because we already acquired
322          * AccessExclusiveLock. Anybody trying to connect while we do this will
323          * block during InitPostgres() and then disconnect when they see the
324          * database has been removed.
325          */
326         while (CountDBBackends(dbid) > 0)
327         {
328                 CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true);
329
330                 /*
331                  * Wait awhile for them to die so that we avoid flooding an
332                  * unresponsive backend when system is heavily loaded.
333                  */
334                 pg_usleep(10000);
335         }
336 }
337
338 static void
339 ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
340 {
341         VirtualTransactionId *backends;
342         bool            report_memory_error = false;
343         bool            lock_acquired = false;
344         int                     num_attempts = 0;
345         LOCKTAG         locktag;
346
347         SET_LOCKTAG_RELATION(locktag, dbOid, relOid);
348
349         /*
350          * If blowing away everybody with conflicting locks doesn't work, after
351          * the first two attempts then we just start blowing everybody away until
352          * it does work. We do this because its likely that we either have too
353          * many locks and we just can't get one at all, or that there are many
354          * people crowding for the same table. Recovery must win; the end
355          * justifies the means.
356          */
357         while (!lock_acquired)
358         {
359                 if (++num_attempts < 3)
360                         backends = GetLockConflicts(&locktag, AccessExclusiveLock);
361                 else
362                 {
363                         backends = GetConflictingVirtualXIDs(InvalidTransactionId,
364                                                                                                  InvalidOid);
365                         report_memory_error = true;
366                 }
367
368                 ResolveRecoveryConflictWithVirtualXIDs(backends,
369                                                                                          PROCSIG_RECOVERY_CONFLICT_LOCK);
370
371                 if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
372                         != LOCKACQUIRE_NOT_AVAIL)
373                         lock_acquired = true;
374         }
375 }
376
377 /*
378  * ResolveRecoveryConflictWithBufferPin is called from LockBufferForCleanup()
379  * to resolve conflicts with other backends holding buffer pins.
380  *
381  * We either resolve conflicts immediately or set a SIGALRM to wake us at
382  * the limit of our patience. The sleep in LockBufferForCleanup() is
383  * performed here, for code clarity.
384  *
385  * Resolve conflicts by sending a PROCSIG signal to all backends to check if
386  * they hold one of the buffer pins that is blocking Startup process. If so,
387  * backends will take an appropriate error action, ERROR or FATAL.
388  *
389  * We also must check for deadlocks.  Deadlocks occur because if queries
390  * wait on a lock, that must be behind an AccessExclusiveLock, which can only
391  * be cleared if the Startup process replays a transaction completion record.
392  * If Startup process is also waiting then that is a deadlock. The deadlock
393  * can occur if the query is waiting and then the Startup sleeps, or if
394  * Startup is sleeping and the query waits on a lock. We protect against
395  * only the former sequence here, the latter sequence is checked prior to
396  * the query sleeping, in CheckRecoveryConflictDeadlock().
397  *
398  * Deadlocks are extremely rare, and relatively expensive to check for,
399  * so we don't do a deadlock check right away ... only if we have had to wait
400  * at least deadlock_timeout.  Most of the logic about that is in proc.c.
401  */
402 void
403 ResolveRecoveryConflictWithBufferPin(void)
404 {
405         bool            sig_alarm_enabled = false;
406         TimestampTz ltime;
407         TimestampTz now;
408
409         Assert(InHotStandby);
410
411         ltime = GetStandbyLimitTime();
412         now = GetCurrentTimestamp();
413
414         if (!ltime)
415         {
416                 /*
417                  * We're willing to wait forever for conflicts, so set timeout for
418                  * deadlock check (only)
419                  */
420                 if (enable_standby_sig_alarm(now, now, true))
421                         sig_alarm_enabled = true;
422                 else
423                         elog(FATAL, "could not set timer for process wakeup");
424         }
425         else if (now >= ltime)
426         {
427                 /*
428                  * We're already behind, so clear a path as quickly as possible.
429                  */
430                 SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
431         }
432         else
433         {
434                 /*
435                  * Wake up at ltime, and check for deadlocks as well if we will be
436                  * waiting longer than deadlock_timeout
437                  */
438                 if (enable_standby_sig_alarm(now, ltime, false))
439                         sig_alarm_enabled = true;
440                 else
441                         elog(FATAL, "could not set timer for process wakeup");
442         }
443
444         /* Wait to be signaled by UnpinBuffer() */
445         ProcWaitForSignal();
446
447         if (sig_alarm_enabled)
448         {
449                 if (!disable_standby_sig_alarm())
450                         elog(FATAL, "could not disable timer for process wakeup");
451         }
452 }
453
454 void
455 SendRecoveryConflictWithBufferPin(ProcSignalReason reason)
456 {
457         Assert(reason == PROCSIG_RECOVERY_CONFLICT_BUFFERPIN ||
458                    reason == PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
459
460         /*
461          * We send signal to all backends to ask them if they are holding the
462          * buffer pin which is delaying the Startup process. We must not set the
463          * conflict flag yet, since most backends will be innocent. Let the
464          * SIGUSR1 handling in each backend decide their own fate.
465          */
466         CancelDBBackends(InvalidOid, reason, false);
467 }
468
469 /*
470  * In Hot Standby perform early deadlock detection.  We abort the lock
471  * wait if are about to sleep while holding the buffer pin that Startup
472  * process is waiting for. The deadlock occurs because we can only be
473  * waiting behind an AccessExclusiveLock, which can only clear when a
474  * transaction completion record is replayed, which can only occur when
475  * Startup process is not waiting. So if Startup process is waiting we
476  * never will clear that lock, so if we wait we cause deadlock. If we
477  * are the Startup process then no need to check for deadlocks.
478  */
479 void
480 CheckRecoveryConflictDeadlock(LWLockId partitionLock)
481 {
482         Assert(!InRecovery);
483
484         if (!HoldingBufferPinThatDelaysRecovery())
485                 return;
486
487         LWLockRelease(partitionLock);
488
489         /*
490          * Error message should match ProcessInterrupts() but we avoid calling
491          * that because we aren't handling an interrupt at this point. Note that
492          * we only cancel the current transaction here, so if we are in a
493          * subtransaction and the pin is held by a parent, then the Startup
494          * process will continue to wait even though we have avoided deadlock.
495          */
496         ereport(ERROR,
497                         (errcode(ERRCODE_QUERY_CANCELED),
498                          errmsg("canceling statement due to conflict with recovery"),
499            errdetail("User transaction caused buffer deadlock with recovery.")));
500 }
501
502 /*
503  * -----------------------------------------------------
504  * Locking in Recovery Mode
505  * -----------------------------------------------------
506  *
507  * All locks are held by the Startup process using a single virtual
508  * transaction. This implementation is both simpler and in some senses,
509  * more correct. The locks held mean "some original transaction held
510  * this lock, so query access is not allowed at this time". So the Startup
511  * process is the proxy by which the original locks are implemented.
512  *
513  * We only keep track of AccessExclusiveLocks, which are only ever held by
514  * one transaction on one relation, and don't worry about lock queuing.
515  *
516  * We keep a single dynamically expandible list of locks in local memory,
517  * RelationLockList, so we can keep track of the various entries made by
518  * the Startup process's virtual xid in the shared lock table.
519  *
520  * List elements use type xl_rel_lock, since the WAL record type exactly
521  * matches the information that we need to keep track of.
522  *
523  * We use session locks rather than normal locks so we don't need
524  * ResourceOwners.
525  */
526
527
528 void
529 StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
530 {
531         xl_standby_lock *newlock;
532         LOCKTAG         locktag;
533
534         /* Already processed? */
535         if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
536                 return;
537
538         elog(trace_recovery(DEBUG4),
539                  "adding recovery lock: db %u rel %u", dbOid, relOid);
540
541         /* dbOid is InvalidOid when we are locking a shared relation. */
542         Assert(OidIsValid(relOid));
543
544         newlock = palloc(sizeof(xl_standby_lock));
545         newlock->xid = xid;
546         newlock->dbOid = dbOid;
547         newlock->relOid = relOid;
548         RecoveryLockList = lappend(RecoveryLockList, newlock);
549
550         /*
551          * Attempt to acquire the lock as requested, if not resolve conflict
552          */
553         SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
554
555         if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
556                 == LOCKACQUIRE_NOT_AVAIL)
557                 ResolveRecoveryConflictWithLock(newlock->dbOid, newlock->relOid);
558 }
559
560 static void
561 StandbyReleaseLocks(TransactionId xid)
562 {
563         ListCell   *cell,
564                            *prev,
565                            *next;
566
567         /*
568          * Release all matching locks and remove them from list
569          */
570         prev = NULL;
571         for (cell = list_head(RecoveryLockList); cell; cell = next)
572         {
573                 xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
574
575                 next = lnext(cell);
576
577                 if (!TransactionIdIsValid(xid) || lock->xid == xid)
578                 {
579                         LOCKTAG         locktag;
580
581                         elog(trace_recovery(DEBUG4),
582                                  "releasing recovery lock: xid %u db %u rel %u",
583                                  lock->xid, lock->dbOid, lock->relOid);
584                         SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
585                         if (!LockRelease(&locktag, AccessExclusiveLock, true))
586                                 elog(LOG,
587                                          "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
588                                          lock->xid, lock->dbOid, lock->relOid);
589
590                         RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
591                         pfree(lock);
592                 }
593                 else
594                         prev = cell;
595         }
596 }
597
598 /*
599  * Release locks for a transaction tree, starting at xid down, from
600  * RecoveryLockList.
601  *
602  * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
603  * to remove any AccessExclusiveLocks requested by a transaction.
604  */
605 void
606 StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
607 {
608         int                     i;
609
610         StandbyReleaseLocks(xid);
611
612         for (i = 0; i < nsubxids; i++)
613                 StandbyReleaseLocks(subxids[i]);
614 }
615
616 /*
617  * StandbyReleaseLocksMany
618  *              Release standby locks held by XIDs < removeXid
619  *
620  * If keepPreparedXacts is true, keep prepared transactions even if
621  * they're older than removeXid
622  */
623 static void
624 StandbyReleaseLocksMany(TransactionId removeXid, bool keepPreparedXacts)
625 {
626         ListCell   *cell,
627                            *prev,
628                            *next;
629         LOCKTAG         locktag;
630
631         /*
632          * Release all matching locks.
633          */
634         prev = NULL;
635         for (cell = list_head(RecoveryLockList); cell; cell = next)
636         {
637                 xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
638
639                 next = lnext(cell);
640
641                 if (!TransactionIdIsValid(removeXid) || TransactionIdPrecedes(lock->xid, removeXid))
642                 {
643                         if (keepPreparedXacts && StandbyTransactionIdIsPrepared(lock->xid))
644                                 continue;
645                         elog(trace_recovery(DEBUG4),
646                                  "releasing recovery lock: xid %u db %u rel %u",
647                                  lock->xid, lock->dbOid, lock->relOid);
648                         SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
649                         if (!LockRelease(&locktag, AccessExclusiveLock, true))
650                                 elog(LOG,
651                                          "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
652                                          lock->xid, lock->dbOid, lock->relOid);
653                         RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
654                         pfree(lock);
655                 }
656                 else
657                         prev = cell;
658         }
659 }
660
661 /*
662  * Called at end of recovery and when we see a shutdown checkpoint.
663  */
664 void
665 StandbyReleaseAllLocks(void)
666 {
667         elog(trace_recovery(DEBUG2), "release all standby locks");
668         StandbyReleaseLocksMany(InvalidTransactionId, false);
669 }
670
671 /*
672  * StandbyReleaseOldLocks
673  *              Release standby locks held by XIDs < removeXid, as long
674  *              as their not prepared transactions.
675  */
676 void
677 StandbyReleaseOldLocks(TransactionId removeXid)
678 {
679         StandbyReleaseLocksMany(removeXid, true);
680 }
681
682 /*
683  * --------------------------------------------------------------------
684  *              Recovery handling for Rmgr RM_STANDBY_ID
685  *
686  * These record types will only be created if XLogStandbyInfoActive()
687  * --------------------------------------------------------------------
688  */
689
690 void
691 standby_redo(XLogRecPtr lsn, XLogRecord *record)
692 {
693         uint8           info = record->xl_info & ~XLR_INFO_MASK;
694
695         /* Do nothing if we're not in hot standby mode */
696         if (standbyState == STANDBY_DISABLED)
697                 return;
698
699         if (info == XLOG_STANDBY_LOCK)
700         {
701                 xl_standby_locks *xlrec = (xl_standby_locks *) XLogRecGetData(record);
702                 int                     i;
703
704                 for (i = 0; i < xlrec->nlocks; i++)
705                         StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
706                                                                                           xlrec->locks[i].dbOid,
707                                                                                           xlrec->locks[i].relOid);
708         }
709         else if (info == XLOG_RUNNING_XACTS)
710         {
711                 xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
712                 RunningTransactionsData running;
713
714                 running.xcnt = xlrec->xcnt;
715                 running.subxid_overflow = xlrec->subxid_overflow;
716                 running.nextXid = xlrec->nextXid;
717                 running.latestCompletedXid = xlrec->latestCompletedXid;
718                 running.oldestRunningXid = xlrec->oldestRunningXid;
719                 running.xids = xlrec->xids;
720
721                 ProcArrayApplyRecoveryInfo(&running);
722         }
723         else
724                 elog(PANIC, "relation_redo: unknown op code %u", info);
725 }
726
727 static void
728 standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
729 {
730         int                     i;
731
732         appendStringInfo(buf, " nextXid %u latestCompletedXid %u oldestRunningXid %u",
733                                          xlrec->nextXid,
734                                          xlrec->latestCompletedXid,
735                                          xlrec->oldestRunningXid);
736         if (xlrec->xcnt > 0)
737         {
738                 appendStringInfo(buf, "; %d xacts:", xlrec->xcnt);
739                 for (i = 0; i < xlrec->xcnt; i++)
740                         appendStringInfo(buf, " %u", xlrec->xids[i]);
741         }
742
743         if (xlrec->subxid_overflow)
744                 appendStringInfo(buf, "; subxid ovf");
745 }
746
747 void
748 standby_desc(StringInfo buf, uint8 xl_info, char *rec)
749 {
750         uint8           info = xl_info & ~XLR_INFO_MASK;
751
752         if (info == XLOG_STANDBY_LOCK)
753         {
754                 xl_standby_locks *xlrec = (xl_standby_locks *) rec;
755                 int                     i;
756
757                 appendStringInfo(buf, "AccessExclusive locks:");
758
759                 for (i = 0; i < xlrec->nlocks; i++)
760                         appendStringInfo(buf, " xid %u db %u rel %u",
761                                                          xlrec->locks[i].xid, xlrec->locks[i].dbOid,
762                                                          xlrec->locks[i].relOid);
763         }
764         else if (info == XLOG_RUNNING_XACTS)
765         {
766                 xl_running_xacts *xlrec = (xl_running_xacts *) rec;
767
768                 appendStringInfo(buf, " running xacts:");
769                 standby_desc_running_xacts(buf, xlrec);
770         }
771         else
772                 appendStringInfo(buf, "UNKNOWN");
773 }
774
775 /*
776  * Log details of the current snapshot to WAL. This allows the snapshot state
777  * to be reconstructed on the standby.
778  *
779  * We can move directly to STANDBY_SNAPSHOT_READY at startup if we
780  * start from a shutdown checkpoint because we know nothing was running
781  * at that time and our recovery snapshot is known empty. In the more
782  * typical case of an online checkpoint we need to jump through a few
783  * hoops to get a correct recovery snapshot and this requires a two or
784  * sometimes a three stage process.
785  *
786  * The initial snapshot must contain all running xids and all current
787  * AccessExclusiveLocks at a point in time on the standby. Assembling
788  * that information while the server is running requires many and
789  * various LWLocks, so we choose to derive that information piece by
790  * piece and then re-assemble that info on the standby. When that
791  * information is fully assembled we move to STANDBY_SNAPSHOT_READY.
792  *
793  * Since locking on the primary when we derive the information is not
794  * strict, we note that there is a time window between the derivation and
795  * writing to WAL of the derived information. That allows race conditions
796  * that we must resolve, since xids and locks may enter or leave the
797  * snapshot during that window. This creates the issue that an xid or
798  * lock may start *after* the snapshot has been derived yet *before* the
799  * snapshot is logged in the running xacts WAL record. We resolve this by
800  * starting to accumulate changes at a point just prior to when we derive
801  * the snapshot on the primary, then ignore duplicates when we later apply
802  * the snapshot from the running xacts record. This is implemented during
803  * CreateCheckpoint() where we use the logical checkpoint location as
804  * our starting point and then write the running xacts record immediately
805  * before writing the main checkpoint WAL record. Since we always start
806  * up from a checkpoint and are immediately at our starting point, we
807  * unconditionally move to STANDBY_INITIALIZED. After this point we
808  * must do 4 things:
809  *      * move shared nextXid forwards as we see new xids
810  *      * extend the clog and subtrans with each new xid
811  *      * keep track of uncommitted known assigned xids
812  *      * keep track of uncommitted AccessExclusiveLocks
813  *
814  * When we see a commit/abort we must remove known assigned xids and locks
815  * from the completing transaction. Attempted removals that cannot locate
816  * an entry are expected and must not cause an error when we are in state
817  * STANDBY_INITIALIZED. This is implemented in StandbyReleaseLocks() and
818  * KnownAssignedXidsRemove().
819  *
820  * Later, when we apply the running xact data we must be careful to ignore
821  * transactions already committed, since those commits raced ahead when
822  * making WAL entries.
823  */
824 void
825 LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid)
826 {
827         RunningTransactions running;
828         xl_standby_lock *locks;
829         int                     nlocks;
830
831         Assert(XLogStandbyInfoActive());
832
833         /*
834          * Get details of any AccessExclusiveLocks being held at the moment.
835          *
836          * XXX GetRunningTransactionLocks() currently holds a lock on all
837          * partitions though it is possible to further optimise the locking. By
838          * reference counting locks and storing the value on the ProcArray entry
839          * for each backend we can easily tell if any locks need recording without
840          * trying to acquire the partition locks and scanning the lock table.
841          */
842         locks = GetRunningTransactionLocks(&nlocks);
843         if (nlocks > 0)
844                 LogAccessExclusiveLocks(nlocks, locks);
845
846         /*
847          * Log details of all in-progress transactions. This should be the last
848          * record we write, because standby will open up when it sees this.
849          */
850         running = GetRunningTransactionData();
851
852         /*
853          * The gap between GetRunningTransactionData() and
854          * LogCurrentRunningXacts() is what most of the fuss is about here, so
855          * artifically extending this interval is a great way to test the little
856          * used parts of the code.
857          */
858         LogCurrentRunningXacts(running);
859
860         *oldestActiveXid = running->oldestRunningXid;
861         *nextXid = running->nextXid;
862 }
863
864 /*
865  * Record an enhanced snapshot of running transactions into WAL.
866  *
867  * The definitions of RunningTransactionsData and xl_xact_running_xacts
868  * are similar. We keep them separate because xl_xact_running_xacts
869  * is a contiguous chunk of memory and never exists fully until it is
870  * assembled in WAL.
871  */
872 static void
873 LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
874 {
875         xl_running_xacts xlrec;
876         XLogRecData rdata[2];
877         int                     lastrdata = 0;
878         XLogRecPtr      recptr;
879
880         xlrec.xcnt = CurrRunningXacts->xcnt;
881         xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
882         xlrec.nextXid = CurrRunningXacts->nextXid;
883         xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
884         xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
885
886         /* Header */
887         rdata[0].data = (char *) (&xlrec);
888         rdata[0].len = MinSizeOfXactRunningXacts;
889         rdata[0].buffer = InvalidBuffer;
890
891         /* array of TransactionIds */
892         if (xlrec.xcnt > 0)
893         {
894                 rdata[0].next = &(rdata[1]);
895                 rdata[1].data = (char *) CurrRunningXacts->xids;
896                 rdata[1].len = xlrec.xcnt * sizeof(TransactionId);
897                 rdata[1].buffer = InvalidBuffer;
898                 lastrdata = 1;
899         }
900
901         rdata[lastrdata].next = NULL;
902
903         recptr = XLogInsert(RM_STANDBY_ID, XLOG_RUNNING_XACTS, rdata);
904
905         if (CurrRunningXacts->subxid_overflow)
906                 elog(trace_recovery(DEBUG2),
907                          "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
908                          CurrRunningXacts->xcnt,
909                          recptr.xlogid, recptr.xrecoff,
910                          CurrRunningXacts->oldestRunningXid,
911                          CurrRunningXacts->latestCompletedXid,
912                          CurrRunningXacts->nextXid);
913         else
914                 elog(trace_recovery(DEBUG2),
915                          "snapshot of %u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
916                          CurrRunningXacts->xcnt,
917                          recptr.xlogid, recptr.xrecoff,
918                          CurrRunningXacts->oldestRunningXid,
919                          CurrRunningXacts->latestCompletedXid,
920                          CurrRunningXacts->nextXid);
921 }
922
923 /*
924  * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
925  * logged, as described in backend/storage/lmgr/README.
926  */
927 static void
928 LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks)
929 {
930         XLogRecData rdata[2];
931         xl_standby_locks xlrec;
932
933         xlrec.nlocks = nlocks;
934
935         rdata[0].data = (char *) &xlrec;
936         rdata[0].len = offsetof(xl_standby_locks, locks);
937         rdata[0].buffer = InvalidBuffer;
938         rdata[0].next = &rdata[1];
939
940         rdata[1].data = (char *) locks;
941         rdata[1].len = nlocks * sizeof(xl_standby_lock);
942         rdata[1].buffer = InvalidBuffer;
943         rdata[1].next = NULL;
944
945         (void) XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_LOCK, rdata);
946 }
947
948 /*
949  * Individual logging of AccessExclusiveLocks for use during LockAcquire()
950  */
951 void
952 LogAccessExclusiveLock(Oid dbOid, Oid relOid)
953 {
954         xl_standby_lock xlrec;
955
956         xlrec.xid = GetTopTransactionId();
957
958         /*
959          * Decode the locktag back to the original values, to avoid sending lots
960          * of empty bytes with every message.  See lock.h to check how a locktag
961          * is defined for LOCKTAG_RELATION
962          */
963         xlrec.dbOid = dbOid;
964         xlrec.relOid = relOid;
965
966         LogAccessExclusiveLocks(1, &xlrec);
967 }
968
969 /*
970  * Prepare to log an AccessExclusiveLock, for use during LockAcquire()
971  */
972 void
973 LogAccessExclusiveLockPrepare(void)
974 {
975         /*
976          * Ensure that a TransactionId has been assigned to this transaction,
977          * for two reasons, both related to lock release on the standby.
978          * First, we must assign an xid so that RecordTransactionCommit() and
979          * RecordTransactionAbort() do not optimise away the transaction
980          * completion record which recovery relies upon to release locks. It's
981          * a hack, but for a corner case not worth adding code for into the
982          * main commit path. Second, must must assign an xid before the lock
983          * is recorded in shared memory, otherwise a concurrently executing
984          * GetRunningTransactionLocks() might see a lock associated with an
985          * InvalidTransactionId which we later assert cannot happen.
986          */
987         (void) GetTopTransactionId();
988 }