OSDN Git Service

Reduce impact of btree page reuse on Hot Standby by fixing off-by-1 error.
[pg-rex/syncrep.git] / src / backend / commands / comment.c
1 /*-------------------------------------------------------------------------
2  *
3  * comment.c
4  *
5  * PostgreSQL object comments utility code.
6  *
7  * Copyright (c) 1996-2011, PostgreSQL Global Development Group
8  *
9  * IDENTIFICATION
10  *        src/backend/commands/comment.c
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "catalog/indexing.h"
20 #include "catalog/objectaddress.h"
21 #include "catalog/pg_description.h"
22 #include "catalog/pg_shdescription.h"
23 #include "commands/comment.h"
24 #include "commands/dbcommands.h"
25 #include "miscadmin.h"
26 #include "utils/builtins.h"
27 #include "utils/fmgroids.h"
28 #include "utils/tqual.h"
29
30
31 /*
32  * CommentObject --
33  *
34  * This routine is used to add the associated comment into
35  * pg_description for the object specified by the given SQL command.
36  */
37 void
38 CommentObject(CommentStmt *stmt)
39 {
40         ObjectAddress address;
41         Relation        relation;
42
43         /*
44          * When loading a dump, we may see a COMMENT ON DATABASE for the old name
45          * of the database.  Erroring out would prevent pg_restore from completing
46          * (which is really pg_restore's fault, but for now we will work around
47          * the problem here).  Consensus is that the best fix is to treat wrong
48          * database name as a WARNING not an ERROR; hence, the following special
49          * case.  (If the length of stmt->objname is not 1, get_object_address
50          * will throw an error below; that's OK.)
51          */
52         if (stmt->objtype == OBJECT_DATABASE && list_length(stmt->objname) == 1)
53         {
54                 char       *database = strVal(linitial(stmt->objname));
55
56                 if (!OidIsValid(get_database_oid(database, true)))
57                 {
58                         ereport(WARNING,
59                                         (errcode(ERRCODE_UNDEFINED_DATABASE),
60                                          errmsg("database \"%s\" does not exist", database)));
61                         return;
62                 }
63         }
64
65         /*
66          * Translate the parser representation that identifies this object into an
67          * ObjectAddress.  get_object_address() will throw an error if the object
68          * does not exist, and will also acquire a lock on the target to guard
69          * against concurrent DROP operations.
70          */
71         address = get_object_address(stmt->objtype, stmt->objname, stmt->objargs,
72                                                                  &relation, ShareUpdateExclusiveLock);
73
74         /* Require ownership of the target object. */
75         check_object_ownership(GetUserId(), stmt->objtype, address,
76                                                    stmt->objname, stmt->objargs, relation);
77
78         /* Perform other integrity checks as needed. */
79         switch (stmt->objtype)
80         {
81                 case OBJECT_COLUMN:
82
83                         /*
84                          * Allow comments only on columns of tables, views, composite
85                          * types, and foreign tables (which are the only relkinds for
86                          * which pg_dump will dump per-column comments).  In particular we
87                          * wish to disallow comments on index columns, because the naming
88                          * of an index's columns may change across PG versions, so dumping
89                          * per-column comments could create reload failures.
90                          */
91                         if (relation->rd_rel->relkind != RELKIND_RELATION &&
92                                 relation->rd_rel->relkind != RELKIND_VIEW &&
93                                 relation->rd_rel->relkind != RELKIND_COMPOSITE_TYPE &&
94                                 relation->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
95                                 ereport(ERROR,
96                                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
97                                                  errmsg("\"%s\" is not a table, view, composite type, or foreign table",
98                                                                 RelationGetRelationName(relation))));
99                         break;
100                 default:
101                         break;
102         }
103
104         /*
105          * Databases, tablespaces, and roles are cluster-wide objects, so any
106          * comments on those objects are recorded in the shared pg_shdescription
107          * catalog.  Comments on all other objects are recorded in pg_description.
108          */
109         if (stmt->objtype == OBJECT_DATABASE || stmt->objtype == OBJECT_TABLESPACE
110                 || stmt->objtype == OBJECT_ROLE)
111                 CreateSharedComments(address.objectId, address.classId, stmt->comment);
112         else
113                 CreateComments(address.objectId, address.classId, address.objectSubId,
114                                            stmt->comment);
115
116         /*
117          * If get_object_address() opened the relation for us, we close it to keep
118          * the reference count correct - but we retain any locks acquired by
119          * get_object_address() until commit time, to guard against concurrent
120          * activity.
121          */
122         if (relation != NULL)
123                 relation_close(relation, NoLock);
124 }
125
126 /*
127  * CreateComments --
128  *
129  * Create a comment for the specified object descriptor.  Inserts a new
130  * pg_description tuple, or replaces an existing one with the same key.
131  *
132  * If the comment given is null or an empty string, instead delete any
133  * existing comment for the specified key.
134  */
135 void
136 CreateComments(Oid oid, Oid classoid, int32 subid, char *comment)
137 {
138         Relation        description;
139         ScanKeyData skey[3];
140         SysScanDesc sd;
141         HeapTuple       oldtuple;
142         HeapTuple       newtuple = NULL;
143         Datum           values[Natts_pg_description];
144         bool            nulls[Natts_pg_description];
145         bool            replaces[Natts_pg_description];
146         int                     i;
147
148         /* Reduce empty-string to NULL case */
149         if (comment != NULL && strlen(comment) == 0)
150                 comment = NULL;
151
152         /* Prepare to form or update a tuple, if necessary */
153         if (comment != NULL)
154         {
155                 for (i = 0; i < Natts_pg_description; i++)
156                 {
157                         nulls[i] = false;
158                         replaces[i] = true;
159                 }
160                 values[Anum_pg_description_objoid - 1] = ObjectIdGetDatum(oid);
161                 values[Anum_pg_description_classoid - 1] = ObjectIdGetDatum(classoid);
162                 values[Anum_pg_description_objsubid - 1] = Int32GetDatum(subid);
163                 values[Anum_pg_description_description - 1] = CStringGetTextDatum(comment);
164         }
165
166         /* Use the index to search for a matching old tuple */
167
168         ScanKeyInit(&skey[0],
169                                 Anum_pg_description_objoid,
170                                 BTEqualStrategyNumber, F_OIDEQ,
171                                 ObjectIdGetDatum(oid));
172         ScanKeyInit(&skey[1],
173                                 Anum_pg_description_classoid,
174                                 BTEqualStrategyNumber, F_OIDEQ,
175                                 ObjectIdGetDatum(classoid));
176         ScanKeyInit(&skey[2],
177                                 Anum_pg_description_objsubid,
178                                 BTEqualStrategyNumber, F_INT4EQ,
179                                 Int32GetDatum(subid));
180
181         description = heap_open(DescriptionRelationId, RowExclusiveLock);
182
183         sd = systable_beginscan(description, DescriptionObjIndexId, true,
184                                                         SnapshotNow, 3, skey);
185
186         while ((oldtuple = systable_getnext(sd)) != NULL)
187         {
188                 /* Found the old tuple, so delete or update it */
189
190                 if (comment == NULL)
191                         simple_heap_delete(description, &oldtuple->t_self);
192                 else
193                 {
194                         newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(description), values,
195                                                                                  nulls, replaces);
196                         simple_heap_update(description, &oldtuple->t_self, newtuple);
197                 }
198
199                 break;                                  /* Assume there can be only one match */
200         }
201
202         systable_endscan(sd);
203
204         /* If we didn't find an old tuple, insert a new one */
205
206         if (newtuple == NULL && comment != NULL)
207         {
208                 newtuple = heap_form_tuple(RelationGetDescr(description),
209                                                                    values, nulls);
210                 simple_heap_insert(description, newtuple);
211         }
212
213         /* Update indexes, if necessary */
214         if (newtuple != NULL)
215         {
216                 CatalogUpdateIndexes(description, newtuple);
217                 heap_freetuple(newtuple);
218         }
219
220         /* Done */
221
222         heap_close(description, NoLock);
223 }
224
225 /*
226  * CreateSharedComments --
227  *
228  * Create a comment for the specified shared object descriptor.  Inserts a
229  * new pg_shdescription tuple, or replaces an existing one with the same key.
230  *
231  * If the comment given is null or an empty string, instead delete any
232  * existing comment for the specified key.
233  */
234 void
235 CreateSharedComments(Oid oid, Oid classoid, char *comment)
236 {
237         Relation        shdescription;
238         ScanKeyData skey[2];
239         SysScanDesc sd;
240         HeapTuple       oldtuple;
241         HeapTuple       newtuple = NULL;
242         Datum           values[Natts_pg_shdescription];
243         bool            nulls[Natts_pg_shdescription];
244         bool            replaces[Natts_pg_shdescription];
245         int                     i;
246
247         /* Reduce empty-string to NULL case */
248         if (comment != NULL && strlen(comment) == 0)
249                 comment = NULL;
250
251         /* Prepare to form or update a tuple, if necessary */
252         if (comment != NULL)
253         {
254                 for (i = 0; i < Natts_pg_shdescription; i++)
255                 {
256                         nulls[i] = false;
257                         replaces[i] = true;
258                 }
259                 values[Anum_pg_shdescription_objoid - 1] = ObjectIdGetDatum(oid);
260                 values[Anum_pg_shdescription_classoid - 1] = ObjectIdGetDatum(classoid);
261                 values[Anum_pg_shdescription_description - 1] = CStringGetTextDatum(comment);
262         }
263
264         /* Use the index to search for a matching old tuple */
265
266         ScanKeyInit(&skey[0],
267                                 Anum_pg_shdescription_objoid,
268                                 BTEqualStrategyNumber, F_OIDEQ,
269                                 ObjectIdGetDatum(oid));
270         ScanKeyInit(&skey[1],
271                                 Anum_pg_shdescription_classoid,
272                                 BTEqualStrategyNumber, F_OIDEQ,
273                                 ObjectIdGetDatum(classoid));
274
275         shdescription = heap_open(SharedDescriptionRelationId, RowExclusiveLock);
276
277         sd = systable_beginscan(shdescription, SharedDescriptionObjIndexId, true,
278                                                         SnapshotNow, 2, skey);
279
280         while ((oldtuple = systable_getnext(sd)) != NULL)
281         {
282                 /* Found the old tuple, so delete or update it */
283
284                 if (comment == NULL)
285                         simple_heap_delete(shdescription, &oldtuple->t_self);
286                 else
287                 {
288                         newtuple = heap_modify_tuple(oldtuple, RelationGetDescr(shdescription),
289                                                                                  values, nulls, replaces);
290                         simple_heap_update(shdescription, &oldtuple->t_self, newtuple);
291                 }
292
293                 break;                                  /* Assume there can be only one match */
294         }
295
296         systable_endscan(sd);
297
298         /* If we didn't find an old tuple, insert a new one */
299
300         if (newtuple == NULL && comment != NULL)
301         {
302                 newtuple = heap_form_tuple(RelationGetDescr(shdescription),
303                                                                    values, nulls);
304                 simple_heap_insert(shdescription, newtuple);
305         }
306
307         /* Update indexes, if necessary */
308         if (newtuple != NULL)
309         {
310                 CatalogUpdateIndexes(shdescription, newtuple);
311                 heap_freetuple(newtuple);
312         }
313
314         /* Done */
315
316         heap_close(shdescription, NoLock);
317 }
318
319 /*
320  * DeleteComments -- remove comments for an object
321  *
322  * If subid is nonzero then only comments matching it will be removed.
323  * If subid is zero, all comments matching the oid/classoid will be removed
324  * (this corresponds to deleting a whole object).
325  */
326 void
327 DeleteComments(Oid oid, Oid classoid, int32 subid)
328 {
329         Relation        description;
330         ScanKeyData skey[3];
331         int                     nkeys;
332         SysScanDesc sd;
333         HeapTuple       oldtuple;
334
335         /* Use the index to search for all matching old tuples */
336
337         ScanKeyInit(&skey[0],
338                                 Anum_pg_description_objoid,
339                                 BTEqualStrategyNumber, F_OIDEQ,
340                                 ObjectIdGetDatum(oid));
341         ScanKeyInit(&skey[1],
342                                 Anum_pg_description_classoid,
343                                 BTEqualStrategyNumber, F_OIDEQ,
344                                 ObjectIdGetDatum(classoid));
345
346         if (subid != 0)
347         {
348                 ScanKeyInit(&skey[2],
349                                         Anum_pg_description_objsubid,
350                                         BTEqualStrategyNumber, F_INT4EQ,
351                                         Int32GetDatum(subid));
352                 nkeys = 3;
353         }
354         else
355                 nkeys = 2;
356
357         description = heap_open(DescriptionRelationId, RowExclusiveLock);
358
359         sd = systable_beginscan(description, DescriptionObjIndexId, true,
360                                                         SnapshotNow, nkeys, skey);
361
362         while ((oldtuple = systable_getnext(sd)) != NULL)
363                 simple_heap_delete(description, &oldtuple->t_self);
364
365         /* Done */
366
367         systable_endscan(sd);
368         heap_close(description, RowExclusiveLock);
369 }
370
371 /*
372  * DeleteSharedComments -- remove comments for a shared object
373  */
374 void
375 DeleteSharedComments(Oid oid, Oid classoid)
376 {
377         Relation        shdescription;
378         ScanKeyData skey[2];
379         SysScanDesc sd;
380         HeapTuple       oldtuple;
381
382         /* Use the index to search for all matching old tuples */
383
384         ScanKeyInit(&skey[0],
385                                 Anum_pg_shdescription_objoid,
386                                 BTEqualStrategyNumber, F_OIDEQ,
387                                 ObjectIdGetDatum(oid));
388         ScanKeyInit(&skey[1],
389                                 Anum_pg_shdescription_classoid,
390                                 BTEqualStrategyNumber, F_OIDEQ,
391                                 ObjectIdGetDatum(classoid));
392
393         shdescription = heap_open(SharedDescriptionRelationId, RowExclusiveLock);
394
395         sd = systable_beginscan(shdescription, SharedDescriptionObjIndexId, true,
396                                                         SnapshotNow, 2, skey);
397
398         while ((oldtuple = systable_getnext(sd)) != NULL)
399                 simple_heap_delete(shdescription, &oldtuple->t_self);
400
401         /* Done */
402
403         systable_endscan(sd);
404         heap_close(shdescription, RowExclusiveLock);
405 }
406
407 /*
408  * GetComment -- get the comment for an object, or null if not found.
409  */
410 char *
411 GetComment(Oid oid, Oid classoid, int32 subid)
412 {
413         Relation        description;
414         ScanKeyData skey[3];
415         SysScanDesc sd;
416         TupleDesc       tupdesc;
417         HeapTuple       tuple;
418         char       *comment;
419
420         /* Use the index to search for a matching old tuple */
421
422         ScanKeyInit(&skey[0],
423                                 Anum_pg_description_objoid,
424                                 BTEqualStrategyNumber, F_OIDEQ,
425                                 ObjectIdGetDatum(oid));
426         ScanKeyInit(&skey[1],
427                                 Anum_pg_description_classoid,
428                                 BTEqualStrategyNumber, F_OIDEQ,
429                                 ObjectIdGetDatum(classoid));
430         ScanKeyInit(&skey[2],
431                                 Anum_pg_description_objsubid,
432                                 BTEqualStrategyNumber, F_INT4EQ,
433                                 Int32GetDatum(subid));
434
435         description = heap_open(DescriptionRelationId, AccessShareLock);
436         tupdesc = RelationGetDescr(description);
437
438         sd = systable_beginscan(description, DescriptionObjIndexId, true,
439                                                         SnapshotNow, 3, skey);
440
441         comment = NULL;
442         while ((tuple = systable_getnext(sd)) != NULL)
443         {
444                 Datum           value;
445                 bool            isnull;
446
447                 /* Found the tuple, get description field */
448                 value = heap_getattr(tuple, Anum_pg_description_description, tupdesc, &isnull);
449                 if (!isnull)
450                         comment = TextDatumGetCString(value);
451                 break;                                  /* Assume there can be only one match */
452         }
453
454         systable_endscan(sd);
455
456         /* Done */
457         heap_close(description, AccessShareLock);
458
459         return comment;
460 }