OSDN Git Service

Move session_start out of MyProcPort structure and make it a global called MyStartTime,
[pg-rex/syncrep.git] / src / backend / postmaster / autovacuum.c
1 /*-------------------------------------------------------------------------
2  *
3  * autovacuum.c
4  *
5  * PostgreSQL Integrated Autovacuum Daemon
6  *
7  * The autovacuum system is structured in two different kinds of processes: the
8  * autovacuum launcher and the autovacuum worker.  The launcher is an
9  * always-running process, started by the postmaster when the autovacuum GUC
10  * parameter is set.  The launcher schedules autovacuum workers to be started
11  * when appropriate.  The workers are the processes which execute the actual
12  * vacuuming; they connect to a database as determined in the launcher, and
13  * once connected they examine the catalogs to select the tables to vacuum.
14  *
15  * The autovacuum launcher cannot start the worker processes by itself,
16  * because doing so would cause robustness issues (namely, failure to shut
17  * them down on exceptional conditions, and also, since the launcher is
18  * connected to shared memory and is thus subject to corruption there, it is
19  * not as robust as the postmaster).  So it leaves that task to the postmaster.
20  *
21  * There is an autovacuum shared memory area, where the launcher stores
22  * information about the database it wants vacuumed.  When it wants a new
23  * worker to start, it sets a flag in shared memory and sends a signal to the
24  * postmaster.  Then postmaster knows nothing more than it must start a worker;
25  * so it forks a new child, which turns into a worker.  This new process
26  * connects to shared memory, and there it can inspect the information that the
27  * launcher has set up.
28  *
29  * If the fork() call fails in the postmaster, it sets a flag in the shared
30  * memory area, and sends a signal to the launcher.  The launcher, upon
31  * noticing the flag, can try starting the worker again by resending the
32  * signal.  Note that the failure can only be transient (fork failure due to
33  * high load, memory pressure, too many processes, etc); more permanent
34  * problems, like failure to connect to a database, are detected later in the
35  * worker and dealt with just by having the worker exit normally.  The launcher
36  * will launch a new worker again later, per schedule.
37  *
38  * When the worker is done vacuuming it sends SIGUSR1 to the launcher.  The
39  * launcher then wakes up and is able to launch another worker, if the schedule
40  * is so tight that a new worker is needed immediately.  At this time the
41  * launcher can also balance the settings for the various remaining workers'
42  * cost-based vacuum delay feature.
43  *
44  * Note that there can be more than one worker in a database concurrently.
45  * They will store the table they are currently vacuuming in shared memory, so
46  * that other workers avoid being blocked waiting for the vacuum lock for that
47  * table.  They will also reload the pgstats data just before vacuuming each
48  * table, to avoid vacuuming a table that was just finished being vacuumed by
49  * another worker and thus is no longer noted in shared memory.  However,
50  * there is a window (caused by pgstat delay) on which a worker may choose a
51  * table that was already vacuumed; this is a bug in the current design.
52  *
53  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
54  * Portions Copyright (c) 1994, Regents of the University of California
55  *
56  *
57  * IDENTIFICATION
58  *        $PostgreSQL: pgsql/src/backend/postmaster/autovacuum.c,v 1.56 2007/08/02 23:39:44 adunstan Exp $
59  *
60  *-------------------------------------------------------------------------
61  */
62 #include "postgres.h"
63
64 #include <signal.h>
65 #include <sys/types.h>
66 #include <sys/time.h>
67 #include <time.h>
68 #include <unistd.h>
69
70 #include "access/genam.h"
71 #include "access/heapam.h"
72 #include "access/transam.h"
73 #include "access/xact.h"
74 #include "catalog/indexing.h"
75 #include "catalog/namespace.h"
76 #include "catalog/pg_autovacuum.h"
77 #include "catalog/pg_database.h"
78 #include "commands/dbcommands.h"
79 #include "commands/vacuum.h"
80 #include "libpq/hba.h"
81 #include "libpq/pqsignal.h"
82 #include "miscadmin.h"
83 #include "pgstat.h"
84 #include "postmaster/autovacuum.h"
85 #include "postmaster/fork_process.h"
86 #include "postmaster/postmaster.h"
87 #include "storage/fd.h"
88 #include "storage/ipc.h"
89 #include "storage/pmsignal.h"
90 #include "storage/proc.h"
91 #include "storage/procarray.h"
92 #include "storage/sinval.h"
93 #include "tcop/tcopprot.h"
94 #include "utils/flatfiles.h"
95 #include "utils/fmgroids.h"
96 #include "utils/lsyscache.h"
97 #include "utils/memutils.h"
98 #include "utils/ps_status.h"
99 #include "utils/syscache.h"
100
101
102 /*
103  * GUC parameters
104  */
105 bool            autovacuum_start_daemon = false;  /* when false, the launcher starts one worker and exits */
106 int                     autovacuum_max_workers;  /* sizes the shared-memory WorkerInfo array */
107 int                     autovacuum_naptime;  /* in seconds; used for launcher sleep and wait times */
108 int                     autovacuum_vac_thresh;  /* presumably the base vacuum threshold -- confirm at use site */
109 double          autovacuum_vac_scale;  /* presumably the vacuum scale factor -- confirm at use site */
110 int                     autovacuum_anl_thresh;  /* presumably the base analyze threshold -- confirm at use site */
111 double          autovacuum_anl_scale;  /* presumably the analyze scale factor -- confirm at use site */
112 int                     autovacuum_freeze_max_age;  /* compared against recentXid elsewhere in the file -- confirm */
113
114 int                     autovacuum_vac_cost_delay;  /* presumably default cost delay for workers -- confirm */
115 int                     autovacuum_vac_cost_limit;  /* presumably default cost limit for workers -- confirm */
116
117 int                     Log_autovacuum = -1;  /* NOTE(review): meaning of -1 is not visible here -- confirm */
118
119
120 /* Flags to tell if we are in an autovacuum process */
121 static bool am_autovacuum_launcher = false;  /* set by AutoVacLauncherMain and AutovacuumLauncherIAm */
122 static bool am_autovacuum_worker = false;
123
124 /* Flags set by signal handlers; the launcher main loop polls and clears them */
125 static volatile sig_atomic_t got_SIGHUP = false;  /* launcher rereads the config file */
126 static volatile sig_atomic_t got_SIGUSR1 = false;  /* launcher checks the av_signal flags in shared memory */
127 static volatile sig_atomic_t got_SIGTERM = false;  /* launcher leaves its main loop and exits */
128
129 /* Comparison point for determining whether freeze_max_age is exceeded */
130 static TransactionId recentXid;
131
132 /* Default freeze_min_age to use for autovacuum (varies by database) */
133 static int      default_freeze_min_age;
134
135 /* Memory context for long-lived data; in the launcher it is reset during error recovery */
136 static MemoryContext AutovacMemCxt;
137
138 /* struct to keep track of databases in launcher (one entry per DatabaseList element) */
139 typedef struct avl_dbase
140 {
141         Oid                     adl_datid;                      /* hash key -- must be first */
142         TimestampTz     adl_next_worker;  /* a worker is launched once this time is now or in the past */
143         int                     adl_score;  /* NOTE(review): use not visible in this part of the file -- confirm */
144 } avl_dbase;
145
146 /* struct to keep track of databases in worker */
147 typedef struct avw_dbase
148 {
149         Oid                     adw_datid;  /* presumably the database's OID -- confirm at use site */
150         char       *adw_name;  /* presumably the database's name -- confirm at use site */
151         TransactionId adw_frozenxid;  /* presumably the database's frozen XID -- confirm at use site */
152         PgStat_StatDBEntry *adw_entry;  /* pgstat entry pointer; NOTE(review): may be NULL? confirm */
153 } avw_dbase;
154
155 /* struct to keep track of tables to vacuum and/or analyze, in 1st pass */
156 typedef struct av_relation
157 {
158         Oid             ar_relid;  /* presumably the table's OID -- confirm at use site */
159         Oid             ar_toastrelid;  /* presumably the OID of its TOAST table -- confirm at use site */
160 } av_relation;
161
162 /* struct to keep track of tables to vacuum and/or analyze, after rechecking */
163 typedef struct autovac_table
164 {
165         Oid                     at_relid;  /* presumably the table's OID -- confirm at use site */
166         Oid                     at_toastrelid;  /* presumably the OID of its TOAST table -- confirm at use site */
167         bool            at_dovacuum;  /* presumably: table needs VACUUM -- confirm at use site */
168         bool            at_doanalyze;  /* presumably: table needs ANALYZE -- confirm at use site */
169         int                     at_freeze_min_age;  /* presumably the freeze_min_age to apply -- confirm */
170         int                     at_vacuum_cost_delay;  /* presumably the cost delay for this table -- confirm */
171         int                     at_vacuum_cost_limit;  /* presumably the cost limit for this table -- confirm */
172 } autovac_table;
173
174 /*-------------
175  * This struct holds information about a single worker's whereabouts.  We keep
176  * an array of these in shared memory, sized according to
177  * autovacuum_max_workers.
178  *
179  * wi_links             entry into free list or running list
180  * wi_dboid             OID of the database this worker is supposed to work on
181  * wi_tableoid  OID of the table currently being vacuumed
182  * wi_workerpid PID of the running worker, 0 if not yet started
183  * wi_launchtime Time at which this worker was launched
184  * wi_cost_*    Vacuum cost-based delay parameters current in this worker
185  *
186  * All fields are protected by AutovacuumLock, except for wi_tableoid which is
187  * protected by AutovacuumScheduleLock (which is read-only for everyone except
188  * that worker itself).
189  *-------------
190  */
191 typedef struct WorkerInfoData
192 {
193         SHM_QUEUE       wi_links;  /* entry into free list or running list */
194         Oid                     wi_dboid;  /* database this worker is supposed to work on */
195         Oid                     wi_tableoid;  /* table currently being vacuumed */
196         int                     wi_workerpid;  /* PID of the running worker, 0 if not yet started */
197         TimestampTz     wi_launchtime;  /* time at which this worker was launched */
198         int                     wi_cost_delay;  /* cost-based delay parameter current in this worker */
199         int                     wi_cost_limit;  /* cost-based delay parameter current in this worker */
200         int                     wi_cost_limit_base;  /* NOTE(review): presumably the pre-balancing limit -- confirm */
201 } WorkerInfoData;
202
203 typedef struct WorkerInfoData *WorkerInfo;  /* pointer form used throughout the file */
204
/*
 * Possible signals received by the launcher from remote processes.  These are
 * stored atomically in shared memory so that other processes can set them
 * without locking.
 *
 * The enumerators double as indexes into the av_signal[] array, which is
 * declared with AutoVacNumSignals elements.  AutoVacNumSignals must therefore
 * be a count (one more than the largest real signal), not an alias of the
 * last signal: aliasing it would size the array one element too small and
 * make av_signal[AutoVacRebalance] an out-of-bounds access.
 */
typedef enum
{
	AutoVacForkFailed,			/* failed trying to start a worker */
	AutoVacRebalance,			/* rebalance the cost limits */
	AutoVacNumSignals			/* number of signals -- must be last */
} AutoVacuumSignal;
216
217 /*-------------
218  * The main autovacuum shmem struct.  On shared memory we store this main
219  * struct and the array of WorkerInfo structs.  This struct keeps:
220  *
221  * av_signal            set by other processes to indicate various conditions
222  * av_launcherpid       the PID of the autovacuum launcher
223  * av_freeWorkers       the WorkerInfo freelist
224  * av_runningWorkers the WorkerInfo non-free queue
225  * av_startingWorker pointer to WorkerInfo currently being started (cleared by
226  *                                      the worker itself as soon as it's up and running)
227  *
228  * This struct is protected by AutovacuumLock, except for av_signal and parts
229  * of the worker list (see above).
230  *-------------
231  */
232 typedef struct
233 {
234         sig_atomic_t    av_signal[AutoVacNumSignals];  /* flags indexed by AutoVacuumSignal value */
235         pid_t                   av_launcherpid;  /* set by the launcher at startup, zeroed at normal exit */
236         SHMEM_OFFSET    av_freeWorkers;  /* head of the freelist, chained via wi_links.next; INVALID_OFFSET if empty */
237         SHM_QUEUE               av_runningWorkers;  /* the WorkerInfo non-free queue */
238         SHMEM_OFFSET    av_startingWorker;  /* worker being started; INVALID_OFFSET if none */
239 } AutoVacuumShmemStruct;
240
241 static AutoVacuumShmemStruct *AutoVacuumShmem;  /* this process's pointer to the shared struct */
242
243 /*
     * the database list in the launcher, and the context that contains it;
     * the launcher reads the list from its tail, and resets both pointers to
     * NULL during error recovery
     */
244 static Dllist *DatabaseList = NULL;
245 static MemoryContext DatabaseListCxt = NULL;
246
247 /* Pointer to my own WorkerInfo, valid on each worker */
248 static WorkerInfo       MyWorkerInfo = NULL;
249
250 /* PID of launcher, valid only in worker while shutting down */
251 int     AutovacuumLauncherPid = 0;
252
253 #ifdef EXEC_BACKEND  /* fork+exec entry points exist only in EXEC_BACKEND builds */
254 static pid_t avlauncher_forkexec(void);
255 static pid_t avworker_forkexec(void);
256 #endif
257 NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
258 NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
259
    /* scheduling helpers; several are called from AutoVacLauncherMain */
