OSDN Git Service

Fix various infelicities that have snuck into usage of errdetail() and
[pg-rex/syncrep.git] / src / backend / catalog / pg_shdepend.c
1 /*-------------------------------------------------------------------------
2  *
3  * pg_shdepend.c
4  *        routines to support manipulation of the pg_shdepend relation
5  *
6  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/catalog/pg_shdepend.c,v 1.24 2008/03/24 19:12:49 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/xact.h"
20 #include "catalog/catalog.h"
21 #include "catalog/dependency.h"
22 #include "catalog/indexing.h"
23 #include "catalog/pg_authid.h"
24 #include "catalog/pg_conversion.h"
25 #include "catalog/pg_database.h"
26 #include "catalog/pg_language.h"
27 #include "catalog/pg_namespace.h"
28 #include "catalog/pg_operator.h"
29 #include "catalog/pg_proc.h"
30 #include "catalog/pg_shdepend.h"
31 #include "catalog/pg_tablespace.h"
32 #include "catalog/pg_type.h"
33 #include "commands/conversioncmds.h"
34 #include "commands/defrem.h"
35 #include "commands/schemacmds.h"
36 #include "commands/tablecmds.h"
37 #include "commands/typecmds.h"
38 #include "miscadmin.h"
39 #include "utils/acl.h"
40 #include "utils/fmgroids.h"
41 #include "utils/syscache.h"
42
43
/*
 * Classification of a dependent object relative to the current database,
 * used when building dependency descriptions.
 */
