1 /*-------------------------------------------------------------------------
4 * publication manipulation
6 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/backend/commands/publicationcmds.c
12 *-------------------------------------------------------------------------
17 #include "access/genam.h"
18 #include "access/htup_details.h"
19 #include "access/table.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/objectaccess.h"
25 #include "catalog/objectaddress.h"
26 #include "catalog/partition.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_namespace.h"
29 #include "catalog/pg_proc.h"
30 #include "catalog/pg_publication.h"
31 #include "catalog/pg_publication_namespace.h"
32 #include "catalog/pg_publication_rel.h"
33 #include "catalog/pg_type.h"
34 #include "commands/dbcommands.h"
35 #include "commands/defrem.h"
36 #include "commands/event_trigger.h"
37 #include "commands/publicationcmds.h"
39 #include "miscadmin.h"
40 #include "nodes/nodeFuncs.h"
41 #include "parser/parse_clause.h"
42 #include "parser/parse_collate.h"
43 #include "parser/parse_relation.h"
44 #include "storage/lmgr.h"
45 #include "utils/acl.h"
46 #include "utils/array.h"
47 #include "utils/builtins.h"
48 #include "utils/catcache.h"
49 #include "utils/fmgroids.h"
50 #include "utils/inval.h"
51 #include "utils/lsyscache.h"
52 #include "utils/rel.h"
53 #include "utils/syscache.h"
54 #include "utils/varlena.h"
58 * Information used to validate the columns in the row filter expression. See
59 * contain_invalid_rfcolumn_walker for details.
61 typedef struct rf_context
63 Bitmapset
*bms_replident
; /* bitset of replica identity columns */
64 bool pubviaroot
; /* true if we are validating the parent
65 * relation's row filter */
66 Oid relid
; /* relid of the relation */
67 Oid parentid
; /* relid of the parent relation */
70 static List
*OpenTableList(List
*tables
);
71 static void CloseTableList(List
*rels
);
72 static void LockSchemaList(List
*schemalist
);
73 static void PublicationAddTables(Oid pubid
, List
*rels
, bool if_not_exists
,
74 AlterPublicationStmt
*stmt
);
75 static void PublicationDropTables(Oid pubid
, List
*rels
, bool missing_ok
);
76 static void PublicationAddSchemas(Oid pubid
, List
*schemas
, bool if_not_exists
,
77 AlterPublicationStmt
*stmt
);
78 static void PublicationDropSchemas(Oid pubid
, List
*schemas
, bool missing_ok
);
82 parse_publication_options(ParseState
*pstate
,
85 PublicationActions
*pubactions
,
86 bool *publish_via_partition_root_given
,
87 bool *publish_via_partition_root
)
91 *publish_given
= false;
92 *publish_via_partition_root_given
= false;
95 pubactions
->pubinsert
= true;
96 pubactions
->pubupdate
= true;
97 pubactions
->pubdelete
= true;
98 pubactions
->pubtruncate
= true;
99 *publish_via_partition_root
= false;
104 DefElem
*defel
= (DefElem
*) lfirst(lc
);
106 if (strcmp(defel
->defname
, "publish") == 0)
113 errorConflictingDefElem(defel
, pstate
);
116 * If publish option was given only the explicitly listed actions
117 * should be published.
119 pubactions
->pubinsert
= false;
120 pubactions
->pubupdate
= false;
121 pubactions
->pubdelete
= false;
122 pubactions
->pubtruncate
= false;
124 *publish_given
= true;
125 publish
= defGetString(defel
);
127 if (!SplitIdentifierString(publish
, ',', &publish_list
))
129 (errcode(ERRCODE_SYNTAX_ERROR
),
130 errmsg("invalid list syntax in parameter \"%s\"",
133 /* Process the option list. */
134 foreach(lc2
, publish_list
)
136 char *publish_opt
= (char *) lfirst(lc2
);
138 if (strcmp(publish_opt
, "insert") == 0)
139 pubactions
->pubinsert
= true;
140 else if (strcmp(publish_opt
, "update") == 0)
141 pubactions
->pubupdate
= true;
142 else if (strcmp(publish_opt
, "delete") == 0)
143 pubactions
->pubdelete
= true;
144 else if (strcmp(publish_opt
, "truncate") == 0)
145 pubactions
->pubtruncate
= true;
148 (errcode(ERRCODE_SYNTAX_ERROR
),
149 errmsg("unrecognized value for publication option \"%s\": \"%s\"",
150 "publish", publish_opt
)));
153 else if (strcmp(defel
->defname
, "publish_via_partition_root") == 0)
155 if (*publish_via_partition_root_given
)
156 errorConflictingDefElem(defel
, pstate
);
157 *publish_via_partition_root_given
= true;
158 *publish_via_partition_root
= defGetBoolean(defel
);
162 (errcode(ERRCODE_SYNTAX_ERROR
),
163 errmsg("unrecognized publication parameter: \"%s\"", defel
->defname
)));
168 * Convert the PublicationObjSpecType list into schema oid list and
169 * PublicationTable list.
172 ObjectsInPublicationToOids(List
*pubobjspec_list
, ParseState
*pstate
,
173 List
**rels
, List
**schemas
)
176 PublicationObjSpec
*pubobj
;
178 if (!pubobjspec_list
)
181 foreach(cell
, pubobjspec_list
)
186 pubobj
= (PublicationObjSpec
*) lfirst(cell
);
188 switch (pubobj
->pubobjtype
)
190 case PUBLICATIONOBJ_TABLE
:
191 *rels
= lappend(*rels
, pubobj
->pubtable
);
193 case PUBLICATIONOBJ_TABLES_IN_SCHEMA
:
194 schemaid
= get_namespace_oid(pubobj
->name
, false);
196 /* Filter out duplicates if user specifies "sch1, sch1" */
197 *schemas
= list_append_unique_oid(*schemas
, schemaid
);
199 case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA
:
200 search_path
= fetch_search_path(false);
201 if (search_path
== NIL
) /* nothing valid in search_path? */
203 errcode(ERRCODE_UNDEFINED_SCHEMA
),
204 errmsg("no schema has been selected for CURRENT_SCHEMA"));
206 schemaid
= linitial_oid(search_path
);
207 list_free(search_path
);
209 /* Filter out duplicates if user specifies "sch1, sch1" */
210 *schemas
= list_append_unique_oid(*schemas
, schemaid
);
213 /* shouldn't happen */
214 elog(ERROR
, "invalid publication object type %d", pubobj
->pubobjtype
);
221 * Returns true if any of the columns used in the row filter WHERE expression is
222 * not part of REPLICA IDENTITY, false otherwise.
225 contain_invalid_rfcolumn_walker(Node
*node
, rf_context
*context
)
232 Var
*var
= (Var
*) node
;
233 AttrNumber attnum
= var
->varattno
;
236 * If pubviaroot is true, we are validating the row filter of the
237 * parent table, but the bitmap contains the replica identity
238 * information of the child table. So, get the column number of the
239 * child table as parent and child column order could be different.
241 if (context
->pubviaroot
)
243 char *colname
= get_attname(context
->parentid
, attnum
, false);
245 attnum
= get_attnum(context
->relid
, colname
);
248 if (!bms_is_member(attnum
- FirstLowInvalidHeapAttributeNumber
,
249 context
->bms_replident
))
253 return expression_tree_walker(node
, contain_invalid_rfcolumn_walker
,
258 * Check if all columns referenced in the filter expression are part of the
259 * REPLICA IDENTITY index or not.
261 * Returns true if any invalid column is found.
264 pub_rf_contains_invalid_column(Oid pubid
, Relation relation
, List
*ancestors
,
268 Oid relid
= RelationGetRelid(relation
);
269 Oid publish_as_relid
= RelationGetRelid(relation
);
275 * FULL means all columns are in the REPLICA IDENTITY, so all columns are
276 * allowed in the row filter and we can skip the validation.
278 if (relation
->rd_rel
->relreplident
== REPLICA_IDENTITY_FULL
)
282 * For a partition, if pubviaroot is true, find the topmost ancestor that
283 * is published via this publication as we need to use its row filter
284 * expression to filter the partition's changes.
286 * Note that even though the row filter used is for an ancestor, the
287 * REPLICA IDENTITY used will be for the actual child table.
289 if (pubviaroot
&& relation
->rd_rel
->relispartition
)
292 = GetTopMostAncestorInPublication(pubid
, ancestors
, NULL
);
294 if (!OidIsValid(publish_as_relid
))
295 publish_as_relid
= relid
;
298 rftuple
= SearchSysCache2(PUBLICATIONRELMAP
,
299 ObjectIdGetDatum(publish_as_relid
),
300 ObjectIdGetDatum(pubid
));
302 if (!HeapTupleIsValid(rftuple
))
305 rfdatum
= SysCacheGetAttr(PUBLICATIONRELMAP
, rftuple
,
306 Anum_pg_publication_rel_prqual
,
311 rf_context context
= {0};
313 Bitmapset
*bms
= NULL
;
315 context
.pubviaroot
= pubviaroot
;
316 context
.parentid
= publish_as_relid
;
317 context
.relid
= relid
;
319 /* Remember columns that are part of the REPLICA IDENTITY */
320 bms
= RelationGetIndexAttrBitmap(relation
,
321 INDEX_ATTR_BITMAP_IDENTITY_KEY
);
323 context
.bms_replident
= bms
;
324 rfnode
= stringToNode(TextDatumGetCString(rfdatum
));
325 result
= contain_invalid_rfcolumn_walker(rfnode
, &context
);
328 ReleaseSysCache(rftuple
);
334 * Check if all columns referenced in the REPLICA IDENTITY are covered by
337 * Returns true if any replica identity column is not covered by column list.
340 pub_collist_contains_invalid_column(Oid pubid
, Relation relation
, List
*ancestors
,
344 Oid relid
= RelationGetRelid(relation
);
345 Oid publish_as_relid
= RelationGetRelid(relation
);
351 * For a partition, if pubviaroot is true, find the topmost ancestor that
352 * is published via this publication as we need to use its column list for
355 * Note that even though the column list used is for an ancestor, the
356 * REPLICA IDENTITY used will be for the actual child table.
358 if (pubviaroot
&& relation
->rd_rel
->relispartition
)
360 publish_as_relid
= GetTopMostAncestorInPublication(pubid
, ancestors
, NULL
);
362 if (!OidIsValid(publish_as_relid
))
363 publish_as_relid
= relid
;
366 tuple
= SearchSysCache2(PUBLICATIONRELMAP
,
367 ObjectIdGetDatum(publish_as_relid
),
368 ObjectIdGetDatum(pubid
));
370 if (!HeapTupleIsValid(tuple
))
373 datum
= SysCacheGetAttr(PUBLICATIONRELMAP
, tuple
,
374 Anum_pg_publication_rel_prattrs
,
381 Bitmapset
*columns
= NULL
;
383 /* With REPLICA IDENTITY FULL, no column list is allowed. */
384 if (relation
->rd_rel
->relreplident
== REPLICA_IDENTITY_FULL
)
387 /* Transform the column list datum to a bitmapset. */
388 columns
= pub_collist_to_bitmapset(NULL
, datum
, NULL
);
390 /* Remember columns that are part of the REPLICA IDENTITY */
391 idattrs
= RelationGetIndexAttrBitmap(relation
,
392 INDEX_ATTR_BITMAP_IDENTITY_KEY
);
395 * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
396 * offset (to handle system columns the usual way), while column list
397 * does not use offset, so we can't do bms_is_subset(). Instead, we
398 * have to loop over the idattrs and check all of them are in the
402 while ((x
= bms_next_member(idattrs
, x
)) >= 0)
404 AttrNumber attnum
= (x
+ FirstLowInvalidHeapAttributeNumber
);
407 * If pubviaroot is true, we are validating the column list of the
408 * parent table, but the bitmap contains the replica identity
409 * information of the child table. The parent/child attnums may
410 * not match, so translate them to the parent - get the attname
411 * from the child, and look it up in the parent.
415 /* attribute name in the child table */
416 char *colname
= get_attname(relid
, attnum
, false);
419 * Determine the attnum for the attribute name in parent (we
420 * are using the column list defined on the parent).
422 attnum
= get_attnum(publish_as_relid
, colname
);
425 /* replica identity column, not covered by the column list */
426 if (!bms_is_member(attnum
, columns
))
437 ReleaseSysCache(tuple
);
442 /* check_functions_in_node callback */
444 contain_mutable_or_user_functions_checker(Oid func_id
, void *context
)
446 return (func_volatile(func_id
) != PROVOLATILE_IMMUTABLE
||
447 func_id
>= FirstNormalObjectId
);
451 * The row filter walker checks if the row filter expression is a "simple
454 * It allows only simple or compound expressions such as:
457 * - (Var Op Const) AND/OR (Var Op Const)
459 * (where Var is a column of the table this filter belongs to)
461 * The simple expression has the following restrictions:
462 * - User-defined operators are not allowed;
463 * - User-defined functions are not allowed;
464 * - User-defined types are not allowed;
465 * - User-defined collations are not allowed;
466 * - Non-immutable built-in functions are not allowed;
467 * - System columns are not allowed.
471 * We don't allow user-defined functions/operators/types/collations because
472 * (a) if a user drops a user-defined object used in a row filter expression or
473 * if there is any other error while using it, the logical decoding
474 * infrastructure won't be able to recover from such an error even if the
475 * object is recreated again because a historic snapshot is used to evaluate
477 * (b) a user-defined function can be used to access tables that could have
478 * unpleasant results because a historic snapshot is used. That's why only
479 * immutable built-in functions are allowed in row filter expressions.
481 * We don't allow system columns because currently, we don't have that
482 * information in the tuple passed to downstream. Also, as we don't replicate
483 * those to subscribers, there doesn't seem to be a need for a filter on those
486 * We can allow other node types after more analysis and testing.
489 check_simple_rowfilter_expr_walker(Node
*node
, ParseState
*pstate
)
491 char *errdetail_msg
= NULL
;
496 switch (nodeTag(node
))
499 /* System columns are not allowed. */
500 if (((Var
*) node
)->varattno
< InvalidAttrNumber
)
501 errdetail_msg
= _("System columns are not allowed.");
506 /* OK, except user-defined operators are not allowed. */
507 if (((OpExpr
*) node
)->opno
>= FirstNormalObjectId
)
508 errdetail_msg
= _("User-defined operators are not allowed.");
510 case T_ScalarArrayOpExpr
:
511 /* OK, except user-defined operators are not allowed. */
512 if (((ScalarArrayOpExpr
*) node
)->opno
>= FirstNormalObjectId
)
513 errdetail_msg
= _("User-defined operators are not allowed.");
516 * We don't need to check the hashfuncid and negfuncid of
517 * ScalarArrayOpExpr as those functions are only built for a
521 case T_RowCompareExpr
:
525 /* OK, except user-defined operators are not allowed. */
526 foreach(opid
, ((RowCompareExpr
*) node
)->opnos
)
528 if (lfirst_oid(opid
) >= FirstNormalObjectId
)
530 errdetail_msg
= _("User-defined operators are not allowed.");
554 errdetail_msg
= _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
559 * For all the supported nodes, if we haven't already found a problem,
560 * check the types, functions, and collations used in it. We check List
561 * by walking through each element.
563 if (!errdetail_msg
&& !IsA(node
, List
))
565 if (exprType(node
) >= FirstNormalObjectId
)
566 errdetail_msg
= _("User-defined types are not allowed.");
567 else if (check_functions_in_node(node
, contain_mutable_or_user_functions_checker
,
569 errdetail_msg
= _("User-defined or built-in mutable functions are not allowed.");
570 else if (exprCollation(node
) >= FirstNormalObjectId
||
571 exprInputCollation(node
) >= FirstNormalObjectId
)
572 errdetail_msg
= _("User-defined collations are not allowed.");
576 * If we found a problem in this node, throw error now. Otherwise keep
581 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED
),
582 errmsg("invalid publication WHERE expression"),
583 errdetail_internal("%s", errdetail_msg
),
584 parser_errposition(pstate
, exprLocation(node
))));
586 return expression_tree_walker(node
, check_simple_rowfilter_expr_walker
,
591 * Check if the row filter expression is a "simple expression".
593 * See check_simple_rowfilter_expr_walker for details.
596 check_simple_rowfilter_expr(Node
*node
, ParseState
*pstate
)
598 return check_simple_rowfilter_expr_walker(node
, pstate
);
602 * Transform the publication WHERE expression for all the relations in the list,
603 * ensuring it is coerced to boolean and necessary collation information is
604 * added if required, and add a new nsitem/RTE for the associated relation to
605 * the ParseState's namespace list.
607 * Also check the publication row filter expression and throw an error if
608 * anything not permitted or unexpected is encountered.
611 TransformPubWhereClauses(List
*tables
, const char *queryString
,
618 ParseNamespaceItem
*nsitem
;
619 Node
*whereclause
= NULL
;
621 PublicationRelInfo
*pri
= (PublicationRelInfo
*) lfirst(lc
);
623 if (pri
->whereClause
== NULL
)
627 * If the publication doesn't publish changes via the root partitioned
628 * table, the partition's row filter will be used. So disallow using
629 * WHERE clause on partitioned table in this case.
632 pri
->relation
->rd_rel
->relkind
== RELKIND_PARTITIONED_TABLE
)
634 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
635 errmsg("cannot use publication WHERE clause for relation \"%s\"",
636 RelationGetRelationName(pri
->relation
)),
637 errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
638 "publish_via_partition_root")));
641 * A fresh pstate is required so that we only have "this" table in its
644 pstate
= make_parsestate(NULL
);
645 pstate
->p_sourcetext
= queryString
;
646 nsitem
= addRangeTableEntryForRelation(pstate
, pri
->relation
,
647 AccessShareLock
, NULL
,
649 addNSItemToQuery(pstate
, nsitem
, false, true, true);
651 whereclause
= transformWhereClause(pstate
,
652 copyObject(pri
->whereClause
),
654 "PUBLICATION WHERE");
656 /* Fix up collation information */
657 assign_expr_collations(pstate
, whereclause
);
660 * We allow only simple expressions in row filters. See
661 * check_simple_rowfilter_expr_walker.
663 check_simple_rowfilter_expr(whereclause
, pstate
);
665 free_parsestate(pstate
);
667 pri
->whereClause
= whereclause
;
673 * Given a list of tables that are going to be added to a publication,
674 * verify that they fulfill the necessary preconditions, namely: no tables
675 * have a column list if any schema is published; and partitioned tables do
676 * not have column lists if publish_via_partition_root is not set.
678 * 'publish_schema' indicates that the publication contains any TABLES IN
679 * SCHEMA elements (newly added in this command, or preexisting).
680 * 'pubviaroot' is the value of publish_via_partition_root.
683 CheckPubRelationColumnList(char *pubname
, List
*tables
,
684 bool publish_schema
, bool pubviaroot
)
690 PublicationRelInfo
*pri
= (PublicationRelInfo
*) lfirst(lc
);
692 if (pri
->columns
== NIL
)
696 * Disallow specifying column list if any schema is in the
699 * XXX We could instead just forbid the case when the publication
700 * tries to publish the table with a column list and a schema for that
701 * table. However, if we do that then we need a restriction during
702 * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
703 * seem to be a good idea.
707 errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
708 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
709 get_namespace_name(RelationGetNamespace(pri
->relation
)),
710 RelationGetRelationName(pri
->relation
), pubname
),
711 errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
714 * If the publication doesn't publish changes via the root partitioned
715 * table, the partition's column list will be used. So disallow using
716 * a column list on the partitioned table in this case.
719 pri
->relation
->rd_rel
->relkind
== RELKIND_PARTITIONED_TABLE
)
721 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
722 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
723 get_namespace_name(RelationGetNamespace(pri
->relation
)),
724 RelationGetRelationName(pri
->relation
), pubname
),
725 errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
726 "publish_via_partition_root")));
731 * Create new publication.
734 CreatePublication(ParseState
*pstate
, CreatePublicationStmt
*stmt
)
737 ObjectAddress myself
;
739 bool nulls
[Natts_pg_publication
];
740 Datum values
[Natts_pg_publication
];
743 PublicationActions pubactions
;
744 bool publish_via_partition_root_given
;
745 bool publish_via_partition_root
;
747 List
*relations
= NIL
;
748 List
*schemaidlist
= NIL
;
750 /* must have CREATE privilege on database */
751 aclresult
= pg_database_aclcheck(MyDatabaseId
, GetUserId(), ACL_CREATE
);
752 if (aclresult
!= ACLCHECK_OK
)
753 aclcheck_error(aclresult
, OBJECT_DATABASE
,
754 get_database_name(MyDatabaseId
));
756 /* FOR ALL TABLES requires superuser */
757 if (stmt
->for_all_tables
&& !superuser())
759 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE
),
760 errmsg("must be superuser to create FOR ALL TABLES publication")));
762 rel
= table_open(PublicationRelationId
, RowExclusiveLock
);
764 /* Check if name is used */
765 puboid
= GetSysCacheOid1(PUBLICATIONNAME
, Anum_pg_publication_oid
,
766 CStringGetDatum(stmt
->pubname
));
767 if (OidIsValid(puboid
))
769 (errcode(ERRCODE_DUPLICATE_OBJECT
),
770 errmsg("publication \"%s\" already exists",
774 memset(values
, 0, sizeof(values
));
775 memset(nulls
, false, sizeof(nulls
));
777 values
[Anum_pg_publication_pubname
- 1] =
778 DirectFunctionCall1(namein
, CStringGetDatum(stmt
->pubname
));
779 values
[Anum_pg_publication_pubowner
- 1] = ObjectIdGetDatum(GetUserId());
781 parse_publication_options(pstate
,
783 &publish_given
, &pubactions
,
784 &publish_via_partition_root_given
,
785 &publish_via_partition_root
);
787 puboid
= GetNewOidWithIndex(rel
, PublicationObjectIndexId
,
788 Anum_pg_publication_oid
);
789 values
[Anum_pg_publication_oid
- 1] = ObjectIdGetDatum(puboid
);
790 values
[Anum_pg_publication_puballtables
- 1] =
791 BoolGetDatum(stmt
->for_all_tables
);
792 values
[Anum_pg_publication_pubinsert
- 1] =
793 BoolGetDatum(pubactions
.pubinsert
);
794 values
[Anum_pg_publication_pubupdate
- 1] =
795 BoolGetDatum(pubactions
.pubupdate
);
796 values
[Anum_pg_publication_pubdelete
- 1] =
797 BoolGetDatum(pubactions
.pubdelete
);
798 values
[Anum_pg_publication_pubtruncate
- 1] =
799 BoolGetDatum(pubactions
.pubtruncate
);
800 values
[Anum_pg_publication_pubviaroot
- 1] =
801 BoolGetDatum(publish_via_partition_root
);
803 tup
= heap_form_tuple(RelationGetDescr(rel
), values
, nulls
);
805 /* Insert tuple into catalog. */
806 CatalogTupleInsert(rel
, tup
);
809 recordDependencyOnOwner(PublicationRelationId
, puboid
, GetUserId());
811 ObjectAddressSet(myself
, PublicationRelationId
, puboid
);
813 /* Make the changes visible. */
814 CommandCounterIncrement();
816 /* Associate objects with the publication. */
817 if (stmt
->for_all_tables
)
819 /* Invalidate relcache so that publication info is rebuilt. */
820 CacheInvalidateRelcacheAll();
824 ObjectsInPublicationToOids(stmt
->pubobjects
, pstate
, &relations
,
827 /* FOR TABLES IN SCHEMA requires superuser */
828 if (schemaidlist
!= NIL
&& !superuser())
830 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE
),
831 errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
833 if (relations
!= NIL
)
837 rels
= OpenTableList(relations
);
838 TransformPubWhereClauses(rels
, pstate
->p_sourcetext
,
839 publish_via_partition_root
);
841 CheckPubRelationColumnList(stmt
->pubname
, rels
,
843 publish_via_partition_root
);
845 PublicationAddTables(puboid
, rels
, true, NULL
);
846 CloseTableList(rels
);
849 if (schemaidlist
!= NIL
)
852 * Schema lock is held until the publication is created to prevent
853 * concurrent schema deletion.
855 LockSchemaList(schemaidlist
);
856 PublicationAddSchemas(puboid
, schemaidlist
, true, NULL
);
860 table_close(rel
, RowExclusiveLock
);
862 InvokeObjectPostCreateHook(PublicationRelationId
, puboid
, 0);
864 if (wal_level
!= WAL_LEVEL_LOGICAL
)
866 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
867 errmsg("wal_level is insufficient to publish logical changes"),
868 errhint("Set wal_level to \"logical\" before creating subscriptions.")));
874 * Change options of a publication.
877 AlterPublicationOptions(ParseState
*pstate
, AlterPublicationStmt
*stmt
,
878 Relation rel
, HeapTuple tup
)
880 bool nulls
[Natts_pg_publication
];
881 bool replaces
[Natts_pg_publication
];
882 Datum values
[Natts_pg_publication
];
884 PublicationActions pubactions
;
885 bool publish_via_partition_root_given
;
886 bool publish_via_partition_root
;
888 Form_pg_publication pubform
;
889 List
*root_relids
= NIL
;
892 parse_publication_options(pstate
,
894 &publish_given
, &pubactions
,
895 &publish_via_partition_root_given
,
896 &publish_via_partition_root
);
898 pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
901 * If the publication doesn't publish changes via the root partitioned
902 * table, the partition's row filter and column list will be used. So
903 * disallow using WHERE clause and column lists on partitioned table in
906 if (!pubform
->puballtables
&& publish_via_partition_root_given
&&
907 !publish_via_partition_root
)
910 * Lock the publication so nobody else can do anything with it. This
911 * prevents concurrent alter to add partitioned table(s) with WHERE
912 * clause(s) and/or column lists which we don't allow when not
913 * publishing via root.
915 LockDatabaseObject(PublicationRelationId
, pubform
->oid
, 0,
918 root_relids
= GetPublicationRelations(pubform
->oid
,
919 PUBLICATION_PART_ROOT
);
921 foreach(lc
, root_relids
)
923 Oid relid
= lfirst_oid(lc
);
931 * Beware: we don't have lock on the relations, so cope silently
932 * with the cache lookups returning NULL.
935 rftuple
= SearchSysCache2(PUBLICATIONRELMAP
,
936 ObjectIdGetDatum(relid
),
937 ObjectIdGetDatum(pubform
->oid
));
938 if (!HeapTupleIsValid(rftuple
))
940 has_rowfilter
= !heap_attisnull(rftuple
, Anum_pg_publication_rel_prqual
, NULL
);
941 has_collist
= !heap_attisnull(rftuple
, Anum_pg_publication_rel_prattrs
, NULL
);
942 if (!has_rowfilter
&& !has_collist
)
944 ReleaseSysCache(rftuple
);
948 relkind
= get_rel_relkind(relid
);
949 if (relkind
!= RELKIND_PARTITIONED_TABLE
)
951 ReleaseSysCache(rftuple
);
954 relname
= get_rel_name(relid
);
955 if (relname
== NULL
) /* table concurrently dropped */
957 ReleaseSysCache(rftuple
);
963 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
964 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
965 "publish_via_partition_root",
967 errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
968 relname
, "publish_via_partition_root")));
971 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
972 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
973 "publish_via_partition_root",
975 errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
976 relname
, "publish_via_partition_root")));
980 /* Everything ok, form a new tuple. */
981 memset(values
, 0, sizeof(values
));
982 memset(nulls
, false, sizeof(nulls
));
983 memset(replaces
, false, sizeof(replaces
));
987 values
[Anum_pg_publication_pubinsert
- 1] = BoolGetDatum(pubactions
.pubinsert
);
988 replaces
[Anum_pg_publication_pubinsert
- 1] = true;
990 values
[Anum_pg_publication_pubupdate
- 1] = BoolGetDatum(pubactions
.pubupdate
);
991 replaces
[Anum_pg_publication_pubupdate
- 1] = true;
993 values
[Anum_pg_publication_pubdelete
- 1] = BoolGetDatum(pubactions
.pubdelete
);
994 replaces
[Anum_pg_publication_pubdelete
- 1] = true;
996 values
[Anum_pg_publication_pubtruncate
- 1] = BoolGetDatum(pubactions
.pubtruncate
);
997 replaces
[Anum_pg_publication_pubtruncate
- 1] = true;
1000 if (publish_via_partition_root_given
)
1002 values
[Anum_pg_publication_pubviaroot
- 1] = BoolGetDatum(publish_via_partition_root
);
1003 replaces
[Anum_pg_publication_pubviaroot
- 1] = true;
1006 tup
= heap_modify_tuple(tup
, RelationGetDescr(rel
), values
, nulls
,
1009 /* Update the catalog. */
1010 CatalogTupleUpdate(rel
, &tup
->t_self
, tup
);
1012 CommandCounterIncrement();
1014 pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
1016 /* Invalidate the relcache. */
1017 if (pubform
->puballtables
)
1019 CacheInvalidateRelcacheAll();
1024 List
*schemarelids
= NIL
;
1027 * For any partitioned tables contained in the publication, we must
1028 * invalidate all partitions contained in the respective partition
1029 * trees, not just those explicitly mentioned in the publication.
1031 if (root_relids
== NIL
)
1032 relids
= GetPublicationRelations(pubform
->oid
,
1033 PUBLICATION_PART_ALL
);
1037 * We already got tables explicitly mentioned in the publication.
1038 * Now get all partitions for the partitioned table in the list.
1040 foreach(lc
, root_relids
)
1041 relids
= GetPubPartitionOptionRelations(relids
,
1042 PUBLICATION_PART_ALL
,
1046 schemarelids
= GetAllSchemaPublicationRelations(pubform
->oid
,
1047 PUBLICATION_PART_ALL
);
1048 relids
= list_concat_unique_oid(relids
, schemarelids
);
1050 InvalidatePublicationRels(relids
);
1053 ObjectAddressSet(obj
, PublicationRelationId
, pubform
->oid
);
1054 EventTriggerCollectSimpleCommand(obj
, InvalidObjectAddress
,
1057 InvokeObjectPostAlterHook(PublicationRelationId
, pubform
->oid
, 0);
1061 * Invalidate the relations.
1064 InvalidatePublicationRels(List
*relids
)
1067 * We don't want to send too many individual messages, at some point it's
1068 * cheaper to just reset whole relcache.
1070 if (list_length(relids
) < MAX_RELCACHE_INVAL_MSGS
)
1075 CacheInvalidateRelcacheByRelid(lfirst_oid(lc
));
1078 CacheInvalidateRelcacheAll();
1082 * Add or remove table to/from publication.
1085 AlterPublicationTables(AlterPublicationStmt
*stmt
, HeapTuple tup
,
1086 List
*tables
, const char *queryString
,
1087 bool publish_schema
)
1090 Form_pg_publication pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
1091 Oid pubid
= pubform
->oid
;
1094 * Nothing to do if no objects, except in SET: for that it is quite
1095 * possible that user has not specified any tables in which case we need
1096 * to remove all the existing tables.
1098 if (!tables
&& stmt
->action
!= AP_SetObjects
)
1101 rels
= OpenTableList(tables
);
1103 if (stmt
->action
== AP_AddObjects
)
1105 TransformPubWhereClauses(rels
, queryString
, pubform
->pubviaroot
);
1107 publish_schema
|= is_schema_publication(pubid
);
1109 CheckPubRelationColumnList(stmt
->pubname
, rels
, publish_schema
,
1110 pubform
->pubviaroot
);
1112 PublicationAddTables(pubid
, rels
, false, stmt
);
1114 else if (stmt
->action
== AP_DropObjects
)
1115 PublicationDropTables(pubid
, rels
, false);
1116 else /* AP_SetObjects */
1118 List
*oldrelids
= GetPublicationRelations(pubid
,
1119 PUBLICATION_PART_ROOT
);
1120 List
*delrels
= NIL
;
1123 TransformPubWhereClauses(rels
, queryString
, pubform
->pubviaroot
);
1125 CheckPubRelationColumnList(stmt
->pubname
, rels
, publish_schema
,
1126 pubform
->pubviaroot
);
1129 * To recreate the relation list for the publication, look for
1130 * existing relations that do not need to be dropped.
1132 foreach(oldlc
, oldrelids
)
1134 Oid oldrelid
= lfirst_oid(oldlc
);
1136 PublicationRelInfo
*oldrel
;
1139 Node
*oldrelwhereclause
= NULL
;
1140 Bitmapset
*oldcolumns
= NULL
;
1142 /* look up the cache for the old relmap */
1143 rftuple
= SearchSysCache2(PUBLICATIONRELMAP
,
1144 ObjectIdGetDatum(oldrelid
),
1145 ObjectIdGetDatum(pubid
));
1148 * See if the existing relation currently has a WHERE clause or a
1149 * column list. We need to compare those too.
1151 if (HeapTupleIsValid(rftuple
))
1154 Datum whereClauseDatum
;
1155 Datum columnListDatum
;
1157 /* Load the WHERE clause for this table. */
1158 whereClauseDatum
= SysCacheGetAttr(PUBLICATIONRELMAP
, rftuple
,
1159 Anum_pg_publication_rel_prqual
,
1162 oldrelwhereclause
= stringToNode(TextDatumGetCString(whereClauseDatum
));
1164 /* Transform the int2vector column list to a bitmap. */
1165 columnListDatum
= SysCacheGetAttr(PUBLICATIONRELMAP
, rftuple
,
1166 Anum_pg_publication_rel_prattrs
,
1170 oldcolumns
= pub_collist_to_bitmapset(NULL
, columnListDatum
, NULL
);
1172 ReleaseSysCache(rftuple
);
1175 foreach(newlc
, rels
)
1177 PublicationRelInfo
*newpubrel
;
1179 Bitmapset
*newcolumns
= NULL
;
1181 newpubrel
= (PublicationRelInfo
*) lfirst(newlc
);
1182 newrelid
= RelationGetRelid(newpubrel
->relation
);
1185 * If the new publication has column list, transform it to a
1188 if (newpubrel
->columns
)
1192 foreach(lc
, newpubrel
->columns
)
1194 char *colname
= strVal(lfirst(lc
));
1195 AttrNumber attnum
= get_attnum(newrelid
, colname
);
1197 newcolumns
= bms_add_member(newcolumns
, attnum
);
1202 * Check if any of the new set of relations matches with the
1203 * existing relations in the publication. Additionally, if the
1204 * relation has an associated WHERE clause, check the WHERE
1205 * expressions also match. Same for the column list. Drop the
1208 if (RelationGetRelid(newpubrel
->relation
) == oldrelid
)
1210 if (equal(oldrelwhereclause
, newpubrel
->whereClause
) &&
1211 bms_equal(oldcolumns
, newcolumns
))
1220 * Add the non-matched relations to a list so that they can be
1225 oldrel
= palloc(sizeof(PublicationRelInfo
));
1226 oldrel
->whereClause
= NULL
;
1227 oldrel
->columns
= NIL
;
1228 oldrel
->relation
= table_open(oldrelid
,
1229 ShareUpdateExclusiveLock
);
1230 delrels
= lappend(delrels
, oldrel
);
1234 /* And drop them. */
1235 PublicationDropTables(pubid
, delrels
, true);
1238 * Don't bother calculating the difference for adding, we'll catch and
1239 * skip existing ones when doing catalog update.
1241 PublicationAddTables(pubid
, rels
, true, stmt
);
1243 CloseTableList(delrels
);
1246 CloseTableList(rels
);
1250 * Alter the publication schemas.
1252 * Add or remove schemas to/from publication.
1255 AlterPublicationSchemas(AlterPublicationStmt
*stmt
,
1256 HeapTuple tup
, List
*schemaidlist
)
1258 Form_pg_publication pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
1261 * Nothing to do if no objects, except in SET: for that it is quite
1262 * possible that user has not specified any schemas in which case we need
1263 * to remove all the existing schemas.
1265 if (!schemaidlist
&& stmt
->action
!= AP_SetObjects
)
1269 * Schema lock is held until the publication is altered to prevent
1270 * concurrent schema deletion.
1272 LockSchemaList(schemaidlist
);
1273 if (stmt
->action
== AP_AddObjects
)
1278 reloids
= GetPublicationRelations(pubform
->oid
, PUBLICATION_PART_ROOT
);
1280 foreach(lc
, reloids
)
1284 coltuple
= SearchSysCache2(PUBLICATIONRELMAP
,
1285 ObjectIdGetDatum(lfirst_oid(lc
)),
1286 ObjectIdGetDatum(pubform
->oid
));
1288 if (!HeapTupleIsValid(coltuple
))
1292 * Disallow adding schema if column list is already part of the
1293 * publication. See CheckPubRelationColumnList.
1295 if (!heap_attisnull(coltuple
, Anum_pg_publication_rel_prattrs
, NULL
))
1297 errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
1298 errmsg("cannot add schema to publication \"%s\"",
1300 errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1302 ReleaseSysCache(coltuple
);
1305 PublicationAddSchemas(pubform
->oid
, schemaidlist
, false, stmt
);
1307 else if (stmt
->action
== AP_DropObjects
)
1308 PublicationDropSchemas(pubform
->oid
, schemaidlist
, false);
1309 else /* AP_SetObjects */
1311 List
*oldschemaids
= GetPublicationSchemas(pubform
->oid
);
1312 List
*delschemas
= NIL
;
1314 /* Identify which schemas should be dropped */
1315 delschemas
= list_difference_oid(oldschemaids
, schemaidlist
);
1318 * Schema lock is held until the publication is altered to prevent
1319 * concurrent schema deletion.
1321 LockSchemaList(delschemas
);
1324 PublicationDropSchemas(pubform
->oid
, delschemas
, true);
1327 * Don't bother calculating the difference for adding, we'll catch and
1328 * skip existing ones when doing catalog update.
1330 PublicationAddSchemas(pubform
->oid
, schemaidlist
, true, stmt
);
1335 * Check if relations and schemas can be in a given publication and throw
1336 * appropriate error if not.
1339 CheckAlterPublication(AlterPublicationStmt
*stmt
, HeapTuple tup
,
1340 List
*tables
, List
*schemaidlist
)
1342 Form_pg_publication pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
1344 if ((stmt
->action
== AP_AddObjects
|| stmt
->action
== AP_SetObjects
) &&
1345 schemaidlist
&& !superuser())
1347 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE
),
1348 errmsg("must be superuser to add or set schemas")));
1351 * Check that user is allowed to manipulate the publication tables in
1354 if (schemaidlist
&& pubform
->puballtables
)
1356 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
1357 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1358 NameStr(pubform
->pubname
)),
1359 errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1361 /* Check that user is allowed to manipulate the publication tables. */
1362 if (tables
&& pubform
->puballtables
)
1364 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE
),
1365 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1366 NameStr(pubform
->pubname
)),
1367 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1371 * Alter the existing publication.
1373 * This is dispatcher function for AlterPublicationOptions,
1374 * AlterPublicationSchemas and AlterPublicationTables.
1377 AlterPublication(ParseState
*pstate
, AlterPublicationStmt
*stmt
)
1381 Form_pg_publication pubform
;
1383 rel
= table_open(PublicationRelationId
, RowExclusiveLock
);
1385 tup
= SearchSysCacheCopy1(PUBLICATIONNAME
,
1386 CStringGetDatum(stmt
->pubname
));
1388 if (!HeapTupleIsValid(tup
))
1390 (errcode(ERRCODE_UNDEFINED_OBJECT
),
1391 errmsg("publication \"%s\" does not exist",
1394 pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
1397 if (!object_ownercheck(PublicationRelationId
, pubform
->oid
, GetUserId()))
1398 aclcheck_error(ACLCHECK_NOT_OWNER
, OBJECT_PUBLICATION
,
1402 AlterPublicationOptions(pstate
, stmt
, rel
, tup
);
1405 List
*relations
= NIL
;
1406 List
*schemaidlist
= NIL
;
1407 Oid pubid
= pubform
->oid
;
1409 ObjectsInPublicationToOids(stmt
->pubobjects
, pstate
, &relations
,
1412 CheckAlterPublication(stmt
, tup
, relations
, schemaidlist
);
1414 heap_freetuple(tup
);
1416 /* Lock the publication so nobody else can do anything with it. */
1417 LockDatabaseObject(PublicationRelationId
, pubid
, 0,
1418 AccessExclusiveLock
);
1421 * It is possible that by the time we acquire the lock on publication,
1422 * concurrent DDL has removed it. We can test this by checking the
1423 * existence of publication. We get the tuple again to avoid the risk
1424 * of any publication option getting changed.
1426 tup
= SearchSysCacheCopy1(PUBLICATIONOID
, ObjectIdGetDatum(pubid
));
1427 if (!HeapTupleIsValid(tup
))
1429 errcode(ERRCODE_UNDEFINED_OBJECT
),
1430 errmsg("publication \"%s\" does not exist",
1433 AlterPublicationTables(stmt
, tup
, relations
, pstate
->p_sourcetext
,
1434 schemaidlist
!= NIL
);
1435 AlterPublicationSchemas(stmt
, tup
, schemaidlist
);
1439 heap_freetuple(tup
);
1440 table_close(rel
, RowExclusiveLock
);
1444 * Remove relation from publication by mapping OID.
1447 RemovePublicationRelById(Oid proid
)
1451 Form_pg_publication_rel pubrel
;
1454 rel
= table_open(PublicationRelRelationId
, RowExclusiveLock
);
1456 tup
= SearchSysCache1(PUBLICATIONREL
, ObjectIdGetDatum(proid
));
1458 if (!HeapTupleIsValid(tup
))
1459 elog(ERROR
, "cache lookup failed for publication table %u",
1462 pubrel
= (Form_pg_publication_rel
) GETSTRUCT(tup
);
1465 * Invalidate relcache so that publication info is rebuilt.
1467 * For the partitioned tables, we must invalidate all partitions contained
1468 * in the respective partition hierarchies, not just the one explicitly
1469 * mentioned in the publication. This is required because we implicitly
1470 * publish the child tables when the parent table is published.
1472 relids
= GetPubPartitionOptionRelations(relids
, PUBLICATION_PART_ALL
,
1475 InvalidatePublicationRels(relids
);
1477 CatalogTupleDelete(rel
, &tup
->t_self
);
1479 ReleaseSysCache(tup
);
1481 table_close(rel
, RowExclusiveLock
);
1485 * Remove the publication by mapping OID.
1488 RemovePublicationById(Oid pubid
)
1492 Form_pg_publication pubform
;
1494 rel
= table_open(PublicationRelationId
, RowExclusiveLock
);
1496 tup
= SearchSysCache1(PUBLICATIONOID
, ObjectIdGetDatum(pubid
));
1497 if (!HeapTupleIsValid(tup
))
1498 elog(ERROR
, "cache lookup failed for publication %u", pubid
);
1500 pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
1502 /* Invalidate relcache so that publication info is rebuilt. */
1503 if (pubform
->puballtables
)
1504 CacheInvalidateRelcacheAll();
1506 CatalogTupleDelete(rel
, &tup
->t_self
);
1508 ReleaseSysCache(tup
);
1510 table_close(rel
, RowExclusiveLock
);
1514 * Remove schema from publication by mapping OID.
1517 RemovePublicationSchemaById(Oid psoid
)
1521 List
*schemaRels
= NIL
;
1522 Form_pg_publication_namespace pubsch
;
1524 rel
= table_open(PublicationNamespaceRelationId
, RowExclusiveLock
);
1526 tup
= SearchSysCache1(PUBLICATIONNAMESPACE
, ObjectIdGetDatum(psoid
));
1528 if (!HeapTupleIsValid(tup
))
1529 elog(ERROR
, "cache lookup failed for publication schema %u", psoid
);
1531 pubsch
= (Form_pg_publication_namespace
) GETSTRUCT(tup
);
1534 * Invalidate relcache so that publication info is rebuilt. See
1535 * RemovePublicationRelById for why we need to consider all the
1538 schemaRels
= GetSchemaPublicationRelations(pubsch
->pnnspid
,
1539 PUBLICATION_PART_ALL
);
1540 InvalidatePublicationRels(schemaRels
);
1542 CatalogTupleDelete(rel
, &tup
->t_self
);
1544 ReleaseSysCache(tup
);
1546 table_close(rel
, RowExclusiveLock
);
1550 * Open relations specified by a PublicationTable list.
1551 * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1552 * add them to a publication.
1555 OpenTableList(List
*tables
)
1560 List
*relids_with_rf
= NIL
;
1561 List
*relids_with_collist
= NIL
;
1564 * Open, share-lock, and check all the explicitly-specified relations
1568 PublicationTable
*t
= lfirst_node(PublicationTable
, lc
);
1569 bool recurse
= t
->relation
->inh
;
1572 PublicationRelInfo
*pub_rel
;
1574 /* Allow query cancel in case this takes a long time */
1575 CHECK_FOR_INTERRUPTS();
1577 rel
= table_openrv(t
->relation
, ShareUpdateExclusiveLock
);
1578 myrelid
= RelationGetRelid(rel
);
1581 * Filter out duplicates if user specifies "foo, foo".
1583 * Note that this algorithm is known to not be very efficient (O(N^2))
1584 * but given that it only works on list of tables given to us by user
1585 * it's deemed acceptable.
1587 if (list_member_oid(relids
, myrelid
))
1589 /* Disallow duplicate tables if there are any with row filters. */
1590 if (t
->whereClause
|| list_member_oid(relids_with_rf
, myrelid
))
1592 (errcode(ERRCODE_DUPLICATE_OBJECT
),
1593 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1594 RelationGetRelationName(rel
))));
1596 /* Disallow duplicate tables if there are any with column lists. */
1597 if (t
->columns
|| list_member_oid(relids_with_collist
, myrelid
))
1599 (errcode(ERRCODE_DUPLICATE_OBJECT
),
1600 errmsg("conflicting or redundant column lists for table \"%s\"",
1601 RelationGetRelationName(rel
))));
1603 table_close(rel
, ShareUpdateExclusiveLock
);
1607 pub_rel
= palloc(sizeof(PublicationRelInfo
));
1608 pub_rel
->relation
= rel
;
1609 pub_rel
->whereClause
= t
->whereClause
;
1610 pub_rel
->columns
= t
->columns
;
1611 rels
= lappend(rels
, pub_rel
);
1612 relids
= lappend_oid(relids
, myrelid
);
1615 relids_with_rf
= lappend_oid(relids_with_rf
, myrelid
);
1618 relids_with_collist
= lappend_oid(relids_with_collist
, myrelid
);
1621 * Add children of this rel, if requested, so that they too are added
1622 * to the publication. A partitioned table can't have any inheritance
1623 * children other than its partitions, which need not be explicitly
1624 * added to the publication.
1626 if (recurse
&& rel
->rd_rel
->relkind
!= RELKIND_PARTITIONED_TABLE
)
1631 children
= find_all_inheritors(myrelid
, ShareUpdateExclusiveLock
,
1634 foreach(child
, children
)
1636 Oid childrelid
= lfirst_oid(child
);
1638 /* Allow query cancel in case this takes a long time */
1639 CHECK_FOR_INTERRUPTS();
1642 * Skip duplicates if user specified both parent and child
1645 if (list_member_oid(relids
, childrelid
))
1648 * We don't allow to specify row filter for both parent
1649 * and child table at the same time as it is not very
1650 * clear which one should be given preference.
1652 if (childrelid
!= myrelid
&&
1653 (t
->whereClause
|| list_member_oid(relids_with_rf
, childrelid
)))
1655 (errcode(ERRCODE_DUPLICATE_OBJECT
),
1656 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1657 RelationGetRelationName(rel
))));
1660 * We don't allow to specify column list for both parent
1661 * and child table at the same time as it is not very
1662 * clear which one should be given preference.
1664 if (childrelid
!= myrelid
&&
1665 (t
->columns
|| list_member_oid(relids_with_collist
, childrelid
)))
1667 (errcode(ERRCODE_DUPLICATE_OBJECT
),
1668 errmsg("conflicting or redundant column lists for table \"%s\"",
1669 RelationGetRelationName(rel
))));
1674 /* find_all_inheritors already got lock */
1675 rel
= table_open(childrelid
, NoLock
);
1676 pub_rel
= palloc(sizeof(PublicationRelInfo
));
1677 pub_rel
->relation
= rel
;
1678 /* child inherits WHERE clause from parent */
1679 pub_rel
->whereClause
= t
->whereClause
;
1681 /* child inherits column list from parent */
1682 pub_rel
->columns
= t
->columns
;
1683 rels
= lappend(rels
, pub_rel
);
1684 relids
= lappend_oid(relids
, childrelid
);
1687 relids_with_rf
= lappend_oid(relids_with_rf
, childrelid
);
1690 relids_with_collist
= lappend_oid(relids_with_collist
, childrelid
);
1696 list_free(relids_with_rf
);
1702 * Close all relations in the list.
1705 CloseTableList(List
*rels
)
1711 PublicationRelInfo
*pub_rel
;
1713 pub_rel
= (PublicationRelInfo
*) lfirst(lc
);
1714 table_close(pub_rel
->relation
, NoLock
);
1717 list_free_deep(rels
);
1721 * Lock the schemas specified in the schema list in AccessShareLock mode in
1722 * order to prevent concurrent schema deletion.
1725 LockSchemaList(List
*schemalist
)
1729 foreach(lc
, schemalist
)
1731 Oid schemaid
= lfirst_oid(lc
);
1733 /* Allow query cancel in case this takes a long time */
1734 CHECK_FOR_INTERRUPTS();
1735 LockDatabaseObject(NamespaceRelationId
, schemaid
, 0, AccessShareLock
);
1738 * It is possible that by the time we acquire the lock on schema,
1739 * concurrent DDL has removed it. We can test this by checking the
1740 * existence of schema.
1742 if (!SearchSysCacheExists1(NAMESPACEOID
, ObjectIdGetDatum(schemaid
)))
1744 errcode(ERRCODE_UNDEFINED_SCHEMA
),
1745 errmsg("schema with OID %u does not exist", schemaid
));
1750 * Add listed tables to the publication.
1753 PublicationAddTables(Oid pubid
, List
*rels
, bool if_not_exists
,
1754 AlterPublicationStmt
*stmt
)
1758 Assert(!stmt
|| !stmt
->for_all_tables
);
1762 PublicationRelInfo
*pub_rel
= (PublicationRelInfo
*) lfirst(lc
);
1763 Relation rel
= pub_rel
->relation
;
1766 /* Must be owner of the table or superuser. */
1767 if (!object_ownercheck(RelationRelationId
, RelationGetRelid(rel
), GetUserId()))
1768 aclcheck_error(ACLCHECK_NOT_OWNER
, get_relkind_objtype(rel
->rd_rel
->relkind
),
1769 RelationGetRelationName(rel
));
1771 obj
= publication_add_relation(pubid
, pub_rel
, if_not_exists
);
1774 EventTriggerCollectSimpleCommand(obj
, InvalidObjectAddress
,
1777 InvokeObjectPostCreateHook(PublicationRelRelationId
,
1784 * Remove listed tables from the publication.
1787 PublicationDropTables(Oid pubid
, List
*rels
, bool missing_ok
)
1795 PublicationRelInfo
*pubrel
= (PublicationRelInfo
*) lfirst(lc
);
1796 Relation rel
= pubrel
->relation
;
1797 Oid relid
= RelationGetRelid(rel
);
1799 if (pubrel
->columns
)
1801 errcode(ERRCODE_SYNTAX_ERROR
),
1802 errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1804 prid
= GetSysCacheOid2(PUBLICATIONRELMAP
, Anum_pg_publication_rel_oid
,
1805 ObjectIdGetDatum(relid
),
1806 ObjectIdGetDatum(pubid
));
1807 if (!OidIsValid(prid
))
1813 (errcode(ERRCODE_UNDEFINED_OBJECT
),
1814 errmsg("relation \"%s\" is not part of the publication",
1815 RelationGetRelationName(rel
))));
1818 if (pubrel
->whereClause
)
1820 (errcode(ERRCODE_SYNTAX_ERROR
),
1821 errmsg("cannot use a WHERE clause when removing a table from a publication")));
1823 ObjectAddressSet(obj
, PublicationRelRelationId
, prid
);
1824 performDeletion(&obj
, DROP_CASCADE
, 0);
1829 * Add listed schemas to the publication.
1832 PublicationAddSchemas(Oid pubid
, List
*schemas
, bool if_not_exists
,
1833 AlterPublicationStmt
*stmt
)
1837 Assert(!stmt
|| !stmt
->for_all_tables
);
1839 foreach(lc
, schemas
)
1841 Oid schemaid
= lfirst_oid(lc
);
1844 obj
= publication_add_schema(pubid
, schemaid
, if_not_exists
);
1847 EventTriggerCollectSimpleCommand(obj
, InvalidObjectAddress
,
1850 InvokeObjectPostCreateHook(PublicationNamespaceRelationId
,
1857 * Remove listed schemas from the publication.
1860 PublicationDropSchemas(Oid pubid
, List
*schemas
, bool missing_ok
)
1866 foreach(lc
, schemas
)
1868 Oid schemaid
= lfirst_oid(lc
);
1870 psid
= GetSysCacheOid2(PUBLICATIONNAMESPACEMAP
,
1871 Anum_pg_publication_namespace_oid
,
1872 ObjectIdGetDatum(schemaid
),
1873 ObjectIdGetDatum(pubid
));
1874 if (!OidIsValid(psid
))
1880 (errcode(ERRCODE_UNDEFINED_OBJECT
),
1881 errmsg("tables from schema \"%s\" are not part of the publication",
1882 get_namespace_name(schemaid
))));
1885 ObjectAddressSet(obj
, PublicationNamespaceRelationId
, psid
);
1886 performDeletion(&obj
, DROP_CASCADE
, 0);
1891 * Internal workhorse for changing a publication owner
1894 AlterPublicationOwner_internal(Relation rel
, HeapTuple tup
, Oid newOwnerId
)
1896 Form_pg_publication form
;
1898 form
= (Form_pg_publication
) GETSTRUCT(tup
);
1900 if (form
->pubowner
== newOwnerId
)
1905 AclResult aclresult
;
1908 if (!object_ownercheck(PublicationRelationId
, form
->oid
, GetUserId()))
1909 aclcheck_error(ACLCHECK_NOT_OWNER
, OBJECT_PUBLICATION
,
1910 NameStr(form
->pubname
));
1912 /* Must be able to become new owner */
1913 check_is_member_of_role(GetUserId(), newOwnerId
);
1915 /* New owner must have CREATE privilege on database */
1916 aclresult
= pg_database_aclcheck(MyDatabaseId
, newOwnerId
, ACL_CREATE
);
1917 if (aclresult
!= ACLCHECK_OK
)
1918 aclcheck_error(aclresult
, OBJECT_DATABASE
,
1919 get_database_name(MyDatabaseId
));
1921 if (form
->puballtables
&& !superuser_arg(newOwnerId
))
1923 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE
),
1924 errmsg("permission denied to change owner of publication \"%s\"",
1925 NameStr(form
->pubname
)),
1926 errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1928 if (!superuser_arg(newOwnerId
) && is_schema_publication(form
->oid
))
1930 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE
),
1931 errmsg("permission denied to change owner of publication \"%s\"",
1932 NameStr(form
->pubname
)),
1933 errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1936 form
->pubowner
= newOwnerId
;
1937 CatalogTupleUpdate(rel
, &tup
->t_self
, tup
);
1939 /* Update owner dependency reference */
1940 changeDependencyOnOwner(PublicationRelationId
,
1944 InvokeObjectPostAlterHook(PublicationRelationId
,
1949 * Change publication owner -- by name
1952 AlterPublicationOwner(const char *name
, Oid newOwnerId
)
1957 ObjectAddress address
;
1958 Form_pg_publication pubform
;
1960 rel
= table_open(PublicationRelationId
, RowExclusiveLock
);
1962 tup
= SearchSysCacheCopy1(PUBLICATIONNAME
, CStringGetDatum(name
));
1964 if (!HeapTupleIsValid(tup
))
1966 (errcode(ERRCODE_UNDEFINED_OBJECT
),
1967 errmsg("publication \"%s\" does not exist", name
)));
1969 pubform
= (Form_pg_publication
) GETSTRUCT(tup
);
1970 subid
= pubform
->oid
;
1972 AlterPublicationOwner_internal(rel
, tup
, newOwnerId
);
1974 ObjectAddressSet(address
, PublicationRelationId
, subid
);
1976 heap_freetuple(tup
);
1978 table_close(rel
, RowExclusiveLock
);
1984 * Change publication owner -- by OID
1987 AlterPublicationOwner_oid(Oid subid
, Oid newOwnerId
)
1992 rel
= table_open(PublicationRelationId
, RowExclusiveLock
);
1994 tup
= SearchSysCacheCopy1(PUBLICATIONOID
, ObjectIdGetDatum(subid
));
1996 if (!HeapTupleIsValid(tup
))
1998 (errcode(ERRCODE_UNDEFINED_OBJECT
),
1999 errmsg("publication with OID %u does not exist", subid
)));
2001 AlterPublicationOwner_internal(rel
, tup
, newOwnerId
);
2003 heap_freetuple(tup
);
2005 table_close(rel
, RowExclusiveLock
);