260 static Oid do_start_worker(void);
261 static void launcher_determine_sleep(bool canlaunch, bool recursing,
262                                                  struct timeval *nap);
263 static void launch_worker(TimestampTz now);
264 static List *get_database_list(void);
265 static void rebuild_database_list(Oid newdb);
266 static int db_comparator(const void *a, const void *b);
267 static void autovac_balance_cost(void);
268
    /* NOTE(review): the following are defined outside the visible part of the file */
269 static void do_autovacuum(void);
270 static void FreeWorkerInfo(int code, Datum arg);
271
272 static void relation_check_autovac(Oid relid, Form_pg_class classForm,
273                                            Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
274                                            List **table_oids, List **table_toast_list,
275                                            List **toast_oids);
276 static autovac_table *table_recheck_autovac(Oid relid);
277 static void relation_needs_vacanalyze(Oid relid, Form_pg_autovacuum avForm,
278                                                   Form_pg_class classForm,
279                                                   PgStat_StatTabEntry *tabentry, bool *dovacuum,
280                                                   bool *doanalyze);
281
282 static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
283                                                   bool doanalyze, int freeze_min_age,
284                                                   BufferAccessStrategy bstrategy);
285 static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
286 static PgStat_StatTabEntry *get_pgstat_tabentry_relid(Oid relid, bool isshared,
287                                                   PgStat_StatDBEntry *shared,
288                                                   PgStat_StatDBEntry *dbentry);
289 static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
    /* signal handlers installed by AutoVacLauncherMain */
290 static void avl_sighup_handler(SIGNAL_ARGS);
291 static void avl_sigusr1_handler(SIGNAL_ARGS);
292 static void avl_sigterm_handler(SIGNAL_ARGS);
293 static void avl_quickdie(SIGNAL_ARGS);
294
295
296
297 /********************************************************************
298  *                    AUTOVACUUM LAUNCHER CODE
299  ********************************************************************/
300
301 #ifdef EXEC_BACKEND
302 /*
303  * forkexec routine for the autovacuum launcher process.
304  *
305  * Format up the arglist, then fork and exec.
306  */
307 static pid_t
308 avlauncher_forkexec(void)
309 {
310         char       *av[10];
311         int                     ac = 0;
312
313         av[ac++] = "postgres";
314         av[ac++] = "--forkavlauncher";
315         av[ac++] = NULL;                        /* filled in by postmaster_forkexec */
316         av[ac] = NULL;
317
318         Assert(ac < lengthof(av));
319
320         return postmaster_forkexec(ac, av);
321 }
322
323 /*
324  * We need this set from the outside, before InitProcess is called
325  */
326 void
327 AutovacuumLauncherIAm(void)
328 {
329         am_autovacuum_launcher = true;
330 }
331 #endif
332
333 /*
334  * Main entry point for autovacuum launcher process, to be called from the
335  * postmaster.
336  */
337 int
338 StartAutoVacLauncher(void)
339 {
340         pid_t           AutoVacPID;
341
342 #ifdef EXEC_BACKEND
343         switch ((AutoVacPID = avlauncher_forkexec()))
344 #else
345         switch ((AutoVacPID = fork_process()))
346 #endif
347         {
348                 case -1:
349                         ereport(LOG,
350                                         (errmsg("could not fork autovacuum process: %m")));
351                         return 0;
352
353 #ifndef EXEC_BACKEND
354                 case 0:
355                         /* in postmaster child ... */
356                         /* Close the postmaster's sockets */
357                         ClosePostmasterPorts(false);
358
359                         /* Lose the postmaster's on-exit routines */
360                         on_exit_reset();
361
362                         AutoVacLauncherMain(0, NULL);
363                         break;
364 #endif
365                 default:
366                         return (int) AutoVacPID;
367         }
368
369         /* shouldn't get here */
370         return 0;
371 }
372
373 /*
374  * Main loop for the autovacuum launcher process.
     *
     * Marks the process as the launcher, installs its signal handlers, creates
     * the launcher's working memory context and sets up a sigsetjmp() recovery
     * point, then enters the scheduling loop.  Each iteration sleeps for the
     * time computed by launcher_determine_sleep(), reacts to the pending
     * SIGTERM/SIGHUP/SIGUSR1 flags, and calls launch_worker() when a worker
     * slot is free, no other worker is still starting up, and the database at
     * the tail of DatabaseList is due (or the list is empty).
     *
     * argc and argv are not referenced in this function.  Every path out of
     * the function ends in a proc_exit() or exit() call.
375  */
376 NON_EXEC_STATIC void
377 AutoVacLauncherMain(int argc, char *argv[])
378 {
379         sigjmp_buf      local_sigjmp_buf;
380
381         /* we are a postmaster subprocess now */
382         IsUnderPostmaster = true;
383         am_autovacuum_launcher = true;
384
385         /* reset MyProcPid */
386         MyProcPid = getpid();
387
388         /* record Start Time for logging */
389         MyStartTime = time(NULL);
390
391         /* Identify myself via ps */
392         init_ps_display("autovacuum launcher process", "", "", "");
393
394         SetProcessingMode(InitProcessing);
395
396         /*
397          * If possible, make this process a group leader, so that the postmaster
398          * can signal any child processes too.  (autovacuum probably never has
399          * any child processes, but for consistency we make all postmaster
400          * child processes do this.)
401          */
402 #ifdef HAVE_SETSID
403         if (setsid() < 0)
404                 elog(FATAL, "setsid() failed: %m");
405 #endif
406
407         /*
408          * Set up signal handlers.  Since this is an auxiliary process, it has
409          * particular signal requirements -- no deadlock checker or sinval
410          * catchup, for example.
411          */
412         pqsignal(SIGHUP, avl_sighup_handler);
413
414         pqsignal(SIGINT, SIG_IGN);
415         pqsignal(SIGTERM, avl_sigterm_handler);
416         pqsignal(SIGQUIT, avl_quickdie);
417         pqsignal(SIGALRM, SIG_IGN);
418
419         pqsignal(SIGPIPE, SIG_IGN);
420         pqsignal(SIGUSR1, avl_sigusr1_handler);
421         /* We don't listen for async notifies */
422         pqsignal(SIGUSR2, SIG_IGN);
423         pqsignal(SIGFPE, FloatExceptionHandler);
424         pqsignal(SIGCHLD, SIG_DFL);
425
426         /* Early initialization */
427         BaseInit();
428
429         /*
430          * Create a per-backend PGPROC struct in shared memory, except in the
431          * EXEC_BACKEND case where this was done in SubPostmasterMain. We must do
432          * this before we can use LWLocks (and in the EXEC_BACKEND case we already
433          * had to do some stuff with LWLocks).
434          */
435 #ifndef EXEC_BACKEND
436         InitAuxiliaryProcess();
437 #endif
438
439         /*
440          * Create a memory context that we will do all our work in.  We do this so
441          * that we can reset the context during error recovery and thereby avoid
442          * possible memory leaks.
443          */
444         AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
445                                                                                   "Autovacuum Launcher",
446                                                                                   ALLOCSET_DEFAULT_MINSIZE,
447                                                                                   ALLOCSET_DEFAULT_INITSIZE,
448                                                                                   ALLOCSET_DEFAULT_MAXSIZE);
449         MemoryContextSwitchTo(AutovacMemCxt);
450
451
452         /*
453          * If an exception is encountered, processing resumes here.
454          *
455          * This code is heavily based on bgwriter.c, q.v.
456          */
457         if (sigsetjmp(local_sigjmp_buf, 1) != 0)
458         {
459                 /* since not using PG_TRY, must reset error stack by hand */
460                 error_context_stack = NULL;
461
462                 /* Prevents interrupts while cleaning up */
463                 HOLD_INTERRUPTS();
464
465                 /* Report the error to the server log */
466                 EmitErrorReport();
467
468                 /*
469                  * These operations are really just a minimal subset of
470                  * AbortTransaction().  We don't have very many resources to worry
471                  * about, but we do have LWLocks.
472                  */
473                 LWLockReleaseAll();
474                 AtEOXact_Files();
475
476                 /*
477                  * Now return to normal top-level context and clear ErrorContext for
478                  * next time.
479                  */
480                 MemoryContextSwitchTo(AutovacMemCxt);
481                 FlushErrorState();
482
483                 /* Flush any leaked data in the top-level context */
484                 MemoryContextResetAndDeleteChildren(AutovacMemCxt);
485
486                 /* don't leave dangling pointers to freed memory */
487                 DatabaseListCxt = NULL;
488                 DatabaseList = NULL;
489
490                 /* Make sure pgstat also considers our stat data as gone */
491                 pgstat_clear_snapshot();
492
493                 /* Now we can allow interrupts again */
494                 RESUME_INTERRUPTS();
495
496                 /*
497                  * Sleep at least 1 second after any error.  We don't want to be
498                  * filling the error logs as fast as we can.
499                  */
500                 pg_usleep(1000000L);
501         }
502
503         /* We can now handle ereport(ERROR) */
504         PG_exception_stack = &local_sigjmp_buf;
505
            /* note: execution also passes through here after error recovery above */
