OSDN Git Service

Fix typo in sslmode documentation
[pg-rex/syncrep.git] / src / backend / storage / ipc / standby.c
1 /*-------------------------------------------------------------------------
2  *
3  * standby.c
4  *        Misc functions used in Hot Standby mode.
5  *
6  *      All functions for handling RM_STANDBY_ID, which relate to
7  *      AccessExclusiveLocks and starting snapshots for Hot Standby mode.
8  *      Plus conflict recovery processing.
9  *
10  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  * IDENTIFICATION
14  *        src/backend/storage/ipc/standby.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19 #include "access/transam.h"
20 #include "access/twophase.h"
21 #include "access/xact.h"
22 #include "access/xlog.h"
23 #include "miscadmin.h"
24 #include "storage/bufmgr.h"
25 #include "storage/lmgr.h"
26 #include "storage/proc.h"
27 #include "storage/procarray.h"
28 #include "storage/sinvaladt.h"
29 #include "storage/standby.h"
30 #include "utils/ps_status.h"
31
32 /* User-settable GUC parameters */
33 int                     vacuum_defer_cleanup_age;
34 int                     max_standby_archive_delay = 30 * 1000;
35 int                     max_standby_streaming_delay = 30 * 1000;
36
37 static List *RecoveryLockList;
38
39 static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
40                                                                            ProcSignalReason reason);
41 static void ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid);
42 static void LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
43 static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
44
45
46 /*
47  * InitRecoveryTransactionEnvironment
48  *              Initialize tracking of in-progress transactions in master
49  *
50  * We need to issue shared invalidations and hold locks. Holding locks
51  * means others may want to wait on us, so we need to make a lock table
52  * vxact entry like a real transaction. We could create and delete
53  * lock table entries for each transaction but its simpler just to create
54  * one permanent entry and leave it there all the time. Locks are then
55  * acquired and released as needed. Yes, this means you can see the
56  * Startup process in pg_locks once we have run this.
57  */
58 void
59 InitRecoveryTransactionEnvironment(void)
60 {
61         VirtualTransactionId vxid;
62
63         /*
64          * Initialize shared invalidation management for Startup process, being
65          * careful to register ourselves as a sendOnly process so we don't need to
66          * read messages, nor will we get signalled when the queue starts filling
67          * up.
68          */
69         SharedInvalBackendInit(true);
70
71         /*
72          * Lock a virtual transaction id for Startup process.
73          *
74          * We need to do GetNextLocalTransactionId() because
75          * SharedInvalBackendInit() leaves localTransactionid invalid and the lock
76          * manager doesn't like that at all.
77          *
78          * Note that we don't need to run XactLockTableInsert() because nobody
79          * needs to wait on xids. That sounds a little strange, but table locks
80          * are held by vxids and row level locks are held by xids. All queries
81          * hold AccessShareLocks so never block while we write or lock new rows.
82          */
83         vxid.backendId = MyBackendId;
84         vxid.localTransactionId = GetNextLocalTransactionId();
85         VirtualXactLockTableInsert(vxid);
86
87         standbyState = STANDBY_INITIALIZED;
88 }
89
90 /*
91  * ShutdownRecoveryTransactionEnvironment
92  *              Shut down transaction tracking
93  *
94  * Prepare to switch from hot standby mode to normal operation. Shut down
95  * recovery-time transaction tracking.
96  */
97 void
98 ShutdownRecoveryTransactionEnvironment(void)
99 {
100         /* Mark all tracked in-progress transactions as finished. */
101         ExpireAllKnownAssignedTransactionIds();
102
103         /* Release all locks the tracked transactions were holding */
104         StandbyReleaseAllLocks();
105 }
106
107
108 /*
109  * -----------------------------------------------------
110  *              Standby wait timers and backend cancel logic
111  * -----------------------------------------------------
112  */
113
114 /*
115  * Determine the cutoff time at which we want to start canceling conflicting
116  * transactions.  Returns zero (a time safely in the past) if we are willing
117  * to wait forever.
118  */
119 static TimestampTz
120 GetStandbyLimitTime(void)
121 {
122         TimestampTz rtime;
123         bool            fromStream;
124
125         /*
126          * The cutoff time is the last WAL data receipt time plus the appropriate
127          * delay variable.      Delay of -1 means wait forever.
128          */
129         GetXLogReceiptTime(&rtime, &fromStream);
130         if (fromStream)
131         {
132                 if (max_standby_streaming_delay < 0)
133                         return 0;                       /* wait forever */
134                 return TimestampTzPlusMilliseconds(rtime, max_standby_streaming_delay);
135         }
136         else
137         {
138                 if (max_standby_archive_delay < 0)
139                         return 0;                       /* wait forever */
140                 return TimestampTzPlusMilliseconds(rtime, max_standby_archive_delay);
141         }
142 }
143
144 #define STANDBY_INITIAL_WAIT_US  1000
145 static int      standbyWait_us = STANDBY_INITIAL_WAIT_US;
146
147 /*
148  * Standby wait logic for ResolveRecoveryConflictWithVirtualXIDs.
149  * We wait here for a while then return. If we decide we can't wait any
150  * more then we return true, if we can wait some more return false.
151  */
152 static bool
153 WaitExceedsMaxStandbyDelay(void)
154 {
155         TimestampTz ltime;
156
157         /* Are we past the limit time? */
158         ltime = GetStandbyLimitTime();
159         if (ltime && GetCurrentTimestamp() >= ltime)
160                 return true;
161
162         /*
163          * Sleep a bit (this is essential to avoid busy-waiting).
164          */
165         pg_usleep(standbyWait_us);
166
167         /*
168          * Progressively increase the sleep times, but not to more than 1s, since
169          * pg_usleep isn't interruptable on some platforms.
170          */
171         standbyWait_us *= 2;
172         if (standbyWait_us > 1000000)
173                 standbyWait_us = 1000000;
174
175         return false;
176 }
177
178 /*
179  * This is the main executioner for any query backend that conflicts with
180  * recovery processing. Judgement has already been passed on it within
181  * a specific rmgr. Here we just issue the orders to the procs. The procs
182  * then throw the required error as instructed.
183  */
184 static void
185 ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
186                                                                            ProcSignalReason reason)
187 {
188         TimestampTz waitStart;
189         char       *new_status;
190
191         /* Fast exit, to avoid a kernel call if there's no work to be done. */
192         if (!VirtualTransactionIdIsValid(*waitlist))
193                 return;
194
195         waitStart = GetCurrentTimestamp();
196         new_status = NULL;                      /* we haven't changed the ps display */
197
198         while (VirtualTransactionIdIsValid(*waitlist))
199         {
200                 /* reset standbyWait_us for each xact we wait for */
201                 standbyWait_us = STANDBY_INITIAL_WAIT_US;
202
203                 /* wait until the virtual xid is gone */
204                 while (!ConditionalVirtualXactLockTableWait(*waitlist))
205                 {
206                         /*
207                          * Report via ps if we have been waiting for more than 500 msec
208                          * (should that be configurable?)
209                          */
210                         if (update_process_title && new_status == NULL &&
211                                 TimestampDifferenceExceeds(waitStart, GetCurrentTimestamp(),
212                                                                                    500))
213                         {
214                                 const char *old_status;
215                                 int                     len;
216
217                                 old_status = get_ps_display(&len);
218                                 new_status = (char *) palloc(len + 8 + 1);
219                                 memcpy(new_status, old_status, len);
220                                 strcpy(new_status + len, " waiting");
221                                 set_ps_display(new_status, false);
222                                 new_status[len] = '\0'; /* truncate off " waiting" */
223                         }
224
225                         /* Is it time to kill it? */
226                         if (WaitExceedsMaxStandbyDelay())
227                         {
228                                 pid_t           pid;
229
230                                 /*
231                                  * Now find out who to throw out of the balloon.
232                                  */
233                                 Assert(VirtualTransactionIdIsValid(*waitlist));
234                                 pid = CancelVirtualTransaction(*waitlist, reason);
235
236                                 /*
237                                  * Wait a little bit for it to die so that we avoid flooding
238                                  * an unresponsive backend when system is heavily loaded.
239                                  */
240                                 if (pid != 0)
241                                         pg_usleep(5000L);
242                         }
243                 }
244
245                 /* The virtual transaction is gone now, wait for the next one */
246                 waitlist++;
247         }
248
249         /* Reset ps display if we changed it */
250         if (new_status)
251         {
252                 set_ps_display(new_status, false);
253                 pfree(new_status);
254         }
255 }
256
257 void
258 ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
259 {
260         VirtualTransactionId *backends;
261
262         /*
263          * If we get passed InvalidTransactionId then we are a little surprised,
264          * but it is theoretically possible in normal running. It also happens
265          * when replaying already applied WAL records after a standby crash or
266          * restart. If latestRemovedXid is invalid then there is no conflict. That
267          * rule applies across all record types that suffer from this conflict.
268          */
269         if (!TransactionIdIsValid(latestRemovedXid))
270                 return;
271
272         backends = GetConflictingVirtualXIDs(latestRemovedXid,
273                                                                                  node.dbNode);
274
275         ResolveRecoveryConflictWithVirtualXIDs(backends,
276                                                                                  PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
277 }
278
279 void
280 ResolveRecoveryConflictWithTablespace(Oid tsid)
281 {
282         VirtualTransactionId *temp_file_users;
283
284         /*
285          * Standby users may be currently using this tablespace for for their
286          * temporary files. We only care about current users because
287          * temp_tablespace parameter will just ignore tablespaces that no longer
288          * exist.
289          *
290          * Ask everybody to cancel their queries immediately so we can ensure no
291          * temp files remain and we can remove the tablespace. Nuke the entire
292          * site from orbit, it's the only way to be sure.
293          *
294          * XXX: We could work out the pids of active backends using this
295          * tablespace by examining the temp filenames in the directory. We would
296          * then convert the pids into VirtualXIDs before attempting to cancel
297          * them.
298          *
299          * We don't wait for commit because drop tablespace is non-transactional.
300          */
301         temp_file_users = GetConflictingVirtualXIDs(InvalidTransactionId,
302                                                                                                 InvalidOid);
303         ResolveRecoveryConflictWithVirtualXIDs(temp_file_users,
304                                                                            PROCSIG_RECOVERY_CONFLICT_TABLESPACE);
305 }
306
307 void
308 ResolveRecoveryConflictWithDatabase(Oid dbid)
309 {
310         /*
311          * We don't do ResolveRecoveryConflictWithVirtualXIDs() here since that
312          * only waits for transactions and completely idle sessions would block
313          * us. This is rare enough that we do this as simply as possible: no wait,
314          * just force them off immediately.
315          *
316          * No locking is required here because we already acquired
317          * AccessExclusiveLock. Anybody trying to connect while we do this will
318          * block during InitPostgres() and then disconnect when they see the
319          * database has been removed.
320          */
321         while (CountDBBackends(dbid) > 0)
322         {
323                 CancelDBBackends(dbid, PROCSIG_RECOVERY_CONFLICT_DATABASE, true);
324
325                 /*
326                  * Wait awhile for them to die so that we avoid flooding an
327                  * unresponsive backend when system is heavily loaded.
328                  */
329                 pg_usleep(10000);
330         }
331 }
332
333 static void
334 ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
335 {
336         VirtualTransactionId *backends;
337         bool            lock_acquired = false;
338         int                     num_attempts = 0;
339         LOCKTAG         locktag;
340
341         SET_LOCKTAG_RELATION(locktag, dbOid, relOid);
342
343         /*
344          * If blowing away everybody with conflicting locks doesn't work, after
345          * the first two attempts then we just start blowing everybody away until
346          * it does work. We do this because its likely that we either have too
347          * many locks and we just can't get one at all, or that there are many
348          * people crowding for the same table. Recovery must win; the end
349          * justifies the means.
350          */
351         while (!lock_acquired)
352         {
353                 if (++num_attempts < 3)
354                         backends = GetLockConflicts(&locktag, AccessExclusiveLock);
355                 else
356                         backends = GetConflictingVirtualXIDs(InvalidTransactionId,
357                                                                                                  InvalidOid);
358
359                 ResolveRecoveryConflictWithVirtualXIDs(backends,
360                                                                                          PROCSIG_RECOVERY_CONFLICT_LOCK);
361
362                 if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
363                         != LOCKACQUIRE_NOT_AVAIL)
364                         lock_acquired = true;
365         }
366 }
367
368 /*
369  * ResolveRecoveryConflictWithBufferPin is called from LockBufferForCleanup()
370  * to resolve conflicts with other backends holding buffer pins.
371  *
372  * We either resolve conflicts immediately or set a SIGALRM to wake us at
373  * the limit of our patience. The sleep in LockBufferForCleanup() is
374  * performed here, for code clarity.
375  *
376  * Resolve conflicts by sending a PROCSIG signal to all backends to check if
377  * they hold one of the buffer pins that is blocking Startup process. If so,
378  * backends will take an appropriate error action, ERROR or FATAL.
379  *
380  * We also must check for deadlocks.  Deadlocks occur because if queries
381  * wait on a lock, that must be behind an AccessExclusiveLock, which can only
382  * be cleared if the Startup process replays a transaction completion record.
383  * If Startup process is also waiting then that is a deadlock. The deadlock
384  * can occur if the query is waiting and then the Startup sleeps, or if
385  * Startup is sleeping and the query waits on a lock. We protect against
386  * only the former sequence here, the latter sequence is checked prior to
387  * the query sleeping, in CheckRecoveryConflictDeadlock().
388  *
389  * Deadlocks are extremely rare, and relatively expensive to check for,
390  * so we don't do a deadlock check right away ... only if we have had to wait
391  * at least deadlock_timeout.  Most of the logic about that is in proc.c.
392  */
393 void
394 ResolveRecoveryConflictWithBufferPin(void)
395 {
396         bool            sig_alarm_enabled = false;
397         TimestampTz ltime;
398         TimestampTz now;
399
400         Assert(InHotStandby);
401
402         ltime = GetStandbyLimitTime();
403         now = GetCurrentTimestamp();
404
405         if (!ltime)
406         {
407                 /*
408                  * We're willing to wait forever for conflicts, so set timeout for
409                  * deadlock check (only)
410                  */
411                 if (enable_standby_sig_alarm(now, now, true))
412                         sig_alarm_enabled = true;
413                 else
414                         elog(FATAL, "could not set timer for process wakeup");
415         }
416         else if (now >= ltime)
417         {
418                 /*
419                  * We're already behind, so clear a path as quickly as possible.
420                  */
421                 SendRecoveryConflictWithBufferPin(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
422         }
423         else
424         {
425                 /*
426                  * Wake up at ltime, and check for deadlocks as well if we will be
427                  * waiting longer than deadlock_timeout
428                  */
429                 if (enable_standby_sig_alarm(now, ltime, false))
430                         sig_alarm_enabled = true;
431                 else
432                         elog(FATAL, "could not set timer for process wakeup");
433         }
434
435         /* Wait to be signaled by UnpinBuffer() */
436         ProcWaitForSignal();
437
438         if (sig_alarm_enabled)
439         {
440                 if (!disable_standby_sig_alarm())
441                         elog(FATAL, "could not disable timer for process wakeup");
442         }
443 }
444
445 void
446 SendRecoveryConflictWithBufferPin(ProcSignalReason reason)
447 {
448         Assert(reason == PROCSIG_RECOVERY_CONFLICT_BUFFERPIN ||
449                    reason == PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
450
451         /*
452          * We send signal to all backends to ask them if they are holding the
453          * buffer pin which is delaying the Startup process. We must not set the
454          * conflict flag yet, since most backends will be innocent. Let the
455          * SIGUSR1 handling in each backend decide their own fate.
456          */
457         CancelDBBackends(InvalidOid, reason, false);
458 }
459
460 /*
461  * In Hot Standby perform early deadlock detection.  We abort the lock
462  * wait if are about to sleep while holding the buffer pin that Startup
463  * process is waiting for. The deadlock occurs because we can only be
464  * waiting behind an AccessExclusiveLock, which can only clear when a
465  * transaction completion record is replayed, which can only occur when
466  * Startup process is not waiting. So if Startup process is waiting we
467  * never will clear that lock, so if we wait we cause deadlock. If we
468  * are the Startup process then no need to check for deadlocks.
469  */
470 void
471 CheckRecoveryConflictDeadlock(LWLockId partitionLock)
472 {
473         Assert(!InRecovery);
474
475         if (!HoldingBufferPinThatDelaysRecovery())
476                 return;
477
478         LWLockRelease(partitionLock);
479
480         /*
481          * Error message should match ProcessInterrupts() but we avoid calling
482          * that because we aren't handling an interrupt at this point. Note that
483          * we only cancel the current transaction here, so if we are in a
484          * subtransaction and the pin is held by a parent, then the Startup
485          * process will continue to wait even though we have avoided deadlock.
486          */
487         ereport(ERROR,
488                         (errcode(ERRCODE_T_R_DEADLOCK_DETECTED),
489                          errmsg("canceling statement due to conflict with recovery"),
490            errdetail("User transaction caused buffer deadlock with recovery.")));
491 }
492
493 /*
494  * -----------------------------------------------------
495  * Locking in Recovery Mode
496  * -----------------------------------------------------
497  *
498  * All locks are held by the Startup process using a single virtual
499  * transaction. This implementation is both simpler and in some senses,
500  * more correct. The locks held mean "some original transaction held
501  * this lock, so query access is not allowed at this time". So the Startup
502  * process is the proxy by which the original locks are implemented.
503  *
504  * We only keep track of AccessExclusiveLocks, which are only ever held by
505  * one transaction on one relation, and don't worry about lock queuing.
506  *
507  * We keep a single dynamically expandible list of locks in local memory,
508  * RelationLockList, so we can keep track of the various entries made by
509  * the Startup process's virtual xid in the shared lock table.
510  *
511  * List elements use type xl_rel_lock, since the WAL record type exactly
512  * matches the information that we need to keep track of.
513  *
514  * We use session locks rather than normal locks so we don't need
515  * ResourceOwners.
516  */
517
518
519 void
520 StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
521 {
522         xl_standby_lock *newlock;
523         LOCKTAG         locktag;
524
525         /* Already processed? */
526         if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
527                 return;
528
529         elog(trace_recovery(DEBUG4),
530                  "adding recovery lock: db %u rel %u", dbOid, relOid);
531
532         /* dbOid is InvalidOid when we are locking a shared relation. */
533         Assert(OidIsValid(relOid));
534
535         newlock = palloc(sizeof(xl_standby_lock));
536         newlock->xid = xid;
537         newlock->dbOid = dbOid;
538         newlock->relOid = relOid;
539         RecoveryLockList = lappend(RecoveryLockList, newlock);
540
541         /*
542          * Attempt to acquire the lock as requested, if not resolve conflict
543          */
544         SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
545
546         if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
547                 == LOCKACQUIRE_NOT_AVAIL)
548                 ResolveRecoveryConflictWithLock(newlock->dbOid, newlock->relOid);
549 }
550
551 static void
552 StandbyReleaseLocks(TransactionId xid)
553 {
554         ListCell   *cell,
555                            *prev,
556                            *next;
557
558         /*
559          * Release all matching locks and remove them from list
560          */
561         prev = NULL;
562         for (cell = list_head(RecoveryLockList); cell; cell = next)
563         {
564                 xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
565
566                 next = lnext(cell);
567
568                 if (!TransactionIdIsValid(xid) || lock->xid == xid)
569                 {
570                         LOCKTAG         locktag;
571
572                         elog(trace_recovery(DEBUG4),
573                                  "releasing recovery lock: xid %u db %u rel %u",
574                                  lock->xid, lock->dbOid, lock->relOid);
575                         SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
576                         if (!LockRelease(&locktag, AccessExclusiveLock, true))
577                                 elog(LOG,
578                                          "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
579                                          lock->xid, lock->dbOid, lock->relOid);
580
581                         RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
582                         pfree(lock);
583                 }
584                 else
585                         prev = cell;
586         }
587 }
588
589 /*
590  * Release locks for a transaction tree, starting at xid down, from
591  * RecoveryLockList.
592  *
593  * Called during WAL replay of COMMIT/ROLLBACK when in hot standby mode,
594  * to remove any AccessExclusiveLocks requested by a transaction.
595  */
596 void
597 StandbyReleaseLockTree(TransactionId xid, int nsubxids, TransactionId *subxids)
598 {
599         int                     i;
600
601         StandbyReleaseLocks(xid);
602
603         for (i = 0; i < nsubxids; i++)
604                 StandbyReleaseLocks(subxids[i]);
605 }
606
607 /*
608  * StandbyReleaseLocksMany
609  *              Release standby locks held by XIDs < removeXid
610  *
611  * If keepPreparedXacts is true, keep prepared transactions even if
612  * they're older than removeXid
613  */
614 static void
615 StandbyReleaseLocksMany(TransactionId removeXid, bool keepPreparedXacts)
616 {
617         ListCell   *cell,
618                            *prev,
619                            *next;
620         LOCKTAG         locktag;
621
622         /*
623          * Release all matching locks.
624          */
625         prev = NULL;
626         for (cell = list_head(RecoveryLockList); cell; cell = next)
627         {
628                 xl_standby_lock *lock = (xl_standby_lock *) lfirst(cell);
629
630                 next = lnext(cell);
631
632                 if (!TransactionIdIsValid(removeXid) || TransactionIdPrecedes(lock->xid, removeXid))
633                 {
634                         if (keepPreparedXacts && StandbyTransactionIdIsPrepared(lock->xid))
635                                 continue;
636                         elog(trace_recovery(DEBUG4),
637                                  "releasing recovery lock: xid %u db %u rel %u",
638                                  lock->xid, lock->dbOid, lock->relOid);
639                         SET_LOCKTAG_RELATION(locktag, lock->dbOid, lock->relOid);
640                         if (!LockRelease(&locktag, AccessExclusiveLock, true))
641                                 elog(LOG,
642                                          "RecoveryLockList contains entry for lock no longer recorded by lock manager: xid %u database %u relation %u",
643                                          lock->xid, lock->dbOid, lock->relOid);
644                         RecoveryLockList = list_delete_cell(RecoveryLockList, cell, prev);
645                         pfree(lock);
646                 }
647                 else
648                         prev = cell;
649         }
650 }
651
652 /*
653  * Called at end of recovery and when we see a shutdown checkpoint.
654  */
655 void
656 StandbyReleaseAllLocks(void)
657 {
658         elog(trace_recovery(DEBUG2), "release all standby locks");
659         StandbyReleaseLocksMany(InvalidTransactionId, false);
660 }
661
662 /*
663  * StandbyReleaseOldLocks
664  *              Release standby locks held by XIDs < removeXid, as long
665  *              as they're not prepared transactions.
666  */
667 void
668 StandbyReleaseOldLocks(TransactionId removeXid)
669 {
670         StandbyReleaseLocksMany(removeXid, true);
671 }
672
673 /*
674  * --------------------------------------------------------------------
675  *              Recovery handling for Rmgr RM_STANDBY_ID
676  *
677  * These record types will only be created if XLogStandbyInfoActive()
678  * --------------------------------------------------------------------
679  */
680
681 void
682 standby_redo(XLogRecPtr lsn, XLogRecord *record)
683 {
684         uint8           info = record->xl_info & ~XLR_INFO_MASK;
685
686         /* Do nothing if we're not in hot standby mode */
687         if (standbyState == STANDBY_DISABLED)
688                 return;
689
690         if (info == XLOG_STANDBY_LOCK)
691         {
692                 xl_standby_locks *xlrec = (xl_standby_locks *) XLogRecGetData(record);
693                 int                     i;
694
695                 for (i = 0; i < xlrec->nlocks; i++)
696                         StandbyAcquireAccessExclusiveLock(xlrec->locks[i].xid,
697                                                                                           xlrec->locks[i].dbOid,
698                                                                                           xlrec->locks[i].relOid);
699         }
700         else if (info == XLOG_RUNNING_XACTS)
701         {
702                 xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
703                 RunningTransactionsData running;
704
705                 running.xcnt = xlrec->xcnt;
706                 running.subxid_overflow = xlrec->subxid_overflow;
707                 running.nextXid = xlrec->nextXid;
708                 running.latestCompletedXid = xlrec->latestCompletedXid;
709                 running.oldestRunningXid = xlrec->oldestRunningXid;
710                 running.xids = xlrec->xids;
711
712                 ProcArrayApplyRecoveryInfo(&running);
713         }
714         else
715                 elog(PANIC, "relation_redo: unknown op code %u", info);
716 }
717
718 static void
719 standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
720 {
721         int                     i;
722
723         appendStringInfo(buf, " nextXid %u latestCompletedXid %u oldestRunningXid %u",
724                                          xlrec->nextXid,
725                                          xlrec->latestCompletedXid,
726                                          xlrec->oldestRunningXid);
727         if (xlrec->xcnt > 0)
728         {
729                 appendStringInfo(buf, "; %d xacts:", xlrec->xcnt);
730                 for (i = 0; i < xlrec->xcnt; i++)
731                         appendStringInfo(buf, " %u", xlrec->xids[i]);
732         }
733
734         if (xlrec->subxid_overflow)
735                 appendStringInfo(buf, "; subxid ovf");
736 }
737
738 void
739 standby_desc(StringInfo buf, uint8 xl_info, char *rec)
740 {
741         uint8           info = xl_info & ~XLR_INFO_MASK;
742
743         if (info == XLOG_STANDBY_LOCK)
744         {
745                 xl_standby_locks *xlrec = (xl_standby_locks *) rec;
746                 int                     i;
747
748                 appendStringInfo(buf, "AccessExclusive locks:");
749
750                 for (i = 0; i < xlrec->nlocks; i++)
751                         appendStringInfo(buf, " xid %u db %u rel %u",
752                                                          xlrec->locks[i].xid, xlrec->locks[i].dbOid,
753                                                          xlrec->locks[i].relOid);
754         }
755         else if (info == XLOG_RUNNING_XACTS)
756         {
757                 xl_running_xacts *xlrec = (xl_running_xacts *) rec;
758
759                 appendStringInfo(buf, " running xacts:");
760                 standby_desc_running_xacts(buf, xlrec);
761         }
762         else
763                 appendStringInfo(buf, "UNKNOWN");
764 }
765
766 /*
767  * Log details of the current snapshot to WAL. This allows the snapshot state
768  * to be reconstructed on the standby.
769  *
770  * We can move directly to STANDBY_SNAPSHOT_READY at startup if we
771  * start from a shutdown checkpoint because we know nothing was running
772  * at that time and our recovery snapshot is known empty. In the more
773  * typical case of an online checkpoint we need to jump through a few
774  * hoops to get a correct recovery snapshot and this requires a two or
775  * sometimes a three stage process.
776  *
777  * The initial snapshot must contain all running xids and all current
778  * AccessExclusiveLocks at a point in time on the standby. Assembling
779  * that information while the server is running requires many and
780  * various LWLocks, so we choose to derive that information piece by
781  * piece and then re-assemble that info on the standby. When that
782  * information is fully assembled we move to STANDBY_SNAPSHOT_READY.
783  *
784  * Since locking on the primary when we derive the information is not
785  * strict, we note that there is a time window between the derivation and
786  * writing to WAL of the derived information. That allows race conditions
787  * that we must resolve, since xids and locks may enter or leave the
788  * snapshot during that window. This creates the issue that an xid or
789  * lock may start *after* the snapshot has been derived yet *before* the
790  * snapshot is logged in the running xacts WAL record. We resolve this by
791  * starting to accumulate changes at a point just prior to when we derive
792  * the snapshot on the primary, then ignore duplicates when we later apply
793  * the snapshot from the running xacts record. This is implemented during
794  * CreateCheckpoint() where we use the logical checkpoint location as
795  * our starting point and then write the running xacts record immediately
796  * before writing the main checkpoint WAL record. Since we always start
797  * up from a checkpoint and are immediately at our starting point, we
798  * unconditionally move to STANDBY_INITIALIZED. After this point we
799  * must do 4 things:
800  *      * move shared nextXid forwards as we see new xids
801  *      * extend the clog and subtrans with each new xid
802  *      * keep track of uncommitted known assigned xids
803  *      * keep track of uncommitted AccessExclusiveLocks
804  *
805  * When we see a commit/abort we must remove known assigned xids and locks
806  * from the completing transaction. Attempted removals that cannot locate
807  * an entry are expected and must not cause an error when we are in state
808  * STANDBY_INITIALIZED. This is implemented in StandbyReleaseLocks() and
809  * KnownAssignedXidsRemove().
810  *
811  * Later, when we apply the running xact data we must be careful to ignore
812  * transactions already committed, since those commits raced ahead when
813  * making WAL entries.
814  */
815 void
816 LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid)
817 {
818         RunningTransactions running;
819         xl_standby_lock *locks;
820         int                     nlocks;
821
822         Assert(XLogStandbyInfoActive());
823
824         /*
825          * Get details of any AccessExclusiveLocks being held at the moment.
826          *
827          * XXX GetRunningTransactionLocks() currently holds a lock on all
828          * partitions though it is possible to further optimise the locking. By
829          * reference counting locks and storing the value on the ProcArray entry
830          * for each backend we can easily tell if any locks need recording without
831          * trying to acquire the partition locks and scanning the lock table.
832          */
833         locks = GetRunningTransactionLocks(&nlocks);
834         if (nlocks > 0)
835                 LogAccessExclusiveLocks(nlocks, locks);
836
837         /*
838          * Log details of all in-progress transactions. This should be the last
839          * record we write, because standby will open up when it sees this.
840          */
841         running = GetRunningTransactionData();
842         LogCurrentRunningXacts(running);
843         /* GetRunningTransactionData() acquired XidGenLock, we must release it */
844         LWLockRelease(XidGenLock);
845
846         *oldestActiveXid = running->oldestRunningXid;
847         *nextXid = running->nextXid;
848 }
849
850 /*
851  * Record an enhanced snapshot of running transactions into WAL.
852  *
853  * The definitions of RunningTransactionsData and xl_xact_running_xacts
854  * are similar. We keep them separate because xl_xact_running_xacts
855  * is a contiguous chunk of memory and never exists fully until it is
856  * assembled in WAL.
857  */
858 static void
859 LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
860 {
861         xl_running_xacts xlrec;
862         XLogRecData rdata[2];
863         int                     lastrdata = 0;
864         XLogRecPtr      recptr;
865
866         xlrec.xcnt = CurrRunningXacts->xcnt;
867         xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
868         xlrec.nextXid = CurrRunningXacts->nextXid;
869         xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
870         xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
871
872         /* Header */
873         rdata[0].data = (char *) (&xlrec);
874         rdata[0].len = MinSizeOfXactRunningXacts;
875         rdata[0].buffer = InvalidBuffer;
876
877         /* array of TransactionIds */
878         if (xlrec.xcnt > 0)
879         {
880                 rdata[0].next = &(rdata[1]);
881                 rdata[1].data = (char *) CurrRunningXacts->xids;
882                 rdata[1].len = xlrec.xcnt * sizeof(TransactionId);
883                 rdata[1].buffer = InvalidBuffer;
884                 lastrdata = 1;
885         }
886
887         rdata[lastrdata].next = NULL;
888
889         recptr = XLogInsert(RM_STANDBY_ID, XLOG_RUNNING_XACTS, rdata);
890
891         if (CurrRunningXacts->subxid_overflow)
892                 elog(trace_recovery(DEBUG2),
893                          "snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
894                          CurrRunningXacts->xcnt,
895                          recptr.xlogid, recptr.xrecoff,
896                          CurrRunningXacts->oldestRunningXid,
897                          CurrRunningXacts->latestCompletedXid,
898                          CurrRunningXacts->nextXid);
899         else
900                 elog(trace_recovery(DEBUG2),
901                          "snapshot of %u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
902                          CurrRunningXacts->xcnt,
903                          recptr.xlogid, recptr.xrecoff,
904                          CurrRunningXacts->oldestRunningXid,
905                          CurrRunningXacts->latestCompletedXid,
906                          CurrRunningXacts->nextXid);
907 }
908
909 /*
910  * Wholesale logging of AccessExclusiveLocks. Other lock types need not be
911  * logged, as described in backend/storage/lmgr/README.
912  */
913 static void
914 LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks)
915 {
916         XLogRecData rdata[2];
917         xl_standby_locks xlrec;
918
919         xlrec.nlocks = nlocks;
920
921         rdata[0].data = (char *) &xlrec;
922         rdata[0].len = offsetof(xl_standby_locks, locks);
923         rdata[0].buffer = InvalidBuffer;
924         rdata[0].next = &rdata[1];
925
926         rdata[1].data = (char *) locks;
927         rdata[1].len = nlocks * sizeof(xl_standby_lock);
928         rdata[1].buffer = InvalidBuffer;
929         rdata[1].next = NULL;
930
931         (void) XLogInsert(RM_STANDBY_ID, XLOG_STANDBY_LOCK, rdata);
932 }
933
934 /*
935  * Individual logging of AccessExclusiveLocks for use during LockAcquire()
936  */
937 void
938 LogAccessExclusiveLock(Oid dbOid, Oid relOid)
939 {
940         xl_standby_lock xlrec;
941
942         xlrec.xid = GetTopTransactionId();
943
944         /*
945          * Decode the locktag back to the original values, to avoid sending lots
946          * of empty bytes with every message.  See lock.h to check how a locktag
947          * is defined for LOCKTAG_RELATION
948          */
949         xlrec.dbOid = dbOid;
950         xlrec.relOid = relOid;
951
952         LogAccessExclusiveLocks(1, &xlrec);
953 }
954
955 /*
956  * Prepare to log an AccessExclusiveLock, for use during LockAcquire()
957  */
958 void
959 LogAccessExclusiveLockPrepare(void)
960 {
961         /*
962          * Ensure that a TransactionId has been assigned to this transaction, for
963          * two reasons, both related to lock release on the standby. First, we
964          * must assign an xid so that RecordTransactionCommit() and
965          * RecordTransactionAbort() do not optimise away the transaction
966          * completion record which recovery relies upon to release locks. It's a
967          * hack, but for a corner case not worth adding code for into the main
968          * commit path. Second, must must assign an xid before the lock is
969          * recorded in shared memory, otherwise a concurrently executing
970          * GetRunningTransactionLocks() might see a lock associated with an
971          * InvalidTransactionId which we later assert cannot happen.
972          */
973         (void) GetTopTransactionId();
974 }