typedef enum
{
    LOCAL_OBJECT,               /* object belongs to the current database */
    SHARED_OBJECT,              /* shared object (pg_shdepend dbid is InvalidOid) */
    REMOTE_OBJECT               /* object belongs to some other database */
} objectType;
50
51 static int getOidListDiff(Oid *list1, int nlist1, Oid *list2, int nlist2,
52                            Oid **diff);
53 static Oid      classIdGetDbId(Oid classId);
54 static void shdepLockAndCheckObject(Oid classId, Oid objectId);
55 static void shdepChangeDep(Relation sdepRel, Oid classid, Oid objid,
56                            Oid refclassid, Oid refobjid,
57                            SharedDependencyType deptype);
58 static void shdepAddDependency(Relation sdepRel, Oid classId, Oid objectId,
59                                    Oid refclassId, Oid refobjId,
60                                    SharedDependencyType deptype);
61 static void shdepDropDependency(Relation sdepRel, Oid classId, Oid objectId,
62                                         Oid refclassId, Oid refobjId,
63                                         SharedDependencyType deptype);
64 static void storeObjectDescription(StringInfo descs, objectType type,
65                                            ObjectAddress *object,
66                                            SharedDependencyType deptype,
67                                            int count);
68 static bool isSharedObjectPinned(Oid classId, Oid objectId, Relation sdepRel);
69
70
71 /*
72  * recordSharedDependencyOn
73  *
74  * Record a dependency between 2 objects via their respective ObjectAddresses.
75  * The first argument is the dependent object, the second the one it
76  * references (which must be a shared object).
77  *
78  * This locks the referenced object and makes sure it still exists.
79  * Then it creates an entry in pg_shdepend.  The lock is kept until
80  * the end of the transaction.
81  *
82  * Dependencies on pinned objects are not recorded.
83  */
84 void
85 recordSharedDependencyOn(ObjectAddress *depender,
86                                                  ObjectAddress *referenced,
87                                                  SharedDependencyType deptype)
88 {
89         Relation        sdepRel;
90
91         /*
92          * Objects in pg_shdepend can't have SubIds.
93          */
94         Assert(depender->objectSubId == 0);
95         Assert(referenced->objectSubId == 0);
96
97         /*
98          * During bootstrap, do nothing since pg_shdepend may not exist yet.
99          * initdb will fill in appropriate pg_shdepend entries after bootstrap.
100          */
101         if (IsBootstrapProcessingMode())
102                 return;
103
104         sdepRel = heap_open(SharedDependRelationId, RowExclusiveLock);
105
106         /* If the referenced object is pinned, do nothing. */
107         if (!isSharedObjectPinned(referenced->classId, referenced->objectId,
108                                                           sdepRel))
109         {
110                 shdepAddDependency(sdepRel, depender->classId, depender->objectId,
111                                                    referenced->classId, referenced->objectId,
112                                                    deptype);
113         }
114
115         heap_close(sdepRel, RowExclusiveLock);
116 }
117
118 /*
119  * recordDependencyOnOwner
120  *
121  * A convenient wrapper of recordSharedDependencyOn -- register the specified
122  * user as owner of the given object.
123  *
124  * Note: it's the caller's responsibility to ensure that there isn't an owner
125  * entry for the object already.
126  */
127 void
128 recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
129 {
130         ObjectAddress myself,
131                                 referenced;
132
133         myself.classId = classId;
134         myself.objectId = objectId;
135         myself.objectSubId = 0;
136
137         referenced.classId = AuthIdRelationId;
138         referenced.objectId = owner;
139         referenced.objectSubId = 0;
140
141         recordSharedDependencyOn(&myself, &referenced, SHARED_DEPENDENCY_OWNER);
142 }
143
144 /*
145  * shdepChangeDep
146  *
147  * Update shared dependency records to account for an updated referenced
148  * object.      This is an internal workhorse for operations such as changing
149  * an object's owner.
150  *
151  * There must be no more than one existing entry for the given dependent
152  * object and dependency type!  So in practice this can only be used for
153  * updating SHARED_DEPENDENCY_OWNER entries, which should have that property.
154  *
155  * If there is no previous entry, we assume it was referencing a PINned
156  * object, so we create a new entry.  If the new referenced object is
157  * PINned, we don't create an entry (and drop the old one, if any).
158  *
159  * sdepRel must be the pg_shdepend relation, already opened and suitably
160  * locked.
161  */
162 static void
163 shdepChangeDep(Relation sdepRel, Oid classid, Oid objid,
164                            Oid refclassid, Oid refobjid,
165                            SharedDependencyType deptype)
166 {
167         Oid                     dbid = classIdGetDbId(classid);
168         HeapTuple       oldtup = NULL;
169         HeapTuple       scantup;
170         ScanKeyData key[3];
171         SysScanDesc scan;
172
173         /*
174          * Make sure the new referenced object doesn't go away while we record the
175          * dependency.
176          */
177         shdepLockAndCheckObject(refclassid, refobjid);
178
179         /*
180          * Look for a previous entry
181          */
182         ScanKeyInit(&key[0],
183                                 Anum_pg_shdepend_dbid,
184                                 BTEqualStrategyNumber, F_OIDEQ,
185                                 ObjectIdGetDatum(dbid));
186         ScanKeyInit(&key[1],
187                                 Anum_pg_shdepend_classid,
188                                 BTEqualStrategyNumber, F_OIDEQ,
189                                 ObjectIdGetDatum(classid));
190         ScanKeyInit(&key[2],
191                                 Anum_pg_shdepend_objid,
192                                 BTEqualStrategyNumber, F_OIDEQ,
193                                 ObjectIdGetDatum(objid));
194
195         scan = systable_beginscan(sdepRel, SharedDependDependerIndexId, true,
196                                                           SnapshotNow, 3, key);
197
198         while ((scantup = systable_getnext(scan)) != NULL)
199         {
200                 /* Ignore if not of the target dependency type */
201                 if (((Form_pg_shdepend) GETSTRUCT(scantup))->deptype != deptype)
202                         continue;
203                 /* Caller screwed up if multiple matches */
204                 if (oldtup)
205                         elog(ERROR,
206                                  "multiple pg_shdepend entries for object %u/%u deptype %c",
207                                  classid, objid, deptype);
208                 oldtup = heap_copytuple(scantup);
209         }
210
211         systable_endscan(scan);
212
213         if (isSharedObjectPinned(refclassid, refobjid, sdepRel))
214         {
215                 /* No new entry needed, so just delete existing entry if any */
216                 if (oldtup)
217                         simple_heap_delete(sdepRel, &oldtup->t_self);
218         }
219         else if (oldtup)
220         {
221                 /* Need to update existing entry */
222                 Form_pg_shdepend shForm = (Form_pg_shdepend) GETSTRUCT(oldtup);
223
224                 /* Since oldtup is a copy, we can just modify it in-memory */
225                 shForm->refclassid = refclassid;
226                 shForm->refobjid = refobjid;
227
228                 simple_heap_update(sdepRel, &oldtup->t_self, oldtup);
229
230                 /* keep indexes current */
231                 CatalogUpdateIndexes(sdepRel, oldtup);
232         }
233         else
234         {
235                 /* Need to insert new entry */
236                 Datum           values[Natts_pg_shdepend];
237                 bool            nulls[Natts_pg_shdepend];
238
239                 memset(nulls, 0, sizeof(nulls));
240
241                 values[Anum_pg_shdepend_dbid - 1] = ObjectIdGetDatum(dbid);
242                 values[Anum_pg_shdepend_classid - 1] = ObjectIdGetDatum(classid);
243                 values[Anum_pg_shdepend_objid - 1] = ObjectIdGetDatum(objid);
244
245                 values[Anum_pg_shdepend_refclassid - 1] = ObjectIdGetDatum(refclassid);
246                 values[Anum_pg_shdepend_refobjid - 1] = ObjectIdGetDatum(refobjid);
247                 values[Anum_pg_shdepend_deptype - 1] = CharGetDatum(deptype);
248
249                 /*
250                  * we are reusing oldtup just to avoid declaring a new variable, but
251                  * it's certainly a new tuple
252                  */
253                 oldtup = heap_form_tuple(RelationGetDescr(sdepRel), values, nulls);
254                 simple_heap_insert(sdepRel, oldtup);
255
256                 /* keep indexes current */
257                 CatalogUpdateIndexes(sdepRel, oldtup);
258         }
259
260         if (oldtup)
261                 heap_freetuple(oldtup);
262 }
263
264 /*
265  * changeDependencyOnOwner
266  *
267  * Update the shared dependencies to account for the new owner.
268  */
269 void
270 changeDependencyOnOwner(Oid classId, Oid objectId, Oid newOwnerId)
271 {
272         Relation        sdepRel;
273
274         sdepRel = heap_open(SharedDependRelationId, RowExclusiveLock);
275
276         /* Adjust the SHARED_DEPENDENCY_OWNER entry */
277         shdepChangeDep(sdepRel, classId, objectId,
278                                    AuthIdRelationId, newOwnerId,
279                                    SHARED_DEPENDENCY_OWNER);
280
281         /*----------
282          * There should never be a SHARED_DEPENDENCY_ACL entry for the owner,
283          * so get rid of it if there is one.  This can happen if the new owner
284          * was previously granted some rights to the object.
285          *
286          * This step is analogous to aclnewowner's removal of duplicate entries
287          * in the ACL.  We have to do it to handle this scenario:
288          *              A grants some rights on an object to B
289          *              ALTER OWNER changes the object's owner to B
290          *              ALTER OWNER changes the object's owner to C
291          * The third step would remove all mention of B from the object's ACL,
292          * but we'd still have a SHARED_DEPENDENCY_ACL for B if we did not do
293          * things this way.
294          *
295          * The rule against having a SHARED_DEPENDENCY_ACL entry for the owner
296          * allows us to fix things up in just this one place, without having
297          * to make the various ALTER OWNER routines each know about it.
298          *----------
299          */
300         shdepDropDependency(sdepRel, classId, objectId,
301                                                 AuthIdRelationId, newOwnerId,
302                                                 SHARED_DEPENDENCY_ACL);
303
304         heap_close(sdepRel, RowExclusiveLock);
305 }
306
307 /*
308  * getOidListDiff
309  *              Helper for updateAclDependencies.
310  *
311  * Takes two Oid arrays and returns elements from the first not found in the
312  * second.      We assume both arrays are sorted and de-duped, and that the
313  * second array does not contain any values not found in the first.
314  *
315  * NOTE: Both input arrays are pfreed.
316  */
317 static int
318 getOidListDiff(Oid *list1, int nlist1, Oid *list2, int nlist2, Oid **diff)
319 {
320         Oid                *result;
321         int                     i,
322                                 j,
323                                 k = 0;
324
325         AssertArg(nlist1 >= nlist2 && nlist2 >= 0);
326
327         result = palloc(sizeof(Oid) * (nlist1 - nlist2));
328         *diff = result;
329
330         for (i = 0, j = 0; i < nlist1 && j < nlist2;)
331         {
332                 if (list1[i] == list2[j])
333                 {
334                         i++;
335                         j++;
336                 }
337                 else if (list1[i] < list2[j])
338                 {
339                         result[k++] = list1[i];
340                         i++;
341                 }
342                 else
343                 {
344                         /* can't happen */
345                         elog(WARNING, "invalid element %u in shorter list", list2[j]);
346                         j++;
347                 }
348         }
349
350         for (; i < nlist1; i++)
351                 result[k++] = list1[i];
352
353         /* We should have copied the exact number of elements */
354         AssertState(k == (nlist1 - nlist2));
355
356         if (list1)
357                 pfree(list1);
358         if (list2)
359                 pfree(list2);
360
361         return k;
362 }
363
364 /*
365  * updateAclDependencies
366  *              Update the pg_shdepend info for an object's ACL during GRANT/REVOKE.
367  *
368  * classId, objectId: identify the object whose ACL this is
369  * ownerId: role owning the object
370  * isGrant: are we adding or removing ACL entries?
371  * noldmembers, oldmembers: array of roleids appearing in old ACL
372  * nnewmembers, newmembers: array of roleids appearing in new ACL
373  *
374  * We calculate the difference between the new and old lists of roles,
375  * and then insert (if it's a grant) or delete (if it's a revoke) from
376  * pg_shdepend as appropiate.
377  *
378  * Note that we can't insert blindly at grant, because we would end up with
379  * duplicate registered dependencies.  We could check for existence of the
380  * tuple before inserting, but that seems to be more expensive than what we are
381  * doing now.  On the other hand, we can't just delete the tuples blindly at
382  * revoke, because the user may still have other privileges.
383  *
384  * NOTE: Both input arrays must be sorted and de-duped.  They are pfreed
385  * before return.
386  */
387 void
388 updateAclDependencies(Oid classId, Oid objectId, Oid ownerId, bool isGrant,
389                                           int noldmembers, Oid *oldmembers,
390                                           int nnewmembers, Oid *newmembers)
391 {
392         Relation        sdepRel;
393         Oid                *diff;
394         int                     ndiff,
395                                 i;
396
397         /*
398          * Calculate the differences between the old and new lists.
399          */
400         if (isGrant)
401                 ndiff = getOidListDiff(newmembers, nnewmembers,
402                                                            oldmembers, noldmembers, &diff);
403         else
404                 ndiff = getOidListDiff(oldmembers, noldmembers,
405                                                            newmembers, nnewmembers, &diff);
406
407         if (ndiff > 0)
408         {
409                 sdepRel = heap_open(SharedDependRelationId, RowExclusiveLock);
410
411                 /* Add or drop the respective dependency */
412                 for (i = 0; i < ndiff; i++)
413                 {
414                         Oid                     roleid = diff[i];
415
416                         /*
417                          * Skip the owner: he has an OWNER shdep entry instead. (This is
418                          * not just a space optimization; it makes ALTER OWNER easier. See
419                          * notes in changeDependencyOnOwner.)
420                          */
421                         if (roleid == ownerId)
422                                 continue;
423
424                         /* Skip pinned roles */
425                         if (isSharedObjectPinned(AuthIdRelationId, roleid, sdepRel))
426                                 continue;
427
428                         if (isGrant)
429                                 shdepAddDependency(sdepRel, classId, objectId,
430                                                                    AuthIdRelationId, roleid,
431                                                                    SHARED_DEPENDENCY_ACL);
432                         else
433                                 shdepDropDependency(sdepRel, classId, objectId,
434                                                                         AuthIdRelationId, roleid,
435                                                                         SHARED_DEPENDENCY_ACL);
436                 }
437
438                 heap_close(sdepRel, RowExclusiveLock);
439         }
440
441         pfree(diff);
442 }
443
444 /*
445  * A struct to keep track of dependencies found in other databases.
446  */
447 typedef struct
448 {
449         Oid                     dbOid;
450         int                     count;
451 } remoteDep;
452
453 /*
454  * checkSharedDependencies
455  *
456  * Check whether there are shared dependency entries for a given shared
457  * object.      Returns a string containing a newline-separated list of object
458  * descriptions that depend on the shared object, or NULL if none is found.
459  * The size of the returned string is limited to about MAX_REPORTED_DEPS lines;
460  * if there are more objects than that, the output is returned truncated at
461  * that point while the full message is logged to the postmaster log.
462  *
463  * We can find three different kinds of dependencies: dependencies on objects
464  * of the current database; dependencies on shared objects; and dependencies
465  * on objects local to other databases.  We can (and do) provide descriptions
466  * of the two former kinds of objects, but we can't do that for "remote"
467  * objects, so we just provide a count of them.
468  *
469  * If we find a SHARED_DEPENDENCY_PIN entry, we can error out early.
470  */
471 char *
472 checkSharedDependencies(Oid classId, Oid objectId)
473 {
474         Relation        sdepRel;
475         ScanKeyData key[2];
476         SysScanDesc scan;
477         HeapTuple       tup;
478         int                     numReportedDeps = 0;
479         int                     numNotReportedDeps = 0;
480         int                     numNotReportedDbs = 0;
481         List       *remDeps = NIL;
482         ListCell   *cell;
483         ObjectAddress object;
484         StringInfoData descs;
485         StringInfoData alldescs;
486
487         /*
488          * We limit the number of dependencies reported to the client to
489          * MAX_REPORTED_DEPS, since client software may not deal well with
490          * enormous error strings.      The server log always gets a full report,
491          * which is collected in a separate StringInfo if and only if we detect
492          * that the client report is going to be truncated.
493          */
494 #define MAX_REPORTED_DEPS 100
495
496         initStringInfo(&descs);
497         initStringInfo(&alldescs);
498
499         sdepRel = heap_open(SharedDependRelationId, AccessShareLock);
500
501         ScanKeyInit(&key[0],
502                                 Anum_pg_shdepend_refclassid,
503                                 BTEqualStrategyNumber, F_OIDEQ,
504                                 ObjectIdGetDatum(classId));
505         ScanKeyInit(&key[1],
506                                 Anum_pg_shdepend_refobjid,
507                                 BTEqualStrategyNumber, F_OIDEQ,
508                                 ObjectIdGetDatum(objectId));
509
510         scan = systable_beginscan(sdepRel, SharedDependReferenceIndexId, true,
511                                                           SnapshotNow, 2, key);
512
513         while (HeapTupleIsValid(tup = systable_getnext(scan)))
514         {
515                 Form_pg_shdepend sdepForm = (Form_pg_shdepend) GETSTRUCT(tup);
516
517                 /* This case can be dispatched quickly */
518                 if (sdepForm->deptype == SHARED_DEPENDENCY_PIN)
519                 {
520                         object.classId = classId;
521                         object.objectId = objectId;
522                         object.objectSubId = 0;
523                         ereport(ERROR,
524                                         (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
525                                          errmsg("cannot drop %s because it is required by the database system",
526                                                         getObjectDescription(&object))));
527                 }
528
529                 object.classId = sdepForm->classid;
530                 object.objectId = sdepForm->objid;
531                 object.objectSubId = 0;
532
533                 /*
534                  * If it's a dependency local to this database or it's a shared
535                  * object, describe it.
536                  *
537                  * If it's a remote dependency, keep track of it so we can report the
538                  * number of them later.
539                  */
540                 if (sdepForm->dbid == MyDatabaseId)
541                 {
542                         if (numReportedDeps < MAX_REPORTED_DEPS)
543                         {
544                                 numReportedDeps++;
545                                 storeObjectDescription(&descs, LOCAL_OBJECT, &object,
546                                                                            sdepForm->deptype, 0);
547                         }
548                         else
549                         {
550                                 numNotReportedDeps++;
551                                 /* initialize the server-only log line */
552                                 if (alldescs.len == 0)
553                                         appendBinaryStringInfo(&alldescs, descs.data, descs.len);
554
555                                 storeObjectDescription(&alldescs, LOCAL_OBJECT, &object,
556                                                                            sdepForm->deptype, 0);
557                         }
558                 }
559                 else if (sdepForm->dbid == InvalidOid)
560                 {
561                         if (numReportedDeps < MAX_REPORTED_DEPS)
562                         {
563                                 numReportedDeps++;
564                                 storeObjectDescription(&descs, SHARED_OBJECT, &object,
565                                                                            sdepForm->deptype, 0);
566                         }
567                         else
568                         {
569                                 numNotReportedDeps++;
570                                 /* initialize the server-only log line */
571                                 if (alldescs.len == 0)
572                                         appendBinaryStringInfo(&alldescs, descs.data, descs.len);
573
574                                 storeObjectDescription(&alldescs, SHARED_OBJECT, &object,
575                                                                            sdepForm->deptype, 0);
576                         }
577                 }
578                 else
579                 {
580                         /* It's not local nor shared, so it must be remote. */
581                         remoteDep  *dep;
582                         bool            stored = false;
583
584                         /*
585                          * XXX this info is kept on a simple List.      Maybe it's not good
586                          * for performance, but using a hash table seems needlessly
587                          * complex.  The expected number of databases is not high anyway,
588                          * I suppose.
589                          */
590                         foreach(cell, remDeps)
591                         {
592                                 dep = lfirst(cell);
593                                 if (dep->dbOid == sdepForm->dbid)
594                                 {
595                                         dep->count++;
596                                         stored = true;
597                                         break;
598                                 }
599                         }
600                         if (!stored)
601                         {
602                                 dep = (remoteDep *) palloc(sizeof(remoteDep));
603                                 dep->dbOid = sdepForm->dbid;
604                                 dep->count = 1;
605                                 remDeps = lappend(remDeps, dep);
606                         }
607                 }
608         }
609
610         systable_endscan(scan);
611
612         heap_close(sdepRel, AccessShareLock);
613
614         /*
615          * Report dependencies on remote databases.  If we're truncating the
616          * output already, don't put a line per database, but a single one for all
617          * of them.  Otherwise add as many as fit in MAX_REPORTED_DEPS.
618          */
619         foreach(cell, remDeps)
620         {
621                 remoteDep  *dep = lfirst(cell);
622
623                 object.classId = DatabaseRelationId;
624                 object.objectId = dep->dbOid;
625                 object.objectSubId = 0;
626
627                 if (numReportedDeps < MAX_REPORTED_DEPS)
628                 {
629                         numReportedDeps++;
630                         storeObjectDescription(&descs, REMOTE_OBJECT, &object,
631                                                                    SHARED_DEPENDENCY_INVALID, dep->count);
632                 }
633                 else
634                 {
635                         numNotReportedDbs++;
636                         /* initialize the server-only log line */
637                         if (alldescs.len == 0)
638                                 appendBinaryStringInfo(&alldescs, descs.data, descs.len);
639
640                         storeObjectDescription(&alldescs, REMOTE_OBJECT, &object,
641                                                                    SHARED_DEPENDENCY_INVALID, dep->count);
642                 }
643         }
644
645         list_free_deep(remDeps);
646
647         if (descs.len == 0)
648         {
649                 pfree(descs.data);
650                 pfree(alldescs.data);
651                 return NULL;
652         }
653
654         if (numNotReportedDeps > 0)
655                 appendStringInfo(&descs, _("\nand %d other objects "
656                                                                    "(see server log for list)"),
657                                                  numNotReportedDeps);
658         if (numNotReportedDbs > 0)
659                 appendStringInfo(&descs, _("\nand objects in %d other databases "
660                                                                    "(see server log for list)"),
661                                                  numNotReportedDbs);
662
663         if (numNotReportedDeps > 0 || numNotReportedDbs > 0)
664         {
665                 ObjectAddress obj;
666
667                 obj.classId = classId;
668                 obj.objectId = objectId;
669                 obj.objectSubId = 0;
670                 ereport(LOG,
671                                 (errmsg("there are objects dependent on %s",
672                                                 getObjectDescription(&obj)),
673                                  errdetail("%s", alldescs.data)));
674         }
675
676         pfree(alldescs.data);
677
678         return descs.data;
679 }
680
681 /*
682  * copyTemplateDependencies
683  *
684  * Routine to create the initial shared dependencies of a new database.
685  * We simply copy the dependencies from the template database.
686  */
687 void
688 copyTemplateDependencies(Oid templateDbId, Oid newDbId)
689 {
690         Relation        sdepRel;
691         TupleDesc       sdepDesc;
692         ScanKeyData key[1];
693         SysScanDesc scan;
694         HeapTuple       tup;
695         CatalogIndexState indstate;
696         Datum           values[Natts_pg_shdepend];
697         bool            nulls[Natts_pg_shdepend];
698         bool            replace[Natts_pg_shdepend];
699
700         sdepRel = heap_open(SharedDependRelationId, RowExclusiveLock);
701         sdepDesc = RelationGetDescr(sdepRel);
702
703         indstate = CatalogOpenIndexes(sdepRel);
704
705         /* Scan all entries with dbid = templateDbId */
706         ScanKeyInit(&key[0],
707                                 Anum_pg_shdepend_dbid,
708                                 BTEqualStrategyNumber, F_OIDEQ,
709                                 ObjectIdGetDatum(templateDbId));
710
711         scan = systable_beginscan(sdepRel, SharedDependDependerIndexId, true,
712                                                           SnapshotNow, 1, key);
713
714         /* Set up to copy the tuples except for inserting newDbId */
715         memset(values, 0, sizeof(values));
716         memset(nulls, false, sizeof(nulls));
717         memset(replace, false, sizeof(replace));
718
719         replace[Anum_pg_shdepend_dbid - 1] = true;
720         values[Anum_pg_shdepend_dbid - 1] = ObjectIdGetDatum(newDbId);
721
722         /*
723          * Copy the entries of the original database, changing the database Id to
724          * that of the new database.  Note that because we are not copying rows
725          * with dbId == 0 (ie, rows describing dependent shared objects) we won't
726          * copy the ownership dependency of the template database itself; this is
727          * what we want.
728          */
729         while (HeapTupleIsValid(tup = systable_getnext(scan)))
730         {
731                 HeapTuple       newtup;
732
733                 newtup = heap_modify_tuple(tup, sdepDesc, values, nulls, replace);
734                 simple_heap_insert(sdepRel, newtup);
735
736                 /* Keep indexes current */
737                 CatalogIndexInsert(indstate, newtup);
738
739                 heap_freetuple(newtup);
740         }
741
742         systable_endscan(scan);
743
744         CatalogCloseIndexes(indstate);
745         heap_close(sdepRel, RowExclusiveLock);
746 }
747
748 /*
749  * dropDatabaseDependencies
750  *
751  * Delete pg_shdepend entries corresponding to a database that's being
752  * dropped.
753  */
754 void
755 dropDatabaseDependencies(Oid databaseId)
756 {
757         Relation        sdepRel;
758         ScanKeyData key[1];
759         SysScanDesc scan;
760         HeapTuple       tup;
761
762         sdepRel = heap_open(SharedDependRelationId, RowExclusiveLock);
763
764         /*
765          * First, delete all the entries that have the database Oid in the dbid
766          * field.
767          */
768         ScanKeyInit(&key[0],
769                                 Anum_pg_shdepend_dbid,
770                                 BTEqualStrategyNumber, F_OIDEQ,
771                                 ObjectIdGetDatum(databaseId));
772         /* We leave the other index fields unspecified */
773
774         scan = systable_beginscan(sdepRel, SharedDependDependerIndexId, true,
775                                                           SnapshotNow, 1, key);
776
777         while (HeapTupleIsValid(tup = systable_getnext(scan)))
778         {
779                 simple_heap_delete(sdepRel, &tup->t_self);
780         }
781
782         systable_endscan(scan);
783
784         /* Now delete all entries corresponding to the database itself */
785         shdepDropDependency(sdepRel, DatabaseRelationId, databaseId,
786                                                 InvalidOid, InvalidOid,
787                                                 SHARED_DEPENDENCY_INVALID);
788
789         heap_close(sdepRel, RowExclusiveLock);
790 }
791
792 /*
793  * deleteSharedDependencyRecordsFor
794  *
795  * Delete all pg_shdepend entries corresponding to an object that's being
796  * dropped or modified.  The object is assumed to be either a shared object
797  * or local to the current database (the classId tells us which).
798  */
799 void
800 deleteSharedDependencyRecordsFor(Oid classId, Oid objectId)
801 {
802         Relation        sdepRel;
803
804         sdepRel = heap_open(SharedDependRelationId, RowExclusiveLock);
805
806         shdepDropDependency(sdepRel, classId, objectId,
807                                                 InvalidOid, InvalidOid,
808                                                 SHARED_DEPENDENCY_INVALID);
809
810         heap_close(sdepRel, RowExclusiveLock);
811 }
812
813 /*
814  * shdepAddDependency
815  *              Internal workhorse for inserting into pg_shdepend
816  *
817  * sdepRel must be the pg_shdepend relation, already opened and suitably
818  * locked.
819  */
820 static void
821 shdepAddDependency(Relation sdepRel, Oid classId, Oid objectId,
822                                    Oid refclassId, Oid refobjId,
823                                    SharedDependencyType deptype)
824 {
825         HeapTuple       tup;
826         Datum           values[Natts_pg_shdepend];
827         bool            nulls[Natts_pg_shdepend];
828
829         /*
830          * Make sure the object doesn't go away while we record the dependency on
831          * it.  DROP routines should lock the object exclusively before they check
832          * shared dependencies.
833          */
834         shdepLockAndCheckObject(refclassId, refobjId);
835
836         memset(nulls, false, sizeof(nulls));
837
838         /*
839          * Form the new tuple and record the dependency.
840          */
841         values[Anum_pg_shdepend_dbid - 1] = ObjectIdGetDatum(classIdGetDbId(classId));
842         values[Anum_pg_shdepend_classid - 1] = ObjectIdGetDatum(classId);
843         values[Anum_pg_shdepend_objid - 1] = ObjectIdGetDatum(objectId);
844
845         values[Anum_pg_shdepend_refclassid - 1] = ObjectIdGetDatum(refclassId);
846         values[Anum_pg_shdepend_refobjid - 1] = ObjectIdGetDatum(refobjId);
847         values[Anum_pg_shdepend_deptype - 1] = CharGetDatum(deptype);
848
849         tup = heap_form_tuple(sdepRel->rd_att, values, nulls);
850
851         simple_heap_insert(sdepRel, tup);
852
853         /* keep indexes current */
854         CatalogUpdateIndexes(sdepRel, tup);
855
856         /* clean up */
857         heap_freetuple(tup);
858 }
859
860 /*
861  * shdepDropDependency
862  *              Internal workhorse for deleting entries from pg_shdepend.
863  *
864  * We drop entries having the following properties:
865  *      dependent object is the one identified by classId/objectId
866  *      if refclassId isn't InvalidOid, it must match the entry's refclassid
867  *      if refobjId isn't InvalidOid, it must match the entry's refobjid
868  *      if deptype isn't SHARED_DEPENDENCY_INVALID, it must match entry's deptype
869  *
870  * sdepRel must be the pg_shdepend relation, already opened and suitably
871  * locked.
872  */
873 static void
874 shdepDropDependency(Relation sdepRel, Oid classId, Oid objectId,
875                                         Oid refclassId, Oid refobjId,
876                                         SharedDependencyType deptype)
877 {
878         ScanKeyData key[3];
879         SysScanDesc scan;
880         HeapTuple       tup;
881
882         /* Scan for entries matching the dependent object */
883         ScanKeyInit(&key[0],
884                                 Anum_pg_shdepend_dbid,
885                                 BTEqualStrategyNumber, F_OIDEQ,
886                                 ObjectIdGetDatum(classIdGetDbId(classId)));
887         ScanKeyInit(&key[1],
888                                 Anum_pg_shdepend_classid,
889                                 BTEqualStrategyNumber, F_OIDEQ,
890                                 ObjectIdGetDatum(classId));
891         ScanKeyInit(&key[2],
892                                 Anum_pg_shdepend_objid,
893                                 BTEqualStrategyNumber, F_OIDEQ,
894                                 ObjectIdGetDatum(objectId));
895
896         scan = systable_beginscan(sdepRel, SharedDependDependerIndexId, true,
897                                                           SnapshotNow, 3, key);
898
899         while (HeapTupleIsValid(tup = systable_getnext(scan)))
900         {
901                 Form_pg_shdepend shdepForm = (Form_pg_shdepend) GETSTRUCT(tup);
902
903                 /* Filter entries according to additional parameters */
904                 if (OidIsValid(refclassId) && shdepForm->refclassid != refclassId)
905                         continue;
906                 if (OidIsValid(refobjId) && shdepForm->refobjid != refobjId)
907                         continue;
908                 if (deptype != SHARED_DEPENDENCY_INVALID &&
909                         shdepForm->deptype != deptype)
910                         continue;
911
912                 /* OK, delete it */
913                 simple_heap_delete(sdepRel, &tup->t_self);
914         }
915
916         systable_endscan(scan);
917 }
918
919 /*
920  * classIdGetDbId
921  *
922  * Get the database Id that should be used in pg_shdepend, given the OID
923  * of the catalog containing the object.  For shared objects, it's 0
924  * (InvalidOid); for all other objects, it's the current database Id.
925  */
926 static Oid
927 classIdGetDbId(Oid classId)
928 {
929         Oid                     dbId;
930
931         if (IsSharedRelation(classId))
932                 dbId = InvalidOid;
933         else
934                 dbId = MyDatabaseId;
935
936         return dbId;
937 }
938
939 /*
940  * shdepLockAndCheckObject
941  *
942  * Lock the object that we are about to record a dependency on.
943  * After it's locked, verify that it hasn't been dropped while we
944  * weren't looking.  If the object has been dropped, this function
945  * does not return!
946  */
947 static void
948 shdepLockAndCheckObject(Oid classId, Oid objectId)
949 {
950         /* AccessShareLock should be OK, since we are not modifying the object */
951         LockSharedObject(classId, objectId, 0, AccessShareLock);
952
953         switch (classId)
954         {
955                 case AuthIdRelationId:
956                         if (!SearchSysCacheExists(AUTHOID,
957                                                                           ObjectIdGetDatum(objectId),
958                                                                           0, 0, 0))
959                                 ereport(ERROR,
960                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
961                                                  errmsg("role %u was concurrently dropped",
962                                                                 objectId)));
963                         break;
964
965                         /*
966                          * Currently, this routine need not support any other shared
967                          * object types besides roles.  If we wanted to record explicit
968                          * dependencies on databases or tablespaces, we'd need code along
969                          * these lines:
970                          */
971 #ifdef NOT_USED
972                 case TableSpaceRelationId:
973                         {
974                                 /* For lack of a syscache on pg_tablespace, do this: */
975                                 char       *tablespace = get_tablespace_name(objectId);
976
977                                 if (tablespace == NULL)
978                                         ereport(ERROR,
979                                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
980                                                          errmsg("tablespace %u was concurrently dropped",
981                                                                         objectId)));
982                                 pfree(tablespace);
983                                 break;
984                         }
985 #endif
986
987                 default:
988                         elog(ERROR, "unrecognized shared classId: %u", classId);
989         }
990 }
991
992
993 /*
994  * storeObjectDescription
995  *              Append the description of a dependent object to "descs"
996  *
997  * While searching for dependencies of a shared object, we stash the
998  * descriptions of dependent objects we find in a single string, which we
999  * later pass to ereport() in the DETAIL field when somebody attempts to
1000  * drop a referenced shared object.
1001  *
1002  * When type is LOCAL_OBJECT or SHARED_OBJECT, we expect object to be the
1003  * dependent object, deptype is the dependency type, and count is not used.
1004  * When type is REMOTE_OBJECT, we expect object to be the database object,
1005  * and count to be nonzero; deptype is not used in this case.
1006  */
1007 static void
1008 storeObjectDescription(StringInfo descs, objectType type,
1009                                            ObjectAddress *object,
1010                                            SharedDependencyType deptype,
1011                                            int count)
1012 {
1013         char       *objdesc = getObjectDescription(object);
1014
1015         /* separate entries with a newline */
1016         if (descs->len != 0)
1017                 appendStringInfoChar(descs, '\n');
1018
1019         switch (type)
1020         {
1021                 case LOCAL_OBJECT:
1022                 case SHARED_OBJECT:
1023                         if (deptype == SHARED_DEPENDENCY_OWNER)
1024                                 appendStringInfo(descs, _("owner of %s"), objdesc);
1025                         else if (deptype == SHARED_DEPENDENCY_ACL)
1026                                 appendStringInfo(descs, _("access to %s"), objdesc);
1027                         else
1028                                 elog(ERROR, "unrecognized dependency type: %d",
1029                                          (int) deptype);
1030                         break;
1031
1032                 case REMOTE_OBJECT:
1033                         /* translator: %s will always be "database %s" */
1034                         appendStringInfo(descs, _("%d objects in %s"), count, objdesc);
1035                         break;
1036
1037                 default:
1038                         elog(ERROR, "unrecognized object type: %d", type);
1039         }
1040
1041         pfree(objdesc);
1042 }
1043
1044
1045 /*
1046  * isSharedObjectPinned
1047  *              Return whether a given shared object has a SHARED_DEPENDENCY_PIN entry.
1048  *
1049  * sdepRel must be the pg_shdepend relation, already opened and suitably
1050  * locked.
1051  */
1052 static bool
1053 isSharedObjectPinned(Oid classId, Oid objectId, Relation sdepRel)
1054 {
1055         bool            result = false;
1056         ScanKeyData key[2];
1057         SysScanDesc scan;
1058         HeapTuple       tup;
1059
1060         ScanKeyInit(&key[0],
1061                                 Anum_pg_shdepend_refclassid,
1062                                 BTEqualStrategyNumber, F_OIDEQ,
1063                                 ObjectIdGetDatum(classId));
1064         ScanKeyInit(&key[1],
1065                                 Anum_pg_shdepend_refobjid,
1066                                 BTEqualStrategyNumber, F_OIDEQ,
1067                                 ObjectIdGetDatum(objectId));
1068
1069         scan = systable_beginscan(sdepRel, SharedDependReferenceIndexId, true,
1070                                                           SnapshotNow, 2, key);
1071
1072         /*
1073          * Since we won't generate additional pg_shdepend entries for pinned
1074          * objects, there can be at most one entry referencing a pinned object.
1075          * Hence, it's sufficient to look at the first returned tuple; we don't
1076          * need to loop.
1077          */
1078         tup = systable_getnext(scan);
1079         if (HeapTupleIsValid(tup))
1080         {
1081                 Form_pg_shdepend shdepForm = (Form_pg_shdepend) GETSTRUCT(tup);
1082
1083                 if (shdepForm->deptype == SHARED_DEPENDENCY_PIN)
1084                         result = true;
1085         }
1086
1087         systable_endscan(scan);
1088
1089         return result;
1090 }
1091
1092 /*
1093  * shdepDropOwned
1094  *
1095  * Drop the objects owned by any one of the given RoleIds.      If a role has
1096  * access to an object, the grant will be removed as well (but the object
1097  * will not, of course.)
1098  *
1099  * We can revoke grants immediately while doing the scan, but drops are
1100  * saved up and done all at once with performMultipleDeletions.  This
1101  * is necessary so that we don't get failures from trying to delete
1102  * interdependent objects in the wrong order.
1103  */
1104 void
1105 shdepDropOwned(List *roleids, DropBehavior behavior)
1106 {
1107         Relation        sdepRel;
1108         ListCell   *cell;
1109         ObjectAddresses *deleteobjs;
1110
1111         deleteobjs = new_object_addresses();
1112
1113         /*
1114          * We don't need this strong a lock here, but we'll call routines that
1115          * acquire RowExclusiveLock.  Better get that right now to avoid potential
1116          * deadlock failures.
1117          */
1118         sdepRel = heap_open(SharedDependRelationId, RowExclusiveLock);
1119
1120         /*
1121          * For each role, find the dependent objects and drop them using the
1122          * regular (non-shared) dependency management.
1123          */
1124         foreach(cell, roleids)
1125         {
1126                 Oid                     roleid = lfirst_oid(cell);
1127                 ScanKeyData key[2];
1128                 SysScanDesc scan;
1129                 HeapTuple       tuple;
1130
1131                 /* Doesn't work for pinned objects */
1132                 if (isSharedObjectPinned(AuthIdRelationId, roleid, sdepRel))
1133                 {
1134                         ObjectAddress obj;
1135
1136                         obj.classId = AuthIdRelationId;
1137                         obj.objectId = roleid;
1138                         obj.objectSubId = 0;
1139
1140                         ereport(ERROR,
1141                                         (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
1142                                    errmsg("cannot drop objects owned by %s because they are "
1143                                                   "required by the database system",
1144                                                   getObjectDescription(&obj))));
1145                 }
1146
1147                 ScanKeyInit(&key[0],
1148                                         Anum_pg_shdepend_refclassid,
1149                                         BTEqualStrategyNumber, F_OIDEQ,
1150                                         ObjectIdGetDatum(AuthIdRelationId));
1151                 ScanKeyInit(&key[1],
1152                                         Anum_pg_shdepend_refobjid,
1153                                         BTEqualStrategyNumber, F_OIDEQ,
1154                                         ObjectIdGetDatum(roleid));
1155
1156                 scan = systable_beginscan(sdepRel, SharedDependReferenceIndexId, true,
1157                                                                   SnapshotNow, 2, key);
1158
1159                 while ((tuple = systable_getnext(scan)) != NULL)
1160                 {
1161                         ObjectAddress obj;
1162                         GrantObjectType objtype;
1163                         InternalGrant istmt;
1164                         Form_pg_shdepend sdepForm = (Form_pg_shdepend) GETSTRUCT(tuple);
1165
1166                         /* We only operate on objects in the current database */
1167                         if (sdepForm->dbid != MyDatabaseId)
1168                                 continue;
1169
1170                         switch (sdepForm->deptype)
1171                         {
1172                                         /* Shouldn't happen */
1173                                 case SHARED_DEPENDENCY_PIN:
1174                                 case SHARED_DEPENDENCY_INVALID:
1175                                         elog(ERROR, "unexpected dependency type");
1176                                         break;
1177                                 case SHARED_DEPENDENCY_ACL:
1178                                         switch (sdepForm->classid)
1179                                         {
1180                                                 case RelationRelationId:
1181                                                         /* it's OK to use RELATION for a sequence */
1182                                                         istmt.objtype = ACL_OBJECT_RELATION;
1183                                                         break;
1184                                                 case DatabaseRelationId:
1185                                                         istmt.objtype = ACL_OBJECT_DATABASE;
1186                                                         break;
1187                                                 case ProcedureRelationId:
1188                                                         istmt.objtype = ACL_OBJECT_FUNCTION;
1189                                                         break;
1190                                                 case LanguageRelationId:
1191                                                         istmt.objtype = ACL_OBJECT_LANGUAGE;
1192                                                         break;
1193                                                 case NamespaceRelationId:
1194                                                         istmt.objtype = ACL_OBJECT_NAMESPACE;
1195                                                         break;
1196                                                 case TableSpaceRelationId:
1197                                                         istmt.objtype = ACL_OBJECT_TABLESPACE;
1198                                                         break;
1199                                                 default:
1200                                                         elog(ERROR, "unexpected object type %d",
1201                                                                  sdepForm->classid);
1202                                                         /* keep compiler quiet */
1203                                                         objtype = (GrantObjectType) 0;
1204                                                         break;
1205                                         }
1206                                         istmt.is_grant = false;
1207                                         istmt.objects = list_make1_oid(sdepForm->objid);
1208                                         istmt.all_privs = true;
1209                                         istmt.privileges = ACL_NO_RIGHTS;
1210                                         istmt.grantees = list_make1_oid(roleid);
1211                                         istmt.grant_option = false;
1212                                         istmt.behavior = DROP_CASCADE;
1213
1214                                         ExecGrantStmt_oids(&istmt);
1215                                         break;
1216                                 case SHARED_DEPENDENCY_OWNER:
1217                                         /* Save it for deletion below */
1218                                         obj.classId = sdepForm->classid;
1219                                         obj.objectId = sdepForm->objid;
1220                                         obj.objectSubId = 0;
1221                                         add_exact_object_address(&obj, deleteobjs);
1222                                         break;
1223                         }
1224                 }
1225
1226                 systable_endscan(scan);
1227         }
1228
1229         /* the dependency mechanism does the actual work */
1230         performMultipleDeletions(deleteobjs, behavior);
1231
1232         heap_close(sdepRel, RowExclusiveLock);
1233
1234         free_object_addresses(deleteobjs);
1235 }
1236
1237 /*
1238  * shdepReassignOwned
1239  *
1240  * Change the owner of objects owned by any of the roles in roleids to
1241  * newrole.  Grants are not touched.
1242  */
1243 void
1244 shdepReassignOwned(List *roleids, Oid newrole)
1245 {
1246         Relation        sdepRel;
1247         ListCell   *cell;
1248
1249         /*
1250          * We don't need this strong a lock here, but we'll call routines that
1251          * acquire RowExclusiveLock.  Better get that right now to avoid potential
1252          * deadlock problems.
1253          */
1254         sdepRel = heap_open(SharedDependRelationId, RowExclusiveLock);
1255
1256         foreach(cell, roleids)
1257         {
1258                 SysScanDesc scan;
1259                 ScanKeyData key[2];
1260                 HeapTuple       tuple;
1261                 Oid                     roleid = lfirst_oid(cell);
1262
1263                 /* Refuse to work on pinned roles */
1264                 if (isSharedObjectPinned(AuthIdRelationId, roleid, sdepRel))
1265                 {
1266                         ObjectAddress obj;
1267
1268                         obj.classId = AuthIdRelationId;
1269                         obj.objectId = roleid;
1270                         obj.objectSubId = 0;
1271
1272                         ereport(ERROR,
1273                                         (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
1274                                    errmsg("cannot drop objects owned by %s because they are "
1275                                                   "required by the database system",
1276                                                   getObjectDescription(&obj))));
1277
1278                         /*
1279                          * There's no need to tell the whole truth, which is that we
1280                          * didn't track these dependencies at all ...
1281                          */
1282                 }
1283
1284                 ScanKeyInit(&key[0],
1285                                         Anum_pg_shdepend_refclassid,
1286                                         BTEqualStrategyNumber, F_OIDEQ,
1287                                         ObjectIdGetDatum(AuthIdRelationId));
1288                 ScanKeyInit(&key[1],
1289                                         Anum_pg_shdepend_refobjid,
1290                                         BTEqualStrategyNumber, F_OIDEQ,
1291                                         ObjectIdGetDatum(roleid));
1292
1293                 scan = systable_beginscan(sdepRel, SharedDependReferenceIndexId, true,
1294                                                                   SnapshotNow, 2, key);
1295
1296                 while ((tuple = systable_getnext(scan)) != NULL)
1297                 {
1298                         Form_pg_shdepend sdepForm = (Form_pg_shdepend) GETSTRUCT(tuple);
1299
1300                         /* We only operate on objects in the current database */
1301                         if (sdepForm->dbid != MyDatabaseId)
1302                                 continue;
1303
1304                         /* Unexpected because we checked for pins above */
1305                         if (sdepForm->deptype == SHARED_DEPENDENCY_PIN)
1306                                 elog(ERROR, "unexpected shared pin");
1307
1308                         /* We leave non-owner dependencies alone */
1309                         if (sdepForm->deptype != SHARED_DEPENDENCY_OWNER)
1310                                 continue;
1311
1312                         /* Issue the appropriate ALTER OWNER call */
1313                         switch (sdepForm->classid)
1314                         {
1315                                 case ConversionRelationId:
1316                                         AlterConversionOwner_oid(sdepForm->objid, newrole);
1317                                         break;
1318
1319                                 case TypeRelationId:
1320                                         AlterTypeOwnerInternal(sdepForm->objid, newrole, true);
1321                                         break;
1322
1323                                 case OperatorRelationId:
1324                                         AlterOperatorOwner_oid(sdepForm->objid, newrole);
1325                                         break;
1326
1327                                 case NamespaceRelationId:
1328                                         AlterSchemaOwner_oid(sdepForm->objid, newrole);
1329                                         break;
1330
1331                                 case RelationRelationId:
1332
1333                                         /*
1334                                          * Pass recursing = true so that we don't fail on indexes,
1335                                          * owned sequences, etc when we happen to visit them
1336                                          * before their parent table.
1337                                          */
1338                                         ATExecChangeOwner(sdepForm->objid, newrole, true);
1339                                         break;
1340
1341                                 case ProcedureRelationId:
1342                                         AlterFunctionOwner_oid(sdepForm->objid, newrole);
1343                                         break;
1344
1345                                 default:
1346                                         elog(ERROR, "unexpected classid %d", sdepForm->classid);
1347                                         break;
1348                         }
1349                         /* Make sure the next iteration will see my changes */
1350                         CommandCounterIncrement();
1351                 }
1352
1353                 systable_endscan(scan);
1354         }
1355
1356         heap_close(sdepRel, RowExclusiveLock);
1357 }