506         ereport(LOG,
507                         (errmsg("autovacuum launcher started")));
508
509         /* must unblock signals before calling rebuild_database_list */
510         PG_SETMASK(&UnBlockSig);
511
512         /* in emergency mode, just start a worker and go away */
513         if (!autovacuum_start_daemon)
514         {
515                 do_start_worker();
516                 proc_exit(0);           /* done */
517         }
518
519         AutoVacuumShmem->av_launcherpid = MyProcPid;  /* publish our PID in shared memory */
520
521         /*
522          * Create the initial database list.  The invariant we want this list to
523          * keep is that it's ordered by decreasing next_time.  As soon as an entry
524          * is updated to a higher time, it will be moved to the front (which is
525          * correct because the only operation is to add autovacuum_naptime to the
526          * entry, and time always increases).
527          */
528         rebuild_database_list(InvalidOid);
529
530         for (;;)
531         {
532                 struct timeval nap;
533                 TimestampTz current_time = 0;
534                 bool    can_launch;
535                 Dlelem *elem;
536
537                 /*
538                  * Emergency bailout if postmaster has died.  This is to avoid the
539                  * necessity for manual cleanup of all postmaster children.
540                  */
541                 if (!PostmasterIsAlive(true))
542                         exit(1);
543
                    /* av_freeWorkers is read here without taking AutovacuumLock */
544                 launcher_determine_sleep(AutoVacuumShmem->av_freeWorkers !=
545                                                                  INVALID_OFFSET, false, &nap);
546
547                 /*
548                  * Sleep for a while according to schedule.
549                  *
550                  * On some platforms, signals won't interrupt the sleep.  To ensure we
551                  * respond reasonably promptly when someone signals us, break down the
552                  * sleep into 1-second increments, and check for interrupts after each
553                  * nap.
554                  */
555                 while (nap.tv_sec > 0 || nap.tv_usec > 0)
556                 {
557                         uint32  sleeptime;
558
559                         if (nap.tv_sec > 0)
560                         {
561                                 sleeptime = 1000000;
562                                 nap.tv_sec--;
563                         }
564                         else
565                         {
566                                 sleeptime = nap.tv_usec;
567                                 nap.tv_usec = 0;
568                         }
569                         pg_usleep(sleeptime);
570
571                         /*
572                          * Emergency bailout if postmaster has died.  This is to avoid the
573                          * necessity for manual cleanup of all postmaster children.
574                          */
575                         if (!PostmasterIsAlive(true))
576                                 exit(1);
577
                            /* any pending signal flag cuts the nap short */
578                         if (got_SIGTERM || got_SIGHUP || got_SIGUSR1)
579                                 break;
580                 }
581
582                 /* the normal shutdown case */
583                 if (got_SIGTERM)
584                         break;
585
586                 if (got_SIGHUP)
587                 {
588                         got_SIGHUP = false;
589                         ProcessConfigFile(PGC_SIGHUP);
590
591                         /* shutdown requested in config file */
592                         if (!autovacuum_start_daemon)
593                                 break;
594
595                         /* rebalance in case the default cost parameters changed */
596                         LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
597                         autovac_balance_cost();
598                         LWLockRelease(AutovacuumLock);
599
600                         /* rebuild the list in case the naptime changed */
601                         rebuild_database_list(InvalidOid);
602                 }
603
604                 /*
605                  * a worker finished, or postmaster signalled failure to start a
606                  * worker
607                  */
608                 if (got_SIGUSR1)
609                 {
610                         got_SIGUSR1 = false;
611
612                         /* rebalance cost limits, if needed */
613                         if (AutoVacuumShmem->av_signal[AutoVacRebalance])
614                         {
615                                 LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
616                                 AutoVacuumShmem->av_signal[AutoVacRebalance] = false;
617                                 autovac_balance_cost();
618                                 LWLockRelease(AutovacuumLock);
619                         }
620
621                         if (AutoVacuumShmem->av_signal[AutoVacForkFailed])
622                         {
623                                 /*
624                                  * If the postmaster failed to start a new worker, we sleep
625                                  * for a little while and resend the signal.  The new worker's
626                                  * state is still in memory, so this is sufficient.  After
627                                  * that, we restart the main loop.
628                                  *
629                                  * XXX should we put a limit to the number of times we retry?
630                                  * I don't think it makes much sense, because a future start
631                                  * of a worker will continue to fail in the same way.
632                                  */
633                                 AutoVacuumShmem->av_signal[AutoVacForkFailed] = false;
634                                 pg_usleep(100000L);     /* 100ms */
635                                 SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
636                                 continue;
637                         }
638                 }
639
640                 /*
641                  * There are some conditions that we need to check before trying to
642                  * start a worker.  First, we need to make sure that there is a
643                  * worker slot available.  Second, we need to make sure that no other
644                  * worker failed while starting up.
645                  */
646
647                 current_time = GetCurrentTimestamp();
648                 LWLockAcquire(AutovacuumLock, LW_SHARED);
649
650                 can_launch = (AutoVacuumShmem->av_freeWorkers != INVALID_OFFSET);
651
652                 if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
653                 {
654                         int             waittime;
655
656                         WorkerInfo worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
657
658                         /*
659                          * We can't launch another worker when another one is still
660                          * starting up (or failed while doing so), so just sleep for a bit
661                          * more; that worker will wake us up again as soon as it's ready.
662                          * We will only wait autovacuum_naptime seconds (up to a maximum of
663                          * 60 seconds) for this to happen however.  Note that failure to
664                          * connect to a particular database is not a problem here, because
665                          * the worker removes itself from the startingWorker pointer before
666                          * trying to connect.  Problems detected by the postmaster (like
667                          * fork() failure) are also reported and handled differently.  The
668                          * only problems that may cause this code to fire are errors in the
669                          * earlier sections of AutoVacWorkerMain, before the worker removes
670                          * the WorkerInfo from the startingWorker pointer.
671                          */
672                         waittime = Min(autovacuum_naptime, 60) * 1000;  /* seconds -> milliseconds */
673                         if (TimestampDifferenceExceeds(worker->wi_launchtime, current_time,
674                                                                                    waittime))
675                         {
                                    /* trade the shared lock for an exclusive one before modifying */
676                                 LWLockRelease(AutovacuumLock);
677                                 LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
678                                 /*
679                                  * No other process can put a worker in starting mode, so if
680                                  * startingWorker is still set (not INVALID_OFFSET) after
                                     * exchanging our lock,
681                                  * we assume it's the same one we saw above (so we don't
682                                  * recheck the launch time).
683                                  */
684                                 if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
685                                 {
                                            /* clear the slot and push it back onto the freelist */
686                                         worker = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
687                                         worker->wi_dboid = InvalidOid;
688                                         worker->wi_tableoid = InvalidOid;
689                                         worker->wi_workerpid = 0;
690                                         worker->wi_launchtime = 0;
691                                         worker->wi_links.next = AutoVacuumShmem->av_freeWorkers;
692                                         AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(worker);
693                                         AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
694                                         elog(WARNING, "worker took too long to start; cancelled");
695                                 }
696                         }
697                         else
698                                 can_launch = false;
699                 }
700                 LWLockRelease(AutovacuumLock);          /* either shared or exclusive */
701
702                 /* if we can't do anything, just go back to sleep */
703                 if (!can_launch)
704                         continue;
705
706                 /* We're OK to start a new worker */
707
708                 elem = DLGetTail(DatabaseList);
709                 if (elem != NULL)
710                 {
711                         avl_dbase *avdb = DLE_VAL(elem);
712
713                         /*
714                          * launch a worker if next_worker is right now or it is in the past
715                          */
716                         if (TimestampDifferenceExceeds(avdb->adl_next_worker,
717                                                                                    current_time, 0))
718                                 launch_worker(current_time);
719                 }
720                 else
721                 {
722                         /*
723                          * Special case when the list is empty: start a worker right away.
724                          * This covers the initial case, when no database is in pgstats
725                          * (thus the list is empty).  Note that the constraints in
726                          * launcher_determine_sleep keep us from starting workers too
727                          * quickly (at most once every autovacuum_naptime when the list is
728                          * empty).
729                          */
730                         launch_worker(current_time);
731                 }
732         }
733
734         /* Normal exit from the autovac launcher is here */
735         ereport(LOG,
736                         (errmsg("autovacuum launcher shutting down")));
737         AutoVacuumShmem->av_launcherpid = 0;
738
739         proc_exit(0);           /* done */
740 }
741
742 /*
743  * Determine the time to sleep, based on the database list.
744  *
745  * The "canlaunch" parameter indicates whether we can start a worker right now,
746  * for example due to the workers being all busy.  If this is false, we will
747  * cause a long sleep, which will be interrupted when a worker exits.
748  */
749 static void
750 launcher_determine_sleep(bool canlaunch, bool recursing, struct timeval *nap)
751 {
752         Dlelem *elem;
753
754         /*
755          * We sleep until the next scheduled vacuum.  We trust that when the
756          * database list was built, care was taken so that no entries have times in
757          * the past; if the first entry has too close a next_worker value, or a
758          * time in the past, we will sleep a small nominal time.
759          */
760         if (!canlaunch)
761         {
762                 nap->tv_sec = autovacuum_naptime;
763                 nap->tv_usec = 0;
764         }
765         else if ((elem = DLGetTail(DatabaseList)) != NULL)
766         {
767                 avl_dbase  *avdb = DLE_VAL(elem);
768                 TimestampTz     current_time = GetCurrentTimestamp();
769                 TimestampTz     next_wakeup;
770                 long    secs;
771                 int             usecs;
772
773                 next_wakeup = avdb->adl_next_worker;
774                 TimestampDifference(current_time, next_wakeup, &secs, &usecs);
775
776                 nap->tv_sec = secs;
777                 nap->tv_usec = usecs;
778         }
779         else
780         {
781                 /* list is empty, sleep for whole autovacuum_naptime seconds  */
782                 nap->tv_sec = autovacuum_naptime;
783                 nap->tv_usec = 0;
784         }
785
786         /*
787          * If the result is exactly zero, it means a database had an entry with
788          * time in the past.  Rebuild the list so that the databases are evenly
789          * distributed again, and recalculate the time to sleep.  This can happen
790          * if there are more tables needing vacuum than workers, and they all take
791          * longer to vacuum than autovacuum_naptime.
792          *
793          * We only recurse once.  rebuild_database_list should always return times
794          * in the future, but it seems best not to trust too much on that.
795          */
796         if (nap->tv_sec == 0 && nap->tv_usec == 0 && !recursing)
797         {
798                 rebuild_database_list(InvalidOid);
799                 launcher_determine_sleep(canlaunch, true, nap);
800                 return;
801         }
802
803         /* 100ms is the smallest time we'll allow the launcher to sleep */
804         if (nap->tv_sec <= 0 && nap->tv_usec <= 100000)
805         {
806                 nap->tv_sec = 0;
807                 nap->tv_usec = 100000;  /* 100 ms */
808         }
809 }
810
811 /*
812  * Build an updated DatabaseList.  It must only contain databases that appear
813  * in pgstats, and must be sorted by next_worker from highest to lowest,
814  * distributed regularly across the next autovacuum_naptime interval.
815  *
816  * Receives the Oid of the database that made this list be generated (we call
817  * this the "new" database, because when the database was already present on
818  * the list, we expect that this function is not called at all).  The
819  * preexisting list, if any, will be used to preserve the order of the
820  * databases in the autovacuum_naptime period.  The new database is put at the
821  * end of the interval.  The actual values are not saved, which should not be
822  * much of a problem.
823  */
824 static void
825 rebuild_database_list(Oid newdb)
826 {
827         List       *dblist;
828         ListCell   *cell;
829         MemoryContext newcxt;
830         MemoryContext oldcxt;
831         MemoryContext tmpcxt;
832         HASHCTL         hctl;
833         int                     score;
834         int                     nelems;
835         HTAB       *dbhash;
836
837         /* use fresh stats */
838         pgstat_clear_snapshot();
839
840         newcxt = AllocSetContextCreate(AutovacMemCxt,
841                                                                    "AV dblist",
842                                                                    ALLOCSET_DEFAULT_MINSIZE,
843                                                                    ALLOCSET_DEFAULT_INITSIZE,
844                                                                    ALLOCSET_DEFAULT_MAXSIZE);
845         tmpcxt = AllocSetContextCreate(newcxt,
846                                                                    "tmp AV dblist",
847                                                                    ALLOCSET_DEFAULT_MINSIZE,
848                                                                    ALLOCSET_DEFAULT_INITSIZE,
849                                                                    ALLOCSET_DEFAULT_MAXSIZE);
850         oldcxt = MemoryContextSwitchTo(tmpcxt);
851
852         /*
853          * Implementing this is not as simple as it sounds, because we need to put
854          * the new database at the end of the list; next the databases that were
855          * already on the list, and finally (at the tail of the list) all the other
856          * databases that are not on the existing list.
857          *
858          * To do this, we build an empty hash table of scored databases.  We will
859          * start with the lowest score (zero) for the new database, then increasing
860          * scores for the databases in the existing list, in order, and lastly
861          * increasing scores for all databases gotten via get_database_list() that
862          * are not already on the hash.
863          *
864          * Then we will put all the hash elements into an array, sort the array by
865          * score, and finally put the array elements into the new doubly linked
866          * list.
867          */
868         hctl.keysize = sizeof(Oid);
869         hctl.entrysize = sizeof(avl_dbase);
870         hctl.hash = oid_hash;
871         hctl.hcxt = tmpcxt;
872         dbhash = hash_create("db hash", 20, &hctl,      /* magic number here FIXME */
873                                                  HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
874
875         /* start by inserting the new database */
876         score = 0;
877         if (OidIsValid(newdb))
878         {
879                 avl_dbase       *db;
880                 PgStat_StatDBEntry *entry;
881
882                 /* only consider this database if it has a pgstat entry */
883                 entry = pgstat_fetch_stat_dbentry(newdb);
884                 if (entry != NULL)
885                 {
886                         /* we assume it isn't found because the hash was just created */
887                         db = hash_search(dbhash, &newdb, HASH_ENTER, NULL);
888
889                         /* hash_search already filled in the key */
890                         db->adl_score = score++;
891                         /* next_worker is filled in later */
892                 }
893         }
894
895         /* Now insert the databases from the existing list */
896         if (DatabaseList != NULL)
897         {
898                 Dlelem  *elem;
899
900                 elem = DLGetHead(DatabaseList);
901                 while (elem != NULL)
902                 {
903                         avl_dbase  *avdb = DLE_VAL(elem);
904                         avl_dbase  *db;
905                         bool            found;
906                         PgStat_StatDBEntry *entry;
907
908                         elem = DLGetSucc(elem);
909
910                         /*
911                          * skip databases with no stat entries -- in particular, this
912                          * gets rid of dropped databases
913                          */
914                         entry = pgstat_fetch_stat_dbentry(avdb->adl_datid);
915                         if (entry == NULL)
916                                 continue;
917
918                         db = hash_search(dbhash, &(avdb->adl_datid), HASH_ENTER, &found);
919
920                         if (!found)
921                         {
922                                 /* hash_search already filled in the key */
923                                 db->adl_score = score++;
924                                 /* next_worker is filled in later */
925                         }
926                 }
927         }
928
929         /* finally, insert all qualifying databases not previously inserted */
930         dblist = get_database_list();
931         foreach(cell, dblist)
932         {
933                 avw_dbase  *avdb = lfirst(cell);
934                 avl_dbase  *db;
935                 bool            found;
936                 PgStat_StatDBEntry *entry;
937
938                 /* only consider databases with a pgstat entry */
939                 entry = pgstat_fetch_stat_dbentry(avdb->adw_datid);
940                 if (entry == NULL)
941                         continue;
942
943                 db = hash_search(dbhash, &(avdb->adw_datid), HASH_ENTER, &found);
944                 /* only update the score if the database was not already on the hash */
945                 if (!found)
946                 {
947                         /* hash_search already filled in the key */
948                         db->adl_score = score++;
949                         /* next_worker is filled in later */
950                 }
951         }
952         nelems = score;
953
954         /* from here on, the allocated memory belongs to the new list */
955         MemoryContextSwitchTo(newcxt);
956         DatabaseList = DLNewList();
957
958         if (nelems > 0)
959         {
960                 TimestampTz             current_time;
961                 int                             millis_increment;
962                 avl_dbase          *dbary;
963                 avl_dbase          *db;
964                 HASH_SEQ_STATUS seq;
965                 int                             i;
966
967                 /* put all the hash elements into an array */
968                 dbary = palloc(nelems * sizeof(avl_dbase));
969
970                 i = 0;
971                 hash_seq_init(&seq, dbhash);
972                 while ((db = hash_seq_search(&seq)) != NULL)
973                         memcpy(&(dbary[i++]), db, sizeof(avl_dbase));
974
975                 /* sort the array */
976                 qsort(dbary, nelems, sizeof(avl_dbase), db_comparator);
977
978                 /* this is the time interval between databases in the schedule */
979                 millis_increment = 1000.0 * autovacuum_naptime / nelems;
980                 current_time = GetCurrentTimestamp();
981
982                 /*
983                  * move the elements from the array into the dllist, setting the 
984                  * next_worker while walking the array
985                  */
986                 for (i = 0; i < nelems; i++)
987                 {
988                         avl_dbase  *db = &(dbary[i]);
989                         Dlelem     *elem;
990
991                         current_time = TimestampTzPlusMilliseconds(current_time,
992                                                                                                            millis_increment);
993                         db->adl_next_worker = current_time;
994
995                         elem = DLNewElem(db);
996                         /* later elements should go closer to the head of the list */
997                         DLAddHead(DatabaseList, elem);
998                 }
999         }
1000
1001         /* all done, clean up memory */
1002         if (DatabaseListCxt != NULL)
1003                 MemoryContextDelete(DatabaseListCxt);
1004         MemoryContextDelete(tmpcxt);
1005         DatabaseListCxt = newcxt;
1006         MemoryContextSwitchTo(oldcxt);
1007 }
1008
1009 /* qsort comparator for avl_dbase, using adl_score */
1010 static int
1011 db_comparator(const void *a, const void *b)
1012 {
1013         if (((avl_dbase *) a)->adl_score == ((avl_dbase *) b)->adl_score)
1014                 return 0;
1015         else
1016                 return (((avl_dbase *) a)->adl_score < ((avl_dbase *) b)->adl_score) ? 1 : -1;
1017 }
1018
1019 /*
1020  * do_start_worker
1021  *
1022  * Bare-bones procedure for starting an autovacuum worker from the launcher.
1023  * It determines what database to work on, sets up shared memory stuff and
1024  * signals postmaster to start the worker.  It fails gracefully if invoked when
1025  * autovacuum_workers are already active.
1026  *
1027  * Return value is the OID of the database that the worker is going to process,
1028  * or InvalidOid if no worker was actually started.
1029  */
1030 static Oid
1031 do_start_worker(void)
1032 {
1033         List       *dblist;
1034         ListCell   *cell;
1035         TransactionId xidForceLimit;
1036         bool            for_xid_wrap;
1037         avw_dbase  *avdb;
1038         TimestampTz     current_time;
1039         bool            skipit = false;
1040
1041         /* return quickly when there are no free workers */
1042         LWLockAcquire(AutovacuumLock, LW_SHARED);
1043         if (AutoVacuumShmem->av_freeWorkers == INVALID_OFFSET)
1044         {
1045                 LWLockRelease(AutovacuumLock);
1046                 return InvalidOid;
1047         }
1048         LWLockRelease(AutovacuumLock);
1049
1050         /* use fresh stats */
1051         pgstat_clear_snapshot();
1052
1053         /* Get a list of databases */
1054         dblist = get_database_list();
1055
1056         /*
1057          * Determine the oldest datfrozenxid/relfrozenxid that we will allow
1058          * to pass without forcing a vacuum.  (This limit can be tightened for
1059          * particular tables, but not loosened.)
1060          */
1061         recentXid = ReadNewTransactionId();
1062         xidForceLimit = recentXid - autovacuum_freeze_max_age;
1063         /* ensure it's a "normal" XID, else TransactionIdPrecedes misbehaves */
1064         if (xidForceLimit < FirstNormalTransactionId)
1065                 xidForceLimit -= FirstNormalTransactionId;
1066
1067         /*
1068          * Choose a database to connect to.  We pick the database that was least
1069          * recently auto-vacuumed, or one that needs vacuuming to prevent Xid
1070          * wraparound-related data loss.  If any db at risk of wraparound is
1071          * found, we pick the one with oldest datfrozenxid, independently of
1072          * autovacuum times.
1073          *
1074          * Note that a database with no stats entry is not considered, except for
1075          * Xid wraparound purposes.  The theory is that if no one has ever
1076          * connected to it since the stats were last initialized, it doesn't need
1077          * vacuuming.
1078          *
1079          * XXX This could be improved if we had more info about whether it needs
1080          * vacuuming before connecting to it.  Perhaps look through the pgstats
1081          * data for the database's tables?  One idea is to keep track of the
1082          * number of new and dead tuples per database in pgstats.  However it
1083          * isn't clear how to construct a metric that measures that and not cause
1084          * starvation for less busy databases.
1085          */
1086         avdb = NULL;
1087         for_xid_wrap = false;
1088         current_time = GetCurrentTimestamp();
1089         foreach(cell, dblist)
1090         {
1091                 avw_dbase  *tmp = lfirst(cell);
1092                 Dlelem     *elem;
1093
1094                 /* Find pgstat entry if any */
1095                 tmp->adw_entry = pgstat_fetch_stat_dbentry(tmp->adw_datid);
1096
1097                 /* Check to see if this one is at risk of wraparound */
1098                 if (TransactionIdPrecedes(tmp->adw_frozenxid, xidForceLimit))
1099                 {
1100                         if (avdb == NULL ||
1101                                 TransactionIdPrecedes(tmp->adw_frozenxid, avdb->adw_frozenxid))
1102                                 avdb = tmp;
1103                         for_xid_wrap = true;
1104                         continue;
1105                 }
1106                 else if (for_xid_wrap)
1107                         continue;                       /* ignore not-at-risk DBs */
1108
1109                 /*
1110                  * Otherwise, skip a database with no pgstat entry; it means it
1111                  * hasn't seen any activity.
1112                  */
1113                 if (!tmp->adw_entry)
1114                         continue;
1115
1116                 /*
1117                  * Also, skip a database that appears on the database list as having
1118                  * been processed recently (less than autovacuum_naptime seconds ago).
1119                  * We do this so that we don't select a database which we just
1120                  * selected, but that pgstat hasn't gotten around to updating the last
1121                  * autovacuum time yet.
1122                  */
1123                 skipit = false;
1124                 elem = DatabaseList ? DLGetTail(DatabaseList) : NULL;
1125
1126                 while (elem != NULL)
1127                 {
1128                         avl_dbase *dbp = DLE_VAL(elem);
1129
1130                         if (dbp->adl_datid == tmp->adw_datid)
1131                         {
1132                                 /*
1133                                  * Skip this database if its next_worker value falls between
1134                                  * the current time and the current time plus naptime.
1135                                  */
1136                                 if (!TimestampDifferenceExceeds(dbp->adl_next_worker,
1137                                                                                            current_time, 0) &&
1138                                         !TimestampDifferenceExceeds(current_time,
1139                                                                                                 dbp->adl_next_worker,
1140                                                                                                 autovacuum_naptime * 1000))
1141                                         skipit = true;
1142
1143                                 break;
1144                         }
1145                         elem = DLGetPred(elem);
1146                 }
1147                 if (skipit)
1148                         continue;
1149
1150                 /*
1151                  * Remember the db with oldest autovac time.  (If we are here,
1152                  * both tmp->entry and db->entry must be non-null.)
1153                  */
1154                 if (avdb == NULL ||
1155                         tmp->adw_entry->last_autovac_time < avdb->adw_entry->last_autovac_time)
1156                         avdb = tmp;
1157         }
1158
1159         /* Found a database -- process it */
1160         if (avdb != NULL)
1161         {
1162                 WorkerInfo      worker;
1163                 SHMEM_OFFSET sworker;
1164
1165                 LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
1166
1167                 /*
1168                  * Get a worker entry from the freelist.  We checked above, so there
1169                  * really should be a free slot -- complain very loudly if there isn't.
1170                  */
1171                 sworker = AutoVacuumShmem->av_freeWorkers;
1172                 if (sworker == INVALID_OFFSET)
1173                         elog(FATAL, "no free worker found");
1174
1175                 worker = (WorkerInfo) MAKE_PTR(sworker);
1176                 AutoVacuumShmem->av_freeWorkers = worker->wi_links.next;
1177
1178                 worker->wi_dboid = avdb->adw_datid;
1179                 worker->wi_workerpid = 0;
1180                 worker->wi_launchtime = GetCurrentTimestamp();
1181
1182                 AutoVacuumShmem->av_startingWorker = sworker;
1183
1184                 LWLockRelease(AutovacuumLock);
1185
1186                 SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
1187
1188                 return avdb->adw_datid;
1189         }
1190         else if (skipit)
1191         {
1192                 /*
1193                  * If we skipped all databases on the list, rebuild it, because it
1194                  * probably contains a dropped database.
1195                  */
1196                 rebuild_database_list(InvalidOid);
1197         }
1198
1199         return InvalidOid;
1200 }
1201
1202 /*
1203  * launch_worker
1204  *
1205  * Wrapper for starting a worker from the launcher.  Besides actually starting
1206  * it, update the database list to reflect the next time that another one will
1207  * need to be started on the selected database.  The actual database choice is
1208  * left to do_start_worker.
1209  *
1210  * This routine is also expected to insert an entry into the database list if
1211  * the selected database was previously absent from the list.  It returns the
1212  * new database list.
1213  */
1214 static void
1215 launch_worker(TimestampTz now)
1216 {
1217         Oid             dbid;
1218         Dlelem *elem;
1219
1220         dbid = do_start_worker();
1221         if (OidIsValid(dbid))
1222         {
1223                 /*
1224                  * Walk the database list and update the corresponding entry.  If the
1225                  * database is not on the list, we'll recreate the list.
1226                  */
1227                 elem = (DatabaseList == NULL) ? NULL : DLGetHead(DatabaseList);
1228                 while (elem != NULL)
1229                 {
1230                         avl_dbase *avdb = DLE_VAL(elem);
1231
1232                         if (avdb->adl_datid == dbid)
1233                         {
1234                                 /*
1235                                  * add autovacuum_naptime seconds to the current time, and use
1236                                  * that as the new "next_worker" field for this database.
1237                                  */
1238                                 avdb->adl_next_worker =
1239                                         TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000);
1240
1241                                 DLMoveToFront(elem);
1242                                 break;
1243                         }
1244                         elem = DLGetSucc(elem);
1245                 }
1246
1247                 /*
1248                  * If the database was not present in the database list, we rebuild the
1249                  * list.  It's possible that the database does not get into the list
1250                  * anyway, for example if it's a database that doesn't have a pgstat
1251                  * entry, but this is not a problem because we don't want to schedule
1252                  * workers regularly into those in any case.
1253                  */
1254                 if (elem == NULL)
1255                         rebuild_database_list(dbid);
1256         }
1257 }
1258
1259 /*
1260  * Called from postmaster to signal a failure to fork a process to become
1261  * worker.  The postmaster should kill(SIGUSR1) the launcher shortly
1262  * after calling this function.
1263  */
1264 void
1265 AutoVacWorkerFailed(void)
1266 {
1267         AutoVacuumShmem->av_signal[AutoVacForkFailed] = true;
1268 }
1269
1270 /* SIGHUP: set flag to re-read config file at next convenient time */
1271 static void
1272 avl_sighup_handler(SIGNAL_ARGS)
1273 {
1274         got_SIGHUP = true;
1275 }
1276
1277 /* SIGUSR1: a worker is up and running, or just finished */
1278 static void
1279 avl_sigusr1_handler(SIGNAL_ARGS)
1280 {
1281         got_SIGUSR1 = true;
1282 }
1283
1284 /* SIGTERM: time to die */
1285 static void
1286 avl_sigterm_handler(SIGNAL_ARGS)
1287 {
1288         got_SIGTERM = true;
1289 }
1290
1291 /*
1292  * avl_quickdie occurs when signalled SIGQUIT from postmaster.
1293  *
1294  * Some backend has bought the farm, so we need to stop what we're doing
1295  * and exit.
1296  */
1297 static void
1298 avl_quickdie(SIGNAL_ARGS)
1299 {
1300         PG_SETMASK(&BlockSig);
1301
1302         /*
1303          * DO NOT proc_exit() -- we're here because shared memory may be
1304          * corrupted, so we don't want to try to clean up our transaction. Just
1305          * nail the windows shut and get out of town.
1306          *
1307          * Note we do exit(2) not exit(0).      This is to force the postmaster into a
1308          * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
1309          * backend.  This is necessary precisely because we don't clean up our
1310          * shared memory state.
1311          */
1312         exit(2);
1313 }
1314
1315
1316 /********************************************************************
1317  *                    AUTOVACUUM WORKER CODE
1318  ********************************************************************/
1319
1320 #ifdef EXEC_BACKEND
1321 /*
1322  * forkexec routines for the autovacuum worker.
1323  *
1324  * Format up the arglist, then fork and exec.
1325  */
1326 static pid_t
1327 avworker_forkexec(void)
1328 {
1329         char       *av[10];
1330         int                     ac = 0;
1331
1332         av[ac++] = "postgres";
1333         av[ac++] = "--forkavworker";
1334         av[ac++] = NULL;                        /* filled in by postmaster_forkexec */
1335         av[ac] = NULL;
1336
1337         Assert(ac < lengthof(av));
1338
1339         return postmaster_forkexec(ac, av);
1340 }
1341
1342 /*
1343  * We need this set from the outside, before InitProcess is called
1344  */
1345 void
1346 AutovacuumWorkerIAm(void)
1347 {
1348         am_autovacuum_worker = true;
1349 }
1350 #endif
1351
1352 /*
1353  * Main entry point for autovacuum worker process.
1354  *
1355  * This code is heavily based on pgarch.c, q.v.
1356  */
1357 int
1358 StartAutoVacWorker(void)
1359 {
1360         pid_t           worker_pid;
1361
1362 #ifdef EXEC_BACKEND
1363         switch ((worker_pid = avworker_forkexec()))
1364 #else
1365         switch ((worker_pid = fork_process()))
1366 #endif
1367         {
1368                 case -1:
1369                         ereport(LOG,
1370                                         (errmsg("could not fork autovacuum process: %m")));
1371                         return 0;
1372
1373 #ifndef EXEC_BACKEND
1374                 case 0:
1375                         /* in postmaster child ... */
1376                         /* Close the postmaster's sockets */
1377                         ClosePostmasterPorts(false);
1378
1379                         /* Lose the postmaster's on-exit routines */
1380                         on_exit_reset();
1381
1382                         AutoVacWorkerMain(0, NULL);
1383                         break;
1384 #endif
1385                 default:
1386                         return (int) worker_pid;
1387         }
1388
1389         /* shouldn't get here */
1390         return 0;
1391 }
1392
1393 /*
1394  * AutoVacWorkerMain
1395  */
1396 NON_EXEC_STATIC void
1397 AutoVacWorkerMain(int argc, char *argv[])
1398 {
1399         sigjmp_buf      local_sigjmp_buf;
1400         Oid                     dbid;
1401
1402         /* we are a postmaster subprocess now */
1403         IsUnderPostmaster = true;
1404         am_autovacuum_worker = true;
1405
1406         /* reset MyProcPid */
1407         MyProcPid = getpid();
1408
1409         /* record Start Time for logging */
1410         MyStartTime = time(NULL);
1411
1412         /* Identify myself via ps */
1413         init_ps_display("autovacuum worker process", "", "", "");
1414
1415         SetProcessingMode(InitProcessing);
1416
1417         /*
1418          * If possible, make this process a group leader, so that the postmaster
1419          * can signal any child processes too.  (autovacuum probably never has
1420          * any child processes, but for consistency we make all postmaster
1421          * child processes do this.)
1422          */
1423 #ifdef HAVE_SETSID
1424         if (setsid() < 0)
1425                 elog(FATAL, "setsid() failed: %m");
1426 #endif
1427
1428         /*
1429          * Set up signal handlers.      We operate on databases much like a regular
1430          * backend, so we use the same signal handling.  See equivalent code in
1431          * tcop/postgres.c.
1432          *
1433          * Currently, we don't pay attention to postgresql.conf changes that
1434          * happen during a single daemon iteration, so we can ignore SIGHUP.
1435          */
1436         pqsignal(SIGHUP, SIG_IGN);
1437
1438         /*
1439          * SIGINT is used to signal cancelling the current table's vacuum;
1440          * SIGTERM means abort and exit cleanly, and SIGQUIT means abandon ship.
1441          */
1442         pqsignal(SIGINT, StatementCancelHandler);
1443         pqsignal(SIGTERM, die);
1444         pqsignal(SIGQUIT, quickdie);
1445         pqsignal(SIGALRM, handle_sig_alarm);
1446
1447         pqsignal(SIGPIPE, SIG_IGN);
1448         pqsignal(SIGUSR1, CatchupInterruptHandler);
1449         /* We don't listen for async notifies */
1450         pqsignal(SIGUSR2, SIG_IGN);
1451         pqsignal(SIGFPE, FloatExceptionHandler);
1452         pqsignal(SIGCHLD, SIG_DFL);
1453
1454         /* Early initialization */
1455         BaseInit();
1456
1457         /*
1458          * Create a per-backend PGPROC struct in shared memory, except in the
1459          * EXEC_BACKEND case where this was done in SubPostmasterMain. We must do
1460          * this before we can use LWLocks (and in the EXEC_BACKEND case we already
1461          * had to do some stuff with LWLocks).
1462          */
1463 #ifndef EXEC_BACKEND
1464         InitProcess();
1465 #endif
1466
1467         /*
1468          * If an exception is encountered, processing resumes here.
1469          *
1470          * See notes in postgres.c about the design of this coding.
1471          */
1472         if (sigsetjmp(local_sigjmp_buf, 1) != 0)
1473         {
1474                 /* Prevents interrupts while cleaning up */
1475                 HOLD_INTERRUPTS();
1476
1477                 /* Report the error to the server log */
1478                 EmitErrorReport();
1479
1480                 /*
1481                  * We can now go away.  Note that because we called InitProcess, a
1482                  * callback was registered to do ProcKill, which will clean up
1483                  * necessary state.
1484                  */
1485                 proc_exit(0);
1486         }
1487
1488         /* We can now handle ereport(ERROR) */
1489         PG_exception_stack = &local_sigjmp_buf;
1490
1491         PG_SETMASK(&UnBlockSig);
1492
1493         /*
1494          * Force zero_damaged_pages OFF in the autovac process, even if it is set
1495          * in postgresql.conf.  We don't really want such a dangerous option being
1496          * applied non-interactively.
1497          */
1498         SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
1499
1500         /*
1501          * Force statement_timeout to zero to avoid a timeout setting from
1502          * preventing regular maintenance from being executed.
1503          */
1504         SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE);
1505
1506         /*
1507          * Get the info about the database we're going to work on.
1508          */
1509         LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
1510
1511         /*
1512          * beware of startingWorker being INVALID; this should normally not happen,
1513          * but if a worker fails after forking and before this, the launcher might
1514          * have decided to remove it from the queue and start again.
1515          */
1516         if (AutoVacuumShmem->av_startingWorker != INVALID_OFFSET)
1517         {
1518                 MyWorkerInfo = (WorkerInfo) MAKE_PTR(AutoVacuumShmem->av_startingWorker);
1519                 dbid = MyWorkerInfo->wi_dboid;
1520                 MyWorkerInfo->wi_workerpid = MyProcPid;
1521
1522                 /* insert into the running list */
1523                 SHMQueueInsertBefore(&AutoVacuumShmem->av_runningWorkers, 
1524                                                          &MyWorkerInfo->wi_links);
1525
1526                 /*
1527                  * remove from the "starting" pointer, so that the launcher can start
1528                  * a new worker if required
1529                  */
1530                 AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
1531                 LWLockRelease(AutovacuumLock);
1532
1533                 on_shmem_exit(FreeWorkerInfo, 0);
1534
1535                 /* wake up the launcher */
1536                 if (AutoVacuumShmem->av_launcherpid != 0)
1537                         kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
1538         }
1539         else
1540         {
1541                 /* no worker entry for me, go away */
1542                 elog(WARNING, "autovacuum worker started without a worker entry");
1543                 dbid = InvalidOid;
1544                 LWLockRelease(AutovacuumLock);
1545         }
1546
1547         if (OidIsValid(dbid))
1548         {
1549                 char    *dbname;
1550
1551                 /*
1552                  * Report autovac startup to the stats collector.  We deliberately do
1553                  * this before InitPostgres, so that the last_autovac_time will get
1554                  * updated even if the connection attempt fails.  This is to prevent
1555                  * autovac from getting "stuck" repeatedly selecting an unopenable
1556                  * database, rather than making any progress on stuff it can connect
1557                  * to.
1558                  */
1559                 pgstat_report_autovac(dbid);
1560
1561                 /*
1562                  * Connect to the selected database
1563                  *
1564                  * Note: if we have selected a just-deleted database (due to using
1565                  * stale stats info), we'll fail and exit here.
1566                  */
1567                 InitPostgres(NULL, dbid, NULL, &dbname);
1568                 SetProcessingMode(NormalProcessing);
1569                 set_ps_display(dbname, false);
1570                 ereport(DEBUG1,
1571                                 (errmsg("autovacuum: processing database \"%s\"", dbname)));
1572
1573                 /* And do an appropriate amount of work */
1574                 recentXid = ReadNewTransactionId();
1575                 do_autovacuum();
1576         }
1577
1578         /*
1579          * The launcher will be notified of my death in ProcKill, *if* we managed
1580          * to get a worker slot at all
1581          */
1582
1583         /* All done, go away */
1584         proc_exit(0);
1585 }
1586
1587 /*
1588  * Return a WorkerInfo to the free list
1589  */
1590 static void
1591 FreeWorkerInfo(int code, Datum arg)
1592 {
1593         if (MyWorkerInfo != NULL)
1594         {
1595                 LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
1596
1597                 /*
1598                  * Wake the launcher up so that he can launch a new worker immediately
1599                  * if required.  We only save the launcher's PID in local memory here;
1600                  * the actual signal will be sent when the PGPROC is recycled.  Note
1601                  * that we always do this, so that the launcher can rebalance the cost
1602                  * limit setting of the remaining workers.
1603                  *
1604                  * We somewhat ignore the risk that the launcher changes its PID
1605                  * between we reading it and the actual kill; we expect ProcKill to be
1606                  * called shortly after us, and we assume that PIDs are not reused too
1607                  * quickly after a process exits.
1608                  */
1609                 AutovacuumLauncherPid = AutoVacuumShmem->av_launcherpid;
1610
1611                 SHMQueueDelete(&MyWorkerInfo->wi_links);
1612                 MyWorkerInfo->wi_links.next = AutoVacuumShmem->av_freeWorkers;
1613                 MyWorkerInfo->wi_dboid = InvalidOid;
1614                 MyWorkerInfo->wi_tableoid = InvalidOid;
1615                 MyWorkerInfo->wi_workerpid = 0;
1616                 MyWorkerInfo->wi_launchtime = 0;
1617                 MyWorkerInfo->wi_cost_delay = 0;
1618                 MyWorkerInfo->wi_cost_limit = 0;
1619                 MyWorkerInfo->wi_cost_limit_base = 0;
1620                 AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(MyWorkerInfo);
1621                 /* not mine anymore */
1622                 MyWorkerInfo = NULL;
1623
1624                 /*
1625                  * now that we're inactive, cause a rebalancing of the surviving
1626                  * workers
1627                  */
1628                 AutoVacuumShmem->av_signal[AutoVacRebalance] = true;
1629                 LWLockRelease(AutovacuumLock);
1630         }
1631 }
1632
1633 /*
1634  * Update the cost-based delay parameters, so that multiple workers consume
1635  * each a fraction of the total available I/O.
1636  */
1637 void
1638 AutoVacuumUpdateDelay(void)
1639 {
1640         if (MyWorkerInfo)
1641         {
1642                 VacuumCostDelay = MyWorkerInfo->wi_cost_delay;
1643                 VacuumCostLimit = MyWorkerInfo->wi_cost_limit;
1644         }
1645 }
1646
1647 /*
1648  * autovac_balance_cost
1649  *              Recalculate the cost limit setting for each active workers.
1650  *
1651  * Caller must hold the AutovacuumLock in exclusive mode.
1652  */
1653 static void
1654 autovac_balance_cost(void)
1655 {
1656         WorkerInfo      worker;
1657         /*
1658          * note: in cost_limit, zero also means use value from elsewhere, because
1659          * zero is not a valid value.
1660          */
1661         int         vac_cost_limit = (autovacuum_vac_cost_limit > 0 ?
1662                                                                   autovacuum_vac_cost_limit : VacuumCostLimit);
1663         int         vac_cost_delay = (autovacuum_vac_cost_delay >= 0 ?
1664                                                                   autovacuum_vac_cost_delay : VacuumCostDelay);
1665         double      cost_total;
1666         double      cost_avail;
1667
1668         /* not set? nothing to do */
1669         if (vac_cost_limit <= 0 || vac_cost_delay <= 0)
1670                 return;
1671
1672         /* caculate the total base cost limit of active workers */
1673         cost_total = 0.0;
1674         worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
1675                                                                            &AutoVacuumShmem->av_runningWorkers,
1676                                                                            offsetof(WorkerInfoData, wi_links));
1677         while (worker)
1678         {
1679                 if (worker->wi_workerpid != 0 &&
1680                         worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
1681                         cost_total +=
1682                                 (double) worker->wi_cost_limit_base / worker->wi_cost_delay;
1683
1684                 worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
1685                                                                                    &worker->wi_links,
1686                                                                                    offsetof(WorkerInfoData, wi_links));
1687         }
1688         /* there are no cost limits -- nothing to do */
1689         if (cost_total <= 0)
1690                 return;
1691
1692         /*
1693          * Adjust each cost limit of active workers to balance the total of
1694          * cost limit to autovacuum_vacuum_cost_limit.
1695          */
1696         cost_avail = (double) vac_cost_limit / vac_cost_delay;
1697         worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
1698                                                                            &AutoVacuumShmem->av_runningWorkers,
1699                                                                            offsetof(WorkerInfoData, wi_links));
1700         while (worker)
1701         {
1702                 if (worker->wi_workerpid != 0 &&
1703                         worker->wi_cost_limit_base > 0 && worker->wi_cost_delay > 0)
1704                 {
1705                         int     limit = (int)
1706                                 (cost_avail * worker->wi_cost_limit_base / cost_total);
1707
1708                         /*
1709                          * We put a lower bound of 1 to the cost_limit, to avoid division-
1710                          * by-zero in the vacuum code.
1711                          */
1712                         worker->wi_cost_limit = Max(Min(limit, worker->wi_cost_limit_base), 1);
1713
1714                         elog(DEBUG2, "autovac_balance_cost(pid=%u db=%u, rel=%u, cost_limit=%d, cost_delay=%d)",
1715                                  worker->wi_workerpid, worker->wi_dboid,
1716                                  worker->wi_tableoid, worker->wi_cost_limit, worker->wi_cost_delay);
1717                 }
1718
1719                 worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
1720                                                                                    &worker->wi_links,
1721                                                                                    offsetof(WorkerInfoData, wi_links));
1722         }
1723 }
1724
1725 /*
1726  * get_database_list
1727  *
1728  *              Return a list of all databases.  Note we cannot use pg_database,
1729  *              because we aren't connected; we use the flat database file.
1730  */
1731 static List *
1732 get_database_list(void)
1733 {
1734         char       *filename;
1735         List       *dblist = NIL;
1736         char            thisname[NAMEDATALEN];
1737         FILE       *db_file;
1738         Oid                     db_id;
1739         Oid                     db_tablespace;
1740         TransactionId db_frozenxid;
1741
1742         filename = database_getflatfilename();
1743         db_file = AllocateFile(filename, "r");
1744         if (db_file == NULL)
1745                 ereport(FATAL,
1746                                 (errcode_for_file_access(),
1747                                  errmsg("could not open file \"%s\": %m", filename)));
1748
1749         while (read_pg_database_line(db_file, thisname, &db_id,
1750                                                                  &db_tablespace, &db_frozenxid))
1751         {
1752                 avw_dbase *avdb;
1753
1754                 avdb = (avw_dbase *) palloc(sizeof(avw_dbase));
1755
1756                 avdb->adw_datid = db_id;
1757                 avdb->adw_name = pstrdup(thisname);
1758                 avdb->adw_frozenxid = db_frozenxid;
1759                 /* this gets set later: */
1760                 avdb->adw_entry = NULL;
1761
1762                 dblist = lappend(dblist, avdb);
1763         }
1764
1765         FreeFile(db_file);
1766         pfree(filename);
1767
1768         return dblist;
1769 }
1770
1771 /*
1772  * Process a database table-by-table
1773  *
1774  * Note that CHECK_FOR_INTERRUPTS is supposed to be used in certain spots in
1775  * order not to ignore shutdown commands for too long.
1776  */
1777 static void
1778 do_autovacuum(void)
1779 {
1780         Relation        classRel,
1781                                 avRel;
1782         HeapTuple       tuple;
1783         HeapScanDesc relScan;
1784         Form_pg_database dbForm;
1785         List       *table_oids = NIL;
1786         List       *toast_oids = NIL;
1787         List       *table_toast_list = NIL;
1788         ListCell   * volatile cell;
1789         PgStat_StatDBEntry *shared;
1790         PgStat_StatDBEntry *dbentry;
1791         BufferAccessStrategy bstrategy;
1792
1793         /*
1794          * StartTransactionCommand and CommitTransactionCommand will automatically
1795          * switch to other contexts.  We need this one to keep the list of
1796          * relations to vacuum/analyze across transactions.
1797          */
1798         AutovacMemCxt = AllocSetContextCreate(TopMemoryContext,
1799                                                                                   "AV worker",
1800                                                                                   ALLOCSET_DEFAULT_MINSIZE,
1801                                                                                   ALLOCSET_DEFAULT_INITSIZE,
1802                                                                                   ALLOCSET_DEFAULT_MAXSIZE);
1803         MemoryContextSwitchTo(AutovacMemCxt);
1804
1805         /*
1806          * may be NULL if we couldn't find an entry (only happens if we
1807          * are forcing a vacuum for anti-wrap purposes).
1808          */
1809         dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
1810
1811         /* Start a transaction so our commands have one to play into. */
1812         StartTransactionCommand();
1813
1814         /* functions in indexes may want a snapshot set */
1815         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
1816
1817         /*
1818          * Clean up any dead statistics collector entries for this DB. We always
1819          * want to do this exactly once per DB-processing cycle, even if we find
1820          * nothing worth vacuuming in the database.
1821          */
1822         pgstat_vacuum_tabstat();
1823
1824         /*
1825          * Find the pg_database entry and select the default freeze_min_age.
1826          * We use zero in template and nonconnectable databases,
1827          * else the system-wide default.
1828          */
1829         tuple = SearchSysCache(DATABASEOID,
1830                                                    ObjectIdGetDatum(MyDatabaseId),
1831                                                    0, 0, 0);
1832         if (!HeapTupleIsValid(tuple))
1833                 elog(ERROR, "cache lookup failed for database %u", MyDatabaseId);
1834         dbForm = (Form_pg_database) GETSTRUCT(tuple);
1835
1836         if (dbForm->datistemplate || !dbForm->datallowconn)
1837                 default_freeze_min_age = 0;
1838         else
1839                 default_freeze_min_age = vacuum_freeze_min_age;
1840
1841         ReleaseSysCache(tuple);
1842
1843         /* StartTransactionCommand changed elsewhere */
1844         MemoryContextSwitchTo(AutovacMemCxt);
1845
1846         /* The database hash where pgstat keeps shared relations */
1847         shared = pgstat_fetch_stat_dbentry(InvalidOid);
1848
1849         classRel = heap_open(RelationRelationId, AccessShareLock);
1850         avRel = heap_open(AutovacuumRelationId, AccessShareLock);
1851
1852         /*
1853          * Scan pg_class and determine which tables to vacuum.
1854          *
1855          * The stats subsystem collects stats for toast tables independently of
1856          * the stats for their parent tables.  We need to check those stats since
1857          * in cases with short, wide tables there might be proportionally much
1858          * more activity in the toast table than in its parent.
1859          *
1860          * Since we can only issue VACUUM against the parent table, we need to
1861          * transpose a decision to vacuum a toast table into a decision to vacuum
1862          * its parent.  There's no point in considering ANALYZE on a toast table,
1863          * either.      To support this, we keep a list of OIDs of toast tables that
1864          * need vacuuming alongside the list of regular tables.  Regular tables
1865          * will be entered into the table list even if they appear not to need
1866          * vacuuming; we go back and re-mark them after finding all the vacuumable
1867          * toast tables.
1868          */
1869         relScan = heap_beginscan(classRel, SnapshotNow, 0, NULL);
1870
1871         while ((tuple = heap_getnext(relScan, ForwardScanDirection)) != NULL)
1872         {
1873                 Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);
1874                 Form_pg_autovacuum avForm = NULL;
1875                 PgStat_StatTabEntry *tabentry;
1876                 HeapTuple       avTup;
1877                 Oid                     relid;
1878
1879                 /* Consider only regular and toast tables. */
1880                 if (classForm->relkind != RELKIND_RELATION &&
1881                         classForm->relkind != RELKIND_TOASTVALUE)
1882                         continue;
1883
1884                 /*
1885                  * Skip temp tables (i.e. those in temp namespaces).  We cannot safely
1886                  * process other backends' temp tables.
1887                  */
1888                 if (isAnyTempNamespace(classForm->relnamespace))
1889                         continue;
1890
1891                 relid = HeapTupleGetOid(tuple);
1892
1893                 /* Fetch the pg_autovacuum tuple for the relation, if any */
1894                 avTup = get_pg_autovacuum_tuple_relid(avRel, relid);
1895                 if (HeapTupleIsValid(avTup))
1896                         avForm = (Form_pg_autovacuum) GETSTRUCT(avTup);
1897
1898                 /* Fetch the pgstat entry for this table */
1899                 tabentry = get_pgstat_tabentry_relid(relid, classForm->relisshared,
1900                                                                                          shared, dbentry);
1901
1902                 relation_check_autovac(relid, classForm, avForm, tabentry,
1903                                                            &table_oids, &table_toast_list, &toast_oids);
1904
1905                 if (HeapTupleIsValid(avTup))
1906                         heap_freetuple(avTup);
1907         }
1908
1909         heap_endscan(relScan);
1910         heap_close(avRel, AccessShareLock);
1911         heap_close(classRel, AccessShareLock);
1912
1913         /*
1914          * Add to the list of tables to vacuum, the OIDs of the tables that
1915          * correspond to the saved OIDs of toast tables needing vacuum.
1916          */
1917         foreach(cell, toast_oids)
1918         {
1919                 Oid             toastoid = lfirst_oid(cell);
1920                 ListCell *cell2;
1921
1922                 foreach(cell2, table_toast_list)
1923                 {
1924                         av_relation        *ar = lfirst(cell2);
1925
1926                         if (ar->ar_toastrelid == toastoid)
1927                         {
1928                                 table_oids = lappend_oid(table_oids, ar->ar_relid);
1929                                 break;
1930                         }
1931                 }
1932         }
1933
1934         list_free_deep(table_toast_list);
1935         table_toast_list = NIL;
1936         list_free(toast_oids);
1937         toast_oids = NIL;
1938
1939         /*
1940          * Create a buffer access strategy object for VACUUM to use.  We want
1941          * to use the same one across all the vacuum operations we perform,
1942          * since the point is for VACUUM not to blow out the shared cache.
1943          */
1944         bstrategy = GetAccessStrategy(BAS_VACUUM);
1945
1946         /*
1947          * create a memory context to act as fake PortalContext, so that the
1948          * contexts created in the vacuum code are cleaned up for each table.
1949          */
1950         PortalContext = AllocSetContextCreate(AutovacMemCxt,
1951                                                                                   "Autovacuum Portal",
1952                                                                                   ALLOCSET_DEFAULT_INITSIZE,
1953                                                                                   ALLOCSET_DEFAULT_MINSIZE,
1954                                                                                   ALLOCSET_DEFAULT_MAXSIZE);
1955
1956         /*
1957          * Perform operations on collected tables.
1958          */
1959         foreach(cell, table_oids)
1960         {
1961                 Oid             relid = lfirst_oid(cell);
1962                 autovac_table *tab;
1963                 WorkerInfo      worker;
1964                 bool        skipit;
1965
1966                 CHECK_FOR_INTERRUPTS();
1967
1968                 /*
1969                  * hold schedule lock from here until we're sure that this table
1970                  * still needs vacuuming.  We also need the AutovacuumLock to walk
1971                  * the worker array, but we'll let go of that one quickly.
1972                  */
1973                 LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE);
1974                 LWLockAcquire(AutovacuumLock, LW_SHARED);
1975
1976                 /*
1977                  * Check whether the table is being vacuumed concurrently by another
1978                  * worker.
1979                  */
1980                 skipit = false;
1981                 worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
1982                                                                                    &AutoVacuumShmem->av_runningWorkers,
1983                                                                                    offsetof(WorkerInfoData, wi_links));
1984                 while (worker)
1985                 {
1986                         /* ignore myself */
1987                         if (worker == MyWorkerInfo)
1988                                 goto next_worker;
1989
1990                         /* ignore workers in other databases */
1991                         if (worker->wi_dboid != MyDatabaseId)
1992                                 goto next_worker;
1993
1994                         if (worker->wi_tableoid == relid)
1995                         {
1996                                 skipit = true;
1997                                 break;
1998                         }
1999
2000 next_worker:
2001                         worker = (WorkerInfo) SHMQueueNext(&AutoVacuumShmem->av_runningWorkers,
2002                                                                                            &worker->wi_links,
2003                                                                                            offsetof(WorkerInfoData, wi_links));
2004                 }
2005                 LWLockRelease(AutovacuumLock);
2006                 if (skipit)
2007                 {
2008                         LWLockRelease(AutovacuumScheduleLock);
2009                         continue;
2010                 }
2011
2012                 /*
2013                  * Check whether pgstat data still says we need to vacuum this table.
2014                  * It could have changed if something else processed the table while we
2015                  * weren't looking.
2016                  *
2017                  * FIXME we ignore the possibility that the table was finished being
2018                  * vacuumed in the last 500ms (PGSTAT_STAT_INTERVAL).  This is a bug.
2019                  */
2020                 MemoryContextSwitchTo(AutovacMemCxt);
2021                 tab = table_recheck_autovac(relid);
2022                 if (tab == NULL)
2023                 {
2024                         /* someone else vacuumed the table */
2025                         LWLockRelease(AutovacuumScheduleLock);
2026                         continue;
2027                 }
2028
2029                 /*
2030                  * Ok, good to go.  Store the table in shared memory before releasing
2031                  * the lock so that other workers don't vacuum it concurrently.
2032                  */
2033                 MyWorkerInfo->wi_tableoid = relid;
2034                 LWLockRelease(AutovacuumScheduleLock);
2035
2036                 /* Set the initial vacuum cost parameters for this table */
2037                 VacuumCostDelay = tab->at_vacuum_cost_delay;
2038                 VacuumCostLimit = tab->at_vacuum_cost_limit;
2039
2040                 /*
2041                  * Advertise my cost delay parameters for the balancing algorithm, and
2042                  * do a balance
2043                  */
2044                 LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
2045                 MyWorkerInfo->wi_cost_delay = tab->at_vacuum_cost_delay;
2046                 MyWorkerInfo->wi_cost_limit = tab->at_vacuum_cost_limit;
2047                 MyWorkerInfo->wi_cost_limit_base = tab->at_vacuum_cost_limit;
2048                 autovac_balance_cost();
2049                 LWLockRelease(AutovacuumLock);
2050
2051                 /* clean up memory before each iteration */
2052                 MemoryContextResetAndDeleteChildren(PortalContext);
2053
2054                 /*
2055                  * We will abort vacuuming the current table if we are interrupted, and
2056                  * continue with the next one in schedule; but if anything else
2057                  * happens, we will do our usual error handling which is to cause the
2058                  * worker process to exit.
2059                  */
2060                 PG_TRY();
2061                 {
2062                         /* have at it */
2063                         MemoryContextSwitchTo(TopTransactionContext);
2064                         autovacuum_do_vac_analyze(tab->at_relid,
2065                                                                           tab->at_dovacuum,
2066                                                                           tab->at_doanalyze,
2067                                                                           tab->at_freeze_min_age,
2068                                                                           bstrategy);
2069                 }
2070                 PG_CATCH();
2071                 {
2072                         ErrorData          *errdata;
2073
2074                         MemoryContextSwitchTo(TopTransactionContext);
2075                         errdata = CopyErrorData();
2076
2077                         /*
2078                          * If we errored out due to a cancel request, abort and restart the
2079                          * transaction and go to the next table.  Otherwise rethrow the
2080                          * error so that the outermost handler deals with it.
2081                          */
2082                         if (errdata->sqlerrcode == ERRCODE_QUERY_CANCELED)
2083                         {
2084                                 HOLD_INTERRUPTS();
2085                                 elog(LOG, "cancelling autovacuum of table \"%s.%s.%s\"",
2086                                          get_database_name(MyDatabaseId),
2087                                          get_namespace_name(get_rel_namespace(tab->at_relid)),
2088                                          get_rel_name(tab->at_relid));
2089
2090                                 AbortOutOfAnyTransaction();
2091                                 FlushErrorState();
2092                                 MemoryContextResetAndDeleteChildren(PortalContext);
2093
2094                                 /* restart our transaction for the following operations */
2095                                 StartTransactionCommand();
2096                                 RESUME_INTERRUPTS();
2097                         }
2098                         else
2099                                 PG_RE_THROW();
2100                 }
2101                 PG_END_TRY();
2102
2103                 /* be tidy */
2104                 pfree(tab);
2105         }
2106
2107         /*
2108          * Update pg_database.datfrozenxid, and truncate pg_clog if possible.
2109          * We only need to do this once, not after each table.
2110          */
2111         vac_update_datfrozenxid();
2112
2113         /* Finally close out the last transaction. */
2114         CommitTransactionCommand();
2115 }
2116
2117 /*
2118  * Returns a copy of the pg_autovacuum tuple for the given relid, or NULL if
2119  * there isn't any.  avRel is pg_autovacuum, already open and suitably locked.
2120  */
2121 static HeapTuple
2122 get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid)
2123 {
2124         ScanKeyData entry[1];
2125         SysScanDesc avScan;
2126         HeapTuple       avTup;
2127
2128         ScanKeyInit(&entry[0],
2129                                 Anum_pg_autovacuum_vacrelid,
2130                                 BTEqualStrategyNumber, F_OIDEQ,
2131                                 ObjectIdGetDatum(relid));
2132
2133         avScan = systable_beginscan(avRel, AutovacuumRelidIndexId, true,
2134                                                                 SnapshotNow, 1, entry);
2135
2136         avTup = systable_getnext(avScan);
2137
2138         if (HeapTupleIsValid(avTup))
2139                 avTup = heap_copytuple(avTup);
2140
2141         systable_endscan(avScan);
2142
2143         return avTup;
2144 }
2145
2146 /*
2147  * get_pgstat_tabentry_relid
2148  *
2149  * Fetch the pgstat entry of a table, either local to a database or shared.
2150  */
2151 static PgStat_StatTabEntry *
2152 get_pgstat_tabentry_relid(Oid relid, bool isshared, PgStat_StatDBEntry *shared,
2153                                                   PgStat_StatDBEntry *dbentry)
2154 {
2155         PgStat_StatTabEntry *tabentry = NULL;
2156
2157         if (isshared)
2158         {
2159                 if (PointerIsValid(shared))
2160                         tabentry = hash_search(shared->tables, &relid,
2161                                                                    HASH_FIND, NULL);
2162         }
2163         else if (PointerIsValid(dbentry))
2164                 tabentry = hash_search(dbentry->tables, &relid,
2165                                                            HASH_FIND, NULL);
2166
2167         return tabentry;
2168 }
2169
2170 /*
2171  * relation_check_autovac
2172  *
2173  * For a given relation (either a plain table or TOAST table), check whether it
2174  * needs vacuum or analyze.
2175  *
2176  * Plain tables that need either are added to the table_list.  TOAST tables
2177  * that need vacuum are added to toast_list.  Plain tables that don't need
2178  * either but which have a TOAST table are added, as a struct, to
2179  * table_toast_list.  The latter is to allow appending the OIDs of the plain
2180  * tables whose TOAST table needs vacuuming into the plain tables list, which
2181  * allows us to substantially reduce the number of "rechecks" that we need to
2182  * do later on.
2183  */
2184 static void
2185 relation_check_autovac(Oid relid, Form_pg_class classForm,
2186                                            Form_pg_autovacuum avForm, PgStat_StatTabEntry *tabentry,
2187                                            List **table_oids, List **table_toast_list,
2188                                            List **toast_oids)
2189 {
2190         bool    dovacuum;
2191         bool    doanalyze;
2192
2193         relation_needs_vacanalyze(relid, avForm, classForm, tabentry,
2194                                                           &dovacuum, &doanalyze);
2195
2196         if (classForm->relkind == RELKIND_TOASTVALUE)
2197         {
2198                 if (dovacuum)
2199                         *toast_oids = lappend_oid(*toast_oids, relid);
2200         }
2201         else
2202         {
2203                 Assert(classForm->relkind == RELKIND_RELATION);
2204
2205                 if (dovacuum || doanalyze)
2206                         *table_oids = lappend_oid(*table_oids, relid);
2207                 else if (OidIsValid(classForm->reltoastrelid))
2208                 {
2209                         av_relation        *rel = palloc(sizeof(av_relation));
2210
2211                         rel->ar_relid = relid;
2212                         rel->ar_toastrelid = classForm->reltoastrelid;
2213
2214                         *table_toast_list = lappend(*table_toast_list, rel);
2215                 }
2216         }
2217 }
2218
2219 /*
2220  * table_recheck_autovac
2221  *
2222  * Recheck whether a plain table still needs vacuum or analyze; be it because
2223  * it does directly, or because its TOAST table does.  Return value is a valid
2224  * autovac_table pointer if it does, NULL otherwise.
2225  */
2226 static autovac_table *
2227 table_recheck_autovac(Oid relid)
2228 {
2229         Form_pg_autovacuum avForm = NULL;
2230         Form_pg_class classForm;
2231         HeapTuple       classTup;
2232         HeapTuple       avTup;
2233         Relation        avRel;
2234         bool            dovacuum;
2235         bool            doanalyze;
2236         autovac_table *tab = NULL;
2237         PgStat_StatTabEntry *tabentry;
2238         bool            doit = false;
2239         PgStat_StatDBEntry *shared;
2240         PgStat_StatDBEntry *dbentry;
2241
2242         /* use fresh stats */
2243         pgstat_clear_snapshot();
2244
2245         shared = pgstat_fetch_stat_dbentry(InvalidOid);
2246         dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId);
2247
2248         /* fetch the relation's relcache entry */
2249         classTup = SearchSysCacheCopy(RELOID,
2250                                                                   ObjectIdGetDatum(relid),
2251                                                                   0, 0, 0);
2252         if (!HeapTupleIsValid(classTup))
2253                 return NULL;
2254         classForm = (Form_pg_class) GETSTRUCT(classTup);
2255
2256         /* fetch the pg_autovacuum entry, if any */
2257         avRel = heap_open(AutovacuumRelationId, AccessShareLock);
2258         avTup = get_pg_autovacuum_tuple_relid(avRel, relid);
2259         if (HeapTupleIsValid(avTup))
2260                 avForm = (Form_pg_autovacuum) GETSTRUCT(avTup);
2261
2262         /* fetch the pgstat table entry */
2263         tabentry = get_pgstat_tabentry_relid(relid, classForm->relisshared,
2264                                                                                  shared, dbentry);
2265
2266         relation_needs_vacanalyze(relid, avForm, classForm, tabentry,
2267                                                           &dovacuum, &doanalyze);
2268
2269         /* OK, it needs vacuum by itself */
2270         if (dovacuum)
2271                 doit = true;
2272         /* it doesn't need vacuum, but what about it's TOAST table? */
2273         else if (OidIsValid(classForm->reltoastrelid))
2274         {
2275                 Oid             toastrelid = classForm->reltoastrelid;
2276                 HeapTuple       toastClassTup;
2277
2278                 toastClassTup = SearchSysCacheCopy(RELOID,
2279                                                                                    ObjectIdGetDatum(toastrelid),
2280                                                                                    0, 0, 0);
2281                 if (HeapTupleIsValid(toastClassTup))
2282                 {
2283                         bool                    toast_dovacuum;
2284                         bool                    toast_doanalyze;
2285                         Form_pg_class   toastClassForm;
2286                         PgStat_StatTabEntry *toasttabentry;
2287
2288                         toastClassForm = (Form_pg_class) GETSTRUCT(toastClassTup);
2289                         toasttabentry = get_pgstat_tabentry_relid(toastrelid,
2290                                                                                                           toastClassForm->relisshared,
2291                                                                                                           shared, dbentry);
2292
2293                         /* note we use the pg_autovacuum entry for the main table */
2294                         relation_needs_vacanalyze(toastrelid, avForm, toastClassForm,
2295                                                                           toasttabentry, &toast_dovacuum,
2296                                                                           &toast_doanalyze);
2297                         /* we only consider VACUUM for toast tables */
2298                         if (toast_dovacuum)
2299                         {
2300                                 dovacuum = true;
2301                                 doit = true;
2302                         }
2303
2304                         heap_freetuple(toastClassTup);
2305                 }
2306         }
2307
2308         if (doanalyze)
2309                 doit = true;
2310
2311         if (doit)
2312         {
2313                 int                     freeze_min_age;
2314                 int                     vac_cost_limit;
2315                 int                     vac_cost_delay;
2316
2317                 /*
2318                  * Calculate the vacuum cost parameters and the minimum freeze age.  If
2319                  * there is a tuple in pg_autovacuum, use it; else, use the GUC
2320                  * defaults.  Note that the fields may contain "-1" (or indeed any
2321                  * negative value), which means use the GUC defaults for each setting.
2322                  * In cost_limit, the value 0 also means to use the value from
2323                  * elsewhere.
2324                  */
2325                 if (avForm != NULL)
2326                 {
2327                         vac_cost_limit = (avForm->vac_cost_limit > 0) ?
2328                                 avForm->vac_cost_limit :
2329                                 ((autovacuum_vac_cost_limit > 0) ?
2330                                  autovacuum_vac_cost_limit : VacuumCostLimit);
2331
2332                         vac_cost_delay = (avForm->vac_cost_delay >= 0) ?
2333                                 avForm->vac_cost_delay :
2334                                 ((autovacuum_vac_cost_delay >= 0) ?
2335                                  autovacuum_vac_cost_delay : VacuumCostDelay);
2336
2337                         freeze_min_age = (avForm->freeze_min_age >= 0) ?
2338                                 avForm->freeze_min_age : default_freeze_min_age;
2339                 }
2340                 else
2341                 {
2342                         vac_cost_limit = (autovacuum_vac_cost_limit > 0) ?
2343                                 autovacuum_vac_cost_limit : VacuumCostLimit;
2344
2345                         vac_cost_delay = (autovacuum_vac_cost_delay >= 0) ?
2346                                 autovacuum_vac_cost_delay : VacuumCostDelay;
2347
2348                         freeze_min_age = default_freeze_min_age;
2349                 }
2350
2351                 tab = palloc(sizeof(autovac_table));
2352                 tab->at_relid = relid;
2353                 tab->at_dovacuum = dovacuum;
2354                 tab->at_doanalyze = doanalyze;
2355                 tab->at_freeze_min_age = freeze_min_age;
2356                 tab->at_vacuum_cost_limit = vac_cost_limit;
2357                 tab->at_vacuum_cost_delay = vac_cost_delay;
2358         }
2359
2360         heap_close(avRel, AccessShareLock);
2361         if (HeapTupleIsValid(avTup))
2362                 heap_freetuple(avTup);
2363         heap_freetuple(classTup);
2364
2365         return tab;
2366 }
2367
2368 /*
2369  * relation_needs_vacanalyze
2370  *
2371  * Check whether a relation needs to be vacuumed or analyzed; return each into
2372  * "dovacuum" and "doanalyze", respectively.  avForm and tabentry can be NULL,
2373  * classForm shouldn't.
2374  *
2375  * A table needs to be vacuumed if the number of dead tuples exceeds a
2376  * threshold.  This threshold is calculated as
2377  *
2378  * threshold = vac_base_thresh + vac_scale_factor * reltuples
2379  *
2380  * For analyze, the analysis done is that the number of tuples inserted,
2381  * deleted and updated since the last analyze exceeds a threshold calculated
2382  * in the same fashion as above.  Note that the collector actually stores
2383  * the number of tuples (both live and dead) that there were as of the last
2384  * analyze.  This is asymmetric to the VACUUM case.
2385  *
2386  * We also force vacuum if the table's relfrozenxid is more than freeze_max_age
2387  * transactions back.
2388  *
2389  * A table whose pg_autovacuum.enabled value is false, is automatically
2390  * skipped (unless we have to vacuum it due to freeze_max_age).  Thus
2391  * autovacuum can be disabled for specific tables.  Also, when the stats
2392  * collector does not have data about a table, it will be skipped.
2393  *
2394  * A table whose vac_base_thresh value is <0 takes the base value from the
2395  * autovacuum_vacuum_threshold GUC variable.  Similarly, a vac_scale_factor
2396  * value <0 is substituted with the value of
2397  * autovacuum_vacuum_scale_factor GUC variable.  Ditto for analyze.
2398  */
2399 static void
2400 relation_needs_vacanalyze(Oid relid,
2401                                                   Form_pg_autovacuum avForm,
2402                                                   Form_pg_class classForm,
2403                                                   PgStat_StatTabEntry *tabentry,
2404                                                   /* output params below */
2405                                                   bool *dovacuum,
2406                                                   bool *doanalyze)
2407 {
2408         bool            force_vacuum;
2409         float4          reltuples;              /* pg_class.reltuples */
2410         /* constants from pg_autovacuum or GUC variables */
2411         int                     vac_base_thresh,
2412                                 anl_base_thresh;
2413         float4          vac_scale_factor,
2414                                 anl_scale_factor;
2415         /* thresholds calculated from above constants */
2416         float4          vacthresh,
2417                                 anlthresh;
2418         /* number of vacuum (resp. analyze) tuples at this time */
2419         float4          vactuples,
2420                                 anltuples;
2421         /* freeze parameters */
2422         int                     freeze_max_age;
2423         TransactionId xidForceLimit;
2424
2425         AssertArg(classForm != NULL);
2426         AssertArg(OidIsValid(relid));
2427
2428         /*
2429          * Determine vacuum/analyze equation parameters.  If there is a tuple in
2430          * pg_autovacuum, use it; else, use the GUC defaults.  Note that the fields
2431          * may contain "-1" (or indeed any negative value), which means use the GUC
2432          * defaults for each setting.
2433          */
2434         if (avForm != NULL)
2435         {
2436                 vac_scale_factor = (avForm->vac_scale_factor >= 0) ?
2437                         avForm->vac_scale_factor : autovacuum_vac_scale;
2438                 vac_base_thresh = (avForm->vac_base_thresh >= 0) ?
2439                         avForm->vac_base_thresh : autovacuum_vac_thresh;
2440
2441                 anl_scale_factor = (avForm->anl_scale_factor >= 0) ?
2442                         avForm->anl_scale_factor : autovacuum_anl_scale;
2443                 anl_base_thresh = (avForm->anl_base_thresh >= 0) ?
2444                         avForm->anl_base_thresh : autovacuum_anl_thresh;
2445
2446                 freeze_max_age = (avForm->freeze_max_age >= 0) ?
2447                         Min(avForm->freeze_max_age, autovacuum_freeze_max_age) :
2448                         autovacuum_freeze_max_age;
2449         }
2450         else
2451         {
2452                 vac_scale_factor = autovacuum_vac_scale;
2453                 vac_base_thresh = autovacuum_vac_thresh;
2454
2455                 anl_scale_factor = autovacuum_anl_scale;
2456                 anl_base_thresh = autovacuum_anl_thresh;
2457
2458                 freeze_max_age = autovacuum_freeze_max_age;
2459         }
2460
2461         /* Force vacuum if table is at risk of wraparound */
2462         xidForceLimit = recentXid - freeze_max_age;
2463         if (xidForceLimit < FirstNormalTransactionId)
2464                 xidForceLimit -= FirstNormalTransactionId;
2465         force_vacuum = (TransactionIdIsNormal(classForm->relfrozenxid) &&
2466                                         TransactionIdPrecedes(classForm->relfrozenxid,
2467                                                                                   xidForceLimit));
2468
2469         /* User disabled it in pg_autovacuum?  (But ignore if at risk) */
2470         if (avForm && !avForm->enabled && !force_vacuum)
2471         {
2472                 *doanalyze = false;
2473                 *dovacuum = false;
2474                 return;
2475         }
2476
2477         if (PointerIsValid(tabentry))
2478         {
2479                 reltuples = classForm->reltuples;
2480                 vactuples = tabentry->n_dead_tuples;
2481                 anltuples = tabentry->n_live_tuples + tabentry->n_dead_tuples -
2482                         tabentry->last_anl_tuples;
2483
2484                 vacthresh = (float4) vac_base_thresh + vac_scale_factor * reltuples;
2485                 anlthresh = (float4) anl_base_thresh + anl_scale_factor * reltuples;
2486
2487                 /*
2488                  * Note that we don't need to take special consideration for stat
2489                  * reset, because if that happens, the last vacuum and analyze counts
2490                  * will be reset too.
2491                  */
2492                 elog(DEBUG3, "%s: vac: %.0f (threshold %.0f), anl: %.0f (threshold %.0f)",
2493                          NameStr(classForm->relname),
2494                          vactuples, vacthresh, anltuples, anlthresh);
2495
2496                 /* Determine if this table needs vacuum or analyze. */
2497                 *dovacuum = force_vacuum || (vactuples > vacthresh);
2498                 *doanalyze = (anltuples > anlthresh);
2499         }
2500         else
2501         {
2502                 /*
2503                  * Skip a table not found in stat hash, unless we have to force
2504                  * vacuum for anti-wrap purposes.  If it's not acted upon, there's
2505                  * no need to vacuum it.
2506                  */
2507                 *dovacuum = force_vacuum;
2508                 *doanalyze = false;
2509         }
2510
2511         /* ANALYZE refuses to work with pg_statistics */
2512         if (relid == StatisticRelationId)
2513                 *doanalyze = false;
2514 }
2515
2516 /*
2517  * autovacuum_do_vac_analyze
2518  *              Vacuum and/or analyze the specified table
2519  */
2520 static void
2521 autovacuum_do_vac_analyze(Oid relid, bool dovacuum, bool doanalyze,
2522                                                   int freeze_min_age,
2523                                                   BufferAccessStrategy bstrategy)
2524 {
2525         VacuumStmt      vacstmt;
2526         MemoryContext old_cxt;
2527
2528         MemSet(&vacstmt, 0, sizeof(vacstmt));
2529
2530         /*
2531          * The list must survive transaction boundaries, so make sure we create it
2532          * in a long-lived context
2533          */
2534         old_cxt = MemoryContextSwitchTo(AutovacMemCxt);
2535
2536         /* Set up command parameters */
2537         vacstmt.type = T_VacuumStmt;
2538         vacstmt.vacuum = dovacuum;
2539         vacstmt.full = false;
2540         vacstmt.analyze = doanalyze;
2541         vacstmt.freeze_min_age = freeze_min_age;
2542         vacstmt.verbose = false;
2543         vacstmt.relation = NULL;        /* not used since we pass a relids list */
2544         vacstmt.va_cols = NIL;
2545
2546         /* Let pgstat know what we're doing */
2547         autovac_report_activity(&vacstmt, relid);
2548
2549         vacuum(&vacstmt, list_make1_oid(relid), bstrategy, true);
2550         MemoryContextSwitchTo(old_cxt);
2551 }
2552
2553 /*
2554  * autovac_report_activity
2555  *              Report to pgstat what autovacuum is doing
2556  *
2557  * We send a SQL string corresponding to what the user would see if the
2558  * equivalent command was to be issued manually.
2559  *
2560  * Note we assume that we are going to report the next command as soon as we're
2561  * done with the current one, and exiting right after the last one, so we don't
2562  * bother to report "<IDLE>" or some such.
2563  */
2564 static void
2565 autovac_report_activity(VacuumStmt *vacstmt, Oid relid)
2566 {
2567         char       *relname = get_rel_name(relid);
2568         char       *nspname = get_namespace_name(get_rel_namespace(relid));
2569 #define MAX_AUTOVAC_ACTIV_LEN (NAMEDATALEN * 2 + 32)
2570         char            activity[MAX_AUTOVAC_ACTIV_LEN];
2571
2572         /* Report the command and possible options */
2573         if (vacstmt->vacuum)
2574                 snprintf(activity, MAX_AUTOVAC_ACTIV_LEN,
2575                                  "VACUUM%s",
2576                                  vacstmt->analyze ? " ANALYZE" : "");
2577         else
2578                 snprintf(activity, MAX_AUTOVAC_ACTIV_LEN,
2579                                  "ANALYZE");
2580
2581         /*
2582          * Report the qualified name of the relation.
2583          *
2584          * Paranoia is appropriate here in case relation was recently dropped
2585          * --- the lsyscache routines we just invoked will return NULL rather
2586          * than failing.
2587          */
2588         if (relname && nspname)
2589         {
2590                 int                     len = strlen(activity);
2591
2592                 snprintf(activity + len, MAX_AUTOVAC_ACTIV_LEN - len,
2593                                  " %s.%s", nspname, relname);
2594         }
2595
2596         pgstat_report_activity(activity);
2597 }
2598
2599 /*
2600  * AutoVacuumingActive
2601  *              Check GUC vars and report whether the autovacuum process should be
2602  *              running.
2603  */
2604 bool
2605 AutoVacuumingActive(void)
2606 {
2607         if (!autovacuum_start_daemon || !pgstat_collect_startcollector ||
2608                 !pgstat_collect_tuplelevel)
2609                 return false;
2610         return true;
2611 }
2612
2613 /*
2614  * autovac_init
2615  *              This is called at postmaster initialization.
2616  *
2617  * Annoy the user if he got it wrong.
2618  */
2619 void
2620 autovac_init(void)
2621 {
2622         if (!autovacuum_start_daemon)
2623                 return;
2624
2625         if (!pgstat_collect_startcollector || !pgstat_collect_tuplelevel)
2626         {
2627                 ereport(WARNING,
2628                                 (errmsg("autovacuum not started because of misconfiguration"),
2629                                  errhint("Enable options \"stats_start_collector\" and \"stats_row_level\".")));
2630
2631                 /*
2632                  * Set the GUC var so we don't fork autovacuum uselessly, and also to
2633                  * help debugging.
2634                  */
2635                 autovacuum_start_daemon = false;
2636         }
2637 }
2638
2639 /*
2640  * IsAutoVacuum functions
2641  *              Return whether this is either a launcher autovacuum process or a worker
2642  *              process.
2643  */
2644 bool
2645 IsAutoVacuumLauncherProcess(void)
2646 {
2647         return am_autovacuum_launcher;
2648 }
2649
2650 bool
2651 IsAutoVacuumWorkerProcess(void)
2652 {
2653         return am_autovacuum_worker;
2654 }
2655
2656
2657 /*
2658  * AutoVacuumShmemSize
2659  *              Compute space needed for autovacuum-related shared memory
2660  */
2661 Size
2662 AutoVacuumShmemSize(void)
2663 {
2664         Size    size;
2665
2666         /*
2667          * Need the fixed struct and the array of WorkerInfoData.
2668          */
2669         size = sizeof(AutoVacuumShmemStruct);
2670         size = MAXALIGN(size);
2671         size = add_size(size, mul_size(autovacuum_max_workers,
2672                                                                    sizeof(WorkerInfoData)));
2673         return size;
2674 }
2675
2676 /*
2677  * AutoVacuumShmemInit
2678  *              Allocate and initialize autovacuum-related shared memory
2679  */
2680 void
2681 AutoVacuumShmemInit(void)
2682 {
2683         bool        found;
2684
2685         AutoVacuumShmem = (AutoVacuumShmemStruct *)
2686                 ShmemInitStruct("AutoVacuum Data",
2687                                                 AutoVacuumShmemSize(),
2688                                                 &found);
2689         if (AutoVacuumShmem == NULL)
2690                 ereport(FATAL,
2691                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2692                                  errmsg("not enough shared memory for autovacuum")));
2693
2694         if (!IsUnderPostmaster)
2695         {
2696                 WorkerInfo      worker;
2697                 int                     i;
2698
2699                 Assert(!found);
2700
2701                 AutoVacuumShmem->av_launcherpid = 0;
2702                 AutoVacuumShmem->av_freeWorkers = INVALID_OFFSET;
2703                 SHMQueueInit(&AutoVacuumShmem->av_runningWorkers);
2704                 AutoVacuumShmem->av_startingWorker = INVALID_OFFSET;
2705
2706                 worker = (WorkerInfo) ((char *) AutoVacuumShmem +
2707                                                            MAXALIGN(sizeof(AutoVacuumShmemStruct)));
2708
2709                 /* initialize the WorkerInfo free list */
2710                 for (i = 0; i < autovacuum_max_workers; i++)
2711                 {
2712                         worker[i].wi_links.next = AutoVacuumShmem->av_freeWorkers;
2713                         AutoVacuumShmem->av_freeWorkers = MAKE_OFFSET(&worker[i]);
2714                 }
2715         }
2716         else
2717                 Assert(found);
2718 }