Refactor ownercheck functions
[pgsql.git] / src / backend / commands / publicationcmds.c
blob8428e9e7b2204744757fe921df7d5cbf0c097b22
1 /*-------------------------------------------------------------------------
3 * publicationcmds.c
4 * publication manipulation
6 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
9 * IDENTIFICATION
10 * src/backend/commands/publicationcmds.c
12 *-------------------------------------------------------------------------
15 #include "postgres.h"
17 #include "access/genam.h"
18 #include "access/htup_details.h"
19 #include "access/table.h"
20 #include "access/xact.h"
21 #include "catalog/catalog.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/objectaccess.h"
25 #include "catalog/objectaddress.h"
26 #include "catalog/partition.h"
27 #include "catalog/pg_inherits.h"
28 #include "catalog/pg_namespace.h"
29 #include "catalog/pg_proc.h"
30 #include "catalog/pg_publication.h"
31 #include "catalog/pg_publication_namespace.h"
32 #include "catalog/pg_publication_rel.h"
33 #include "catalog/pg_type.h"
34 #include "commands/dbcommands.h"
35 #include "commands/defrem.h"
36 #include "commands/event_trigger.h"
37 #include "commands/publicationcmds.h"
38 #include "funcapi.h"
39 #include "miscadmin.h"
40 #include "nodes/nodeFuncs.h"
41 #include "parser/parse_clause.h"
42 #include "parser/parse_collate.h"
43 #include "parser/parse_relation.h"
44 #include "storage/lmgr.h"
45 #include "utils/acl.h"
46 #include "utils/array.h"
47 #include "utils/builtins.h"
48 #include "utils/catcache.h"
49 #include "utils/fmgroids.h"
50 #include "utils/inval.h"
51 #include "utils/lsyscache.h"
52 #include "utils/rel.h"
53 #include "utils/syscache.h"
54 #include "utils/varlena.h"
58 * Information used to validate the columns in the row filter expression. See
59 * contain_invalid_rfcolumn_walker for details.
61 typedef struct rf_context
63 Bitmapset *bms_replident; /* bitset of replica identity columns */
64 bool pubviaroot; /* true if we are validating the parent
65 * relation's row filter */
66 Oid relid; /* relid of the relation */
67 Oid parentid; /* relid of the parent relation */
68 } rf_context;
70 static List *OpenTableList(List *tables);
71 static void CloseTableList(List *rels);
72 static void LockSchemaList(List *schemalist);
73 static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
74 AlterPublicationStmt *stmt);
75 static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok);
76 static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
77 AlterPublicationStmt *stmt);
78 static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok);
81 static void
82 parse_publication_options(ParseState *pstate,
83 List *options,
84 bool *publish_given,
85 PublicationActions *pubactions,
86 bool *publish_via_partition_root_given,
87 bool *publish_via_partition_root)
89 ListCell *lc;
91 *publish_given = false;
92 *publish_via_partition_root_given = false;
94 /* defaults */
95 pubactions->pubinsert = true;
96 pubactions->pubupdate = true;
97 pubactions->pubdelete = true;
98 pubactions->pubtruncate = true;
99 *publish_via_partition_root = false;
101 /* Parse options */
102 foreach(lc, options)
104 DefElem *defel = (DefElem *) lfirst(lc);
106 if (strcmp(defel->defname, "publish") == 0)
108 char *publish;
109 List *publish_list;
110 ListCell *lc2;
112 if (*publish_given)
113 errorConflictingDefElem(defel, pstate);
116 * If publish option was given only the explicitly listed actions
117 * should be published.
119 pubactions->pubinsert = false;
120 pubactions->pubupdate = false;
121 pubactions->pubdelete = false;
122 pubactions->pubtruncate = false;
124 *publish_given = true;
125 publish = defGetString(defel);
127 if (!SplitIdentifierString(publish, ',', &publish_list))
128 ereport(ERROR,
129 (errcode(ERRCODE_SYNTAX_ERROR),
130 errmsg("invalid list syntax in parameter \"%s\"",
131 "publish")));
133 /* Process the option list. */
134 foreach(lc2, publish_list)
136 char *publish_opt = (char *) lfirst(lc2);
138 if (strcmp(publish_opt, "insert") == 0)
139 pubactions->pubinsert = true;
140 else if (strcmp(publish_opt, "update") == 0)
141 pubactions->pubupdate = true;
142 else if (strcmp(publish_opt, "delete") == 0)
143 pubactions->pubdelete = true;
144 else if (strcmp(publish_opt, "truncate") == 0)
145 pubactions->pubtruncate = true;
146 else
147 ereport(ERROR,
148 (errcode(ERRCODE_SYNTAX_ERROR),
149 errmsg("unrecognized value for publication option \"%s\": \"%s\"",
150 "publish", publish_opt)));
153 else if (strcmp(defel->defname, "publish_via_partition_root") == 0)
155 if (*publish_via_partition_root_given)
156 errorConflictingDefElem(defel, pstate);
157 *publish_via_partition_root_given = true;
158 *publish_via_partition_root = defGetBoolean(defel);
160 else
161 ereport(ERROR,
162 (errcode(ERRCODE_SYNTAX_ERROR),
163 errmsg("unrecognized publication parameter: \"%s\"", defel->defname)));
168 * Convert the PublicationObjSpecType list into schema oid list and
169 * PublicationTable list.
171 static void
172 ObjectsInPublicationToOids(List *pubobjspec_list, ParseState *pstate,
173 List **rels, List **schemas)
175 ListCell *cell;
176 PublicationObjSpec *pubobj;
178 if (!pubobjspec_list)
179 return;
181 foreach(cell, pubobjspec_list)
183 Oid schemaid;
184 List *search_path;
186 pubobj = (PublicationObjSpec *) lfirst(cell);
188 switch (pubobj->pubobjtype)
190 case PUBLICATIONOBJ_TABLE:
191 *rels = lappend(*rels, pubobj->pubtable);
192 break;
193 case PUBLICATIONOBJ_TABLES_IN_SCHEMA:
194 schemaid = get_namespace_oid(pubobj->name, false);
196 /* Filter out duplicates if user specifies "sch1, sch1" */
197 *schemas = list_append_unique_oid(*schemas, schemaid);
198 break;
199 case PUBLICATIONOBJ_TABLES_IN_CUR_SCHEMA:
200 search_path = fetch_search_path(false);
201 if (search_path == NIL) /* nothing valid in search_path? */
202 ereport(ERROR,
203 errcode(ERRCODE_UNDEFINED_SCHEMA),
204 errmsg("no schema has been selected for CURRENT_SCHEMA"));
206 schemaid = linitial_oid(search_path);
207 list_free(search_path);
209 /* Filter out duplicates if user specifies "sch1, sch1" */
210 *schemas = list_append_unique_oid(*schemas, schemaid);
211 break;
212 default:
213 /* shouldn't happen */
214 elog(ERROR, "invalid publication object type %d", pubobj->pubobjtype);
215 break;
221 * Returns true if any of the columns used in the row filter WHERE expression is
222 * not part of REPLICA IDENTITY, false otherwise.
224 static bool
225 contain_invalid_rfcolumn_walker(Node *node, rf_context *context)
227 if (node == NULL)
228 return false;
230 if (IsA(node, Var))
232 Var *var = (Var *) node;
233 AttrNumber attnum = var->varattno;
236 * If pubviaroot is true, we are validating the row filter of the
237 * parent table, but the bitmap contains the replica identity
238 * information of the child table. So, get the column number of the
239 * child table as parent and child column order could be different.
241 if (context->pubviaroot)
243 char *colname = get_attname(context->parentid, attnum, false);
245 attnum = get_attnum(context->relid, colname);
248 if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber,
249 context->bms_replident))
250 return true;
253 return expression_tree_walker(node, contain_invalid_rfcolumn_walker,
254 (void *) context);
258 * Check if all columns referenced in the filter expression are part of the
259 * REPLICA IDENTITY index or not.
261 * Returns true if any invalid column is found.
263 bool
264 pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
265 bool pubviaroot)
267 HeapTuple rftuple;
268 Oid relid = RelationGetRelid(relation);
269 Oid publish_as_relid = RelationGetRelid(relation);
270 bool result = false;
271 Datum rfdatum;
272 bool rfisnull;
275 * FULL means all columns are in the REPLICA IDENTITY, so all columns are
276 * allowed in the row filter and we can skip the validation.
278 if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
279 return false;
282 * For a partition, if pubviaroot is true, find the topmost ancestor that
283 * is published via this publication as we need to use its row filter
284 * expression to filter the partition's changes.
286 * Note that even though the row filter used is for an ancestor, the
287 * REPLICA IDENTITY used will be for the actual child table.
289 if (pubviaroot && relation->rd_rel->relispartition)
291 publish_as_relid
292 = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
294 if (!OidIsValid(publish_as_relid))
295 publish_as_relid = relid;
298 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
299 ObjectIdGetDatum(publish_as_relid),
300 ObjectIdGetDatum(pubid));
302 if (!HeapTupleIsValid(rftuple))
303 return false;
305 rfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
306 Anum_pg_publication_rel_prqual,
307 &rfisnull);
309 if (!rfisnull)
311 rf_context context = {0};
312 Node *rfnode;
313 Bitmapset *bms = NULL;
315 context.pubviaroot = pubviaroot;
316 context.parentid = publish_as_relid;
317 context.relid = relid;
319 /* Remember columns that are part of the REPLICA IDENTITY */
320 bms = RelationGetIndexAttrBitmap(relation,
321 INDEX_ATTR_BITMAP_IDENTITY_KEY);
323 context.bms_replident = bms;
324 rfnode = stringToNode(TextDatumGetCString(rfdatum));
325 result = contain_invalid_rfcolumn_walker(rfnode, &context);
328 ReleaseSysCache(rftuple);
330 return result;
334 * Check if all columns referenced in the REPLICA IDENTITY are covered by
335 * the column list.
337 * Returns true if any replica identity column is not covered by column list.
339 bool
340 pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
341 bool pubviaroot)
343 HeapTuple tuple;
344 Oid relid = RelationGetRelid(relation);
345 Oid publish_as_relid = RelationGetRelid(relation);
346 bool result = false;
347 Datum datum;
348 bool isnull;
351 * For a partition, if pubviaroot is true, find the topmost ancestor that
352 * is published via this publication as we need to use its column list for
353 * the changes.
355 * Note that even though the column list used is for an ancestor, the
356 * REPLICA IDENTITY used will be for the actual child table.
358 if (pubviaroot && relation->rd_rel->relispartition)
360 publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL);
362 if (!OidIsValid(publish_as_relid))
363 publish_as_relid = relid;
366 tuple = SearchSysCache2(PUBLICATIONRELMAP,
367 ObjectIdGetDatum(publish_as_relid),
368 ObjectIdGetDatum(pubid));
370 if (!HeapTupleIsValid(tuple))
371 return false;
373 datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple,
374 Anum_pg_publication_rel_prattrs,
375 &isnull);
377 if (!isnull)
379 int x;
380 Bitmapset *idattrs;
381 Bitmapset *columns = NULL;
383 /* With REPLICA IDENTITY FULL, no column list is allowed. */
384 if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
385 result = true;
387 /* Transform the column list datum to a bitmapset. */
388 columns = pub_collist_to_bitmapset(NULL, datum, NULL);
390 /* Remember columns that are part of the REPLICA IDENTITY */
391 idattrs = RelationGetIndexAttrBitmap(relation,
392 INDEX_ATTR_BITMAP_IDENTITY_KEY);
395 * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
396 * offset (to handle system columns the usual way), while column list
397 * does not use offset, so we can't do bms_is_subset(). Instead, we
398 * have to loop over the idattrs and check all of them are in the
399 * list.
401 x = -1;
402 while ((x = bms_next_member(idattrs, x)) >= 0)
404 AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber);
407 * If pubviaroot is true, we are validating the column list of the
408 * parent table, but the bitmap contains the replica identity
409 * information of the child table. The parent/child attnums may
410 * not match, so translate them to the parent - get the attname
411 * from the child, and look it up in the parent.
413 if (pubviaroot)
415 /* attribute name in the child table */
416 char *colname = get_attname(relid, attnum, false);
419 * Determine the attnum for the attribute name in parent (we
420 * are using the column list defined on the parent).
422 attnum = get_attnum(publish_as_relid, colname);
425 /* replica identity column, not covered by the column list */
426 if (!bms_is_member(attnum, columns))
428 result = true;
429 break;
433 bms_free(idattrs);
434 bms_free(columns);
437 ReleaseSysCache(tuple);
439 return result;
442 /* check_functions_in_node callback */
443 static bool
444 contain_mutable_or_user_functions_checker(Oid func_id, void *context)
446 return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE ||
447 func_id >= FirstNormalObjectId);
451 * The row filter walker checks if the row filter expression is a "simple
452 * expression".
454 * It allows only simple or compound expressions such as:
455 * - (Var Op Const)
456 * - (Var Op Var)
457 * - (Var Op Const) AND/OR (Var Op Const)
458 * - etc
459 * (where Var is a column of the table this filter belongs to)
461 * The simple expression has the following restrictions:
462 * - User-defined operators are not allowed;
463 * - User-defined functions are not allowed;
464 * - User-defined types are not allowed;
465 * - User-defined collations are not allowed;
466 * - Non-immutable built-in functions are not allowed;
467 * - System columns are not allowed.
469 * NOTES
471 * We don't allow user-defined functions/operators/types/collations because
472 * (a) if a user drops a user-defined object used in a row filter expression or
473 * if there is any other error while using it, the logical decoding
474 * infrastructure won't be able to recover from such an error even if the
475 * object is recreated again because a historic snapshot is used to evaluate
476 * the row filter;
477 * (b) a user-defined function can be used to access tables that could have
478 * unpleasant results because a historic snapshot is used. That's why only
479 * immutable built-in functions are allowed in row filter expressions.
481 * We don't allow system columns because currently, we don't have that
482 * information in the tuple passed to downstream. Also, as we don't replicate
483 * those to subscribers, there doesn't seem to be a need for a filter on those
484 * columns.
486 * We can allow other node types after more analysis and testing.
488 static bool
489 check_simple_rowfilter_expr_walker(Node *node, ParseState *pstate)
491 char *errdetail_msg = NULL;
493 if (node == NULL)
494 return false;
496 switch (nodeTag(node))
498 case T_Var:
499 /* System columns are not allowed. */
500 if (((Var *) node)->varattno < InvalidAttrNumber)
501 errdetail_msg = _("System columns are not allowed.");
502 break;
503 case T_OpExpr:
504 case T_DistinctExpr:
505 case T_NullIfExpr:
506 /* OK, except user-defined operators are not allowed. */
507 if (((OpExpr *) node)->opno >= FirstNormalObjectId)
508 errdetail_msg = _("User-defined operators are not allowed.");
509 break;
510 case T_ScalarArrayOpExpr:
511 /* OK, except user-defined operators are not allowed. */
512 if (((ScalarArrayOpExpr *) node)->opno >= FirstNormalObjectId)
513 errdetail_msg = _("User-defined operators are not allowed.");
516 * We don't need to check the hashfuncid and negfuncid of
517 * ScalarArrayOpExpr as those functions are only built for a
518 * subquery.
520 break;
521 case T_RowCompareExpr:
523 ListCell *opid;
525 /* OK, except user-defined operators are not allowed. */
526 foreach(opid, ((RowCompareExpr *) node)->opnos)
528 if (lfirst_oid(opid) >= FirstNormalObjectId)
530 errdetail_msg = _("User-defined operators are not allowed.");
531 break;
535 break;
536 case T_Const:
537 case T_FuncExpr:
538 case T_BoolExpr:
539 case T_RelabelType:
540 case T_CollateExpr:
541 case T_CaseExpr:
542 case T_CaseTestExpr:
543 case T_ArrayExpr:
544 case T_RowExpr:
545 case T_CoalesceExpr:
546 case T_MinMaxExpr:
547 case T_XmlExpr:
548 case T_NullTest:
549 case T_BooleanTest:
550 case T_List:
551 /* OK, supported */
552 break;
553 default:
554 errdetail_msg = _("Only columns, constants, built-in operators, built-in data types, built-in collations, and immutable built-in functions are allowed.");
555 break;
559 * For all the supported nodes, if we haven't already found a problem,
560 * check the types, functions, and collations used in it. We check List
561 * by walking through each element.
563 if (!errdetail_msg && !IsA(node, List))
565 if (exprType(node) >= FirstNormalObjectId)
566 errdetail_msg = _("User-defined types are not allowed.");
567 else if (check_functions_in_node(node, contain_mutable_or_user_functions_checker,
568 (void *) pstate))
569 errdetail_msg = _("User-defined or built-in mutable functions are not allowed.");
570 else if (exprCollation(node) >= FirstNormalObjectId ||
571 exprInputCollation(node) >= FirstNormalObjectId)
572 errdetail_msg = _("User-defined collations are not allowed.");
576 * If we found a problem in this node, throw error now. Otherwise keep
577 * going.
579 if (errdetail_msg)
580 ereport(ERROR,
581 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
582 errmsg("invalid publication WHERE expression"),
583 errdetail_internal("%s", errdetail_msg),
584 parser_errposition(pstate, exprLocation(node))));
586 return expression_tree_walker(node, check_simple_rowfilter_expr_walker,
587 (void *) pstate);
591 * Check if the row filter expression is a "simple expression".
593 * See check_simple_rowfilter_expr_walker for details.
595 static bool
596 check_simple_rowfilter_expr(Node *node, ParseState *pstate)
598 return check_simple_rowfilter_expr_walker(node, pstate);
602 * Transform the publication WHERE expression for all the relations in the list,
603 * ensuring it is coerced to boolean and necessary collation information is
604 * added if required, and add a new nsitem/RTE for the associated relation to
605 * the ParseState's namespace list.
607 * Also check the publication row filter expression and throw an error if
608 * anything not permitted or unexpected is encountered.
610 static void
611 TransformPubWhereClauses(List *tables, const char *queryString,
612 bool pubviaroot)
614 ListCell *lc;
616 foreach(lc, tables)
618 ParseNamespaceItem *nsitem;
619 Node *whereclause = NULL;
620 ParseState *pstate;
621 PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
623 if (pri->whereClause == NULL)
624 continue;
627 * If the publication doesn't publish changes via the root partitioned
628 * table, the partition's row filter will be used. So disallow using
629 * WHERE clause on partitioned table in this case.
631 if (!pubviaroot &&
632 pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
633 ereport(ERROR,
634 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
635 errmsg("cannot use publication WHERE clause for relation \"%s\"",
636 RelationGetRelationName(pri->relation)),
637 errdetail("WHERE clause cannot be used for a partitioned table when %s is false.",
638 "publish_via_partition_root")));
641 * A fresh pstate is required so that we only have "this" table in its
642 * rangetable
644 pstate = make_parsestate(NULL);
645 pstate->p_sourcetext = queryString;
646 nsitem = addRangeTableEntryForRelation(pstate, pri->relation,
647 AccessShareLock, NULL,
648 false, false);
649 addNSItemToQuery(pstate, nsitem, false, true, true);
651 whereclause = transformWhereClause(pstate,
652 copyObject(pri->whereClause),
653 EXPR_KIND_WHERE,
654 "PUBLICATION WHERE");
656 /* Fix up collation information */
657 assign_expr_collations(pstate, whereclause);
660 * We allow only simple expressions in row filters. See
661 * check_simple_rowfilter_expr_walker.
663 check_simple_rowfilter_expr(whereclause, pstate);
665 free_parsestate(pstate);
667 pri->whereClause = whereclause;
673 * Given a list of tables that are going to be added to a publication,
674 * verify that they fulfill the necessary preconditions, namely: no tables
675 * have a column list if any schema is published; and partitioned tables do
676 * not have column lists if publish_via_partition_root is not set.
678 * 'publish_schema' indicates that the publication contains any TABLES IN
679 * SCHEMA elements (newly added in this command, or preexisting).
680 * 'pubviaroot' is the value of publish_via_partition_root.
682 static void
683 CheckPubRelationColumnList(char *pubname, List *tables,
684 bool publish_schema, bool pubviaroot)
686 ListCell *lc;
688 foreach(lc, tables)
690 PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc);
692 if (pri->columns == NIL)
693 continue;
696 * Disallow specifying column list if any schema is in the
697 * publication.
699 * XXX We could instead just forbid the case when the publication
700 * tries to publish the table with a column list and a schema for that
701 * table. However, if we do that then we need a restriction during
702 * ALTER TABLE ... SET SCHEMA to prevent such a case which doesn't
703 * seem to be a good idea.
705 if (publish_schema)
706 ereport(ERROR,
707 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
708 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
709 get_namespace_name(RelationGetNamespace(pri->relation)),
710 RelationGetRelationName(pri->relation), pubname),
711 errdetail("Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements."));
714 * If the publication doesn't publish changes via the root partitioned
715 * table, the partition's column list will be used. So disallow using
716 * a column list on the partitioned table in this case.
718 if (!pubviaroot &&
719 pri->relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
720 ereport(ERROR,
721 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
722 errmsg("cannot use column list for relation \"%s.%s\" in publication \"%s\"",
723 get_namespace_name(RelationGetNamespace(pri->relation)),
724 RelationGetRelationName(pri->relation), pubname),
725 errdetail("Column lists cannot be specified for partitioned tables when %s is false.",
726 "publish_via_partition_root")));
731 * Create new publication.
733 ObjectAddress
734 CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
736 Relation rel;
737 ObjectAddress myself;
738 Oid puboid;
739 bool nulls[Natts_pg_publication];
740 Datum values[Natts_pg_publication];
741 HeapTuple tup;
742 bool publish_given;
743 PublicationActions pubactions;
744 bool publish_via_partition_root_given;
745 bool publish_via_partition_root;
746 AclResult aclresult;
747 List *relations = NIL;
748 List *schemaidlist = NIL;
750 /* must have CREATE privilege on database */
751 aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE);
752 if (aclresult != ACLCHECK_OK)
753 aclcheck_error(aclresult, OBJECT_DATABASE,
754 get_database_name(MyDatabaseId));
756 /* FOR ALL TABLES requires superuser */
757 if (stmt->for_all_tables && !superuser())
758 ereport(ERROR,
759 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
760 errmsg("must be superuser to create FOR ALL TABLES publication")));
762 rel = table_open(PublicationRelationId, RowExclusiveLock);
764 /* Check if name is used */
765 puboid = GetSysCacheOid1(PUBLICATIONNAME, Anum_pg_publication_oid,
766 CStringGetDatum(stmt->pubname));
767 if (OidIsValid(puboid))
768 ereport(ERROR,
769 (errcode(ERRCODE_DUPLICATE_OBJECT),
770 errmsg("publication \"%s\" already exists",
771 stmt->pubname)));
773 /* Form a tuple. */
774 memset(values, 0, sizeof(values));
775 memset(nulls, false, sizeof(nulls));
777 values[Anum_pg_publication_pubname - 1] =
778 DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
779 values[Anum_pg_publication_pubowner - 1] = ObjectIdGetDatum(GetUserId());
781 parse_publication_options(pstate,
782 stmt->options,
783 &publish_given, &pubactions,
784 &publish_via_partition_root_given,
785 &publish_via_partition_root);
787 puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId,
788 Anum_pg_publication_oid);
789 values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid);
790 values[Anum_pg_publication_puballtables - 1] =
791 BoolGetDatum(stmt->for_all_tables);
792 values[Anum_pg_publication_pubinsert - 1] =
793 BoolGetDatum(pubactions.pubinsert);
794 values[Anum_pg_publication_pubupdate - 1] =
795 BoolGetDatum(pubactions.pubupdate);
796 values[Anum_pg_publication_pubdelete - 1] =
797 BoolGetDatum(pubactions.pubdelete);
798 values[Anum_pg_publication_pubtruncate - 1] =
799 BoolGetDatum(pubactions.pubtruncate);
800 values[Anum_pg_publication_pubviaroot - 1] =
801 BoolGetDatum(publish_via_partition_root);
803 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
805 /* Insert tuple into catalog. */
806 CatalogTupleInsert(rel, tup);
807 heap_freetuple(tup);
809 recordDependencyOnOwner(PublicationRelationId, puboid, GetUserId());
811 ObjectAddressSet(myself, PublicationRelationId, puboid);
813 /* Make the changes visible. */
814 CommandCounterIncrement();
816 /* Associate objects with the publication. */
817 if (stmt->for_all_tables)
819 /* Invalidate relcache so that publication info is rebuilt. */
820 CacheInvalidateRelcacheAll();
822 else
824 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
825 &schemaidlist);
827 /* FOR TABLES IN SCHEMA requires superuser */
828 if (schemaidlist != NIL && !superuser())
829 ereport(ERROR,
830 errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
831 errmsg("must be superuser to create FOR TABLES IN SCHEMA publication"));
833 if (relations != NIL)
835 List *rels;
837 rels = OpenTableList(relations);
838 TransformPubWhereClauses(rels, pstate->p_sourcetext,
839 publish_via_partition_root);
841 CheckPubRelationColumnList(stmt->pubname, rels,
842 schemaidlist != NIL,
843 publish_via_partition_root);
845 PublicationAddTables(puboid, rels, true, NULL);
846 CloseTableList(rels);
849 if (schemaidlist != NIL)
852 * Schema lock is held until the publication is created to prevent
853 * concurrent schema deletion.
855 LockSchemaList(schemaidlist);
856 PublicationAddSchemas(puboid, schemaidlist, true, NULL);
860 table_close(rel, RowExclusiveLock);
862 InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
864 if (wal_level != WAL_LEVEL_LOGICAL)
865 ereport(WARNING,
866 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
867 errmsg("wal_level is insufficient to publish logical changes"),
868 errhint("Set wal_level to \"logical\" before creating subscriptions.")));
870 return myself;
874 * Change options of a publication.
876 static void
877 AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt,
878 Relation rel, HeapTuple tup)
880 bool nulls[Natts_pg_publication];
881 bool replaces[Natts_pg_publication];
882 Datum values[Natts_pg_publication];
883 bool publish_given;
884 PublicationActions pubactions;
885 bool publish_via_partition_root_given;
886 bool publish_via_partition_root;
887 ObjectAddress obj;
888 Form_pg_publication pubform;
889 List *root_relids = NIL;
890 ListCell *lc;
892 parse_publication_options(pstate,
893 stmt->options,
894 &publish_given, &pubactions,
895 &publish_via_partition_root_given,
896 &publish_via_partition_root);
898 pubform = (Form_pg_publication) GETSTRUCT(tup);
901 * If the publication doesn't publish changes via the root partitioned
902 * table, the partition's row filter and column list will be used. So
903 * disallow using WHERE clause and column lists on partitioned table in
904 * this case.
906 if (!pubform->puballtables && publish_via_partition_root_given &&
907 !publish_via_partition_root)
910 * Lock the publication so nobody else can do anything with it. This
911 * prevents concurrent alter to add partitioned table(s) with WHERE
912 * clause(s) and/or column lists which we don't allow when not
913 * publishing via root.
915 LockDatabaseObject(PublicationRelationId, pubform->oid, 0,
916 AccessShareLock);
918 root_relids = GetPublicationRelations(pubform->oid,
919 PUBLICATION_PART_ROOT);
921 foreach(lc, root_relids)
923 Oid relid = lfirst_oid(lc);
924 HeapTuple rftuple;
925 char relkind;
926 char *relname;
927 bool has_rowfilter;
928 bool has_collist;
931 * Beware: we don't have lock on the relations, so cope silently
932 * with the cache lookups returning NULL.
935 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
936 ObjectIdGetDatum(relid),
937 ObjectIdGetDatum(pubform->oid));
938 if (!HeapTupleIsValid(rftuple))
939 continue;
940 has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL);
941 has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL);
942 if (!has_rowfilter && !has_collist)
944 ReleaseSysCache(rftuple);
945 continue;
948 relkind = get_rel_relkind(relid);
949 if (relkind != RELKIND_PARTITIONED_TABLE)
951 ReleaseSysCache(rftuple);
952 continue;
954 relname = get_rel_name(relid);
955 if (relname == NULL) /* table concurrently dropped */
957 ReleaseSysCache(rftuple);
958 continue;
961 if (has_rowfilter)
962 ereport(ERROR,
963 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
964 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
965 "publish_via_partition_root",
966 stmt->pubname),
967 errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
968 relname, "publish_via_partition_root")));
969 Assert(has_collist);
970 ereport(ERROR,
971 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
972 errmsg("cannot set parameter \"%s\" to false for publication \"%s\"",
973 "publish_via_partition_root",
974 stmt->pubname),
975 errdetail("The publication contains a column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.",
976 relname, "publish_via_partition_root")));
980 /* Everything ok, form a new tuple. */
981 memset(values, 0, sizeof(values));
982 memset(nulls, false, sizeof(nulls));
983 memset(replaces, false, sizeof(replaces));
985 if (publish_given)
987 values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert);
988 replaces[Anum_pg_publication_pubinsert - 1] = true;
990 values[Anum_pg_publication_pubupdate - 1] = BoolGetDatum(pubactions.pubupdate);
991 replaces[Anum_pg_publication_pubupdate - 1] = true;
993 values[Anum_pg_publication_pubdelete - 1] = BoolGetDatum(pubactions.pubdelete);
994 replaces[Anum_pg_publication_pubdelete - 1] = true;
996 values[Anum_pg_publication_pubtruncate - 1] = BoolGetDatum(pubactions.pubtruncate);
997 replaces[Anum_pg_publication_pubtruncate - 1] = true;
1000 if (publish_via_partition_root_given)
1002 values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root);
1003 replaces[Anum_pg_publication_pubviaroot - 1] = true;
1006 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
1007 replaces);
1009 /* Update the catalog. */
1010 CatalogTupleUpdate(rel, &tup->t_self, tup);
1012 CommandCounterIncrement();
1014 pubform = (Form_pg_publication) GETSTRUCT(tup);
1016 /* Invalidate the relcache. */
1017 if (pubform->puballtables)
1019 CacheInvalidateRelcacheAll();
1021 else
1023 List *relids = NIL;
1024 List *schemarelids = NIL;
1027 * For any partitioned tables contained in the publication, we must
1028 * invalidate all partitions contained in the respective partition
1029 * trees, not just those explicitly mentioned in the publication.
1031 if (root_relids == NIL)
1032 relids = GetPublicationRelations(pubform->oid,
1033 PUBLICATION_PART_ALL);
1034 else
1037 * We already got tables explicitly mentioned in the publication.
1038 * Now get all partitions for the partitioned table in the list.
1040 foreach(lc, root_relids)
1041 relids = GetPubPartitionOptionRelations(relids,
1042 PUBLICATION_PART_ALL,
1043 lfirst_oid(lc));
1046 schemarelids = GetAllSchemaPublicationRelations(pubform->oid,
1047 PUBLICATION_PART_ALL);
1048 relids = list_concat_unique_oid(relids, schemarelids);
1050 InvalidatePublicationRels(relids);
1053 ObjectAddressSet(obj, PublicationRelationId, pubform->oid);
1054 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1055 (Node *) stmt);
1057 InvokeObjectPostAlterHook(PublicationRelationId, pubform->oid, 0);
1061 * Invalidate the relations.
1063 void
1064 InvalidatePublicationRels(List *relids)
1067 * We don't want to send too many individual messages, at some point it's
1068 * cheaper to just reset whole relcache.
1070 if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS)
1072 ListCell *lc;
1074 foreach(lc, relids)
1075 CacheInvalidateRelcacheByRelid(lfirst_oid(lc));
1077 else
1078 CacheInvalidateRelcacheAll();
1082 * Add or remove table to/from publication.
1084 static void
1085 AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
1086 List *tables, const char *queryString,
1087 bool publish_schema)
1089 List *rels = NIL;
1090 Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1091 Oid pubid = pubform->oid;
1094 * Nothing to do if no objects, except in SET: for that it is quite
1095 * possible that user has not specified any tables in which case we need
1096 * to remove all the existing tables.
1098 if (!tables && stmt->action != AP_SetObjects)
1099 return;
1101 rels = OpenTableList(tables);
1103 if (stmt->action == AP_AddObjects)
1105 TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1107 publish_schema |= is_schema_publication(pubid);
1109 CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1110 pubform->pubviaroot);
1112 PublicationAddTables(pubid, rels, false, stmt);
1114 else if (stmt->action == AP_DropObjects)
1115 PublicationDropTables(pubid, rels, false);
1116 else /* AP_SetObjects */
1118 List *oldrelids = GetPublicationRelations(pubid,
1119 PUBLICATION_PART_ROOT);
1120 List *delrels = NIL;
1121 ListCell *oldlc;
1123 TransformPubWhereClauses(rels, queryString, pubform->pubviaroot);
1125 CheckPubRelationColumnList(stmt->pubname, rels, publish_schema,
1126 pubform->pubviaroot);
1129 * To recreate the relation list for the publication, look for
1130 * existing relations that do not need to be dropped.
1132 foreach(oldlc, oldrelids)
1134 Oid oldrelid = lfirst_oid(oldlc);
1135 ListCell *newlc;
1136 PublicationRelInfo *oldrel;
1137 bool found = false;
1138 HeapTuple rftuple;
1139 Node *oldrelwhereclause = NULL;
1140 Bitmapset *oldcolumns = NULL;
1142 /* look up the cache for the old relmap */
1143 rftuple = SearchSysCache2(PUBLICATIONRELMAP,
1144 ObjectIdGetDatum(oldrelid),
1145 ObjectIdGetDatum(pubid));
1148 * See if the existing relation currently has a WHERE clause or a
1149 * column list. We need to compare those too.
1151 if (HeapTupleIsValid(rftuple))
1153 bool isnull = true;
1154 Datum whereClauseDatum;
1155 Datum columnListDatum;
1157 /* Load the WHERE clause for this table. */
1158 whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1159 Anum_pg_publication_rel_prqual,
1160 &isnull);
1161 if (!isnull)
1162 oldrelwhereclause = stringToNode(TextDatumGetCString(whereClauseDatum));
1164 /* Transform the int2vector column list to a bitmap. */
1165 columnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple,
1166 Anum_pg_publication_rel_prattrs,
1167 &isnull);
1169 if (!isnull)
1170 oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL);
1172 ReleaseSysCache(rftuple);
1175 foreach(newlc, rels)
1177 PublicationRelInfo *newpubrel;
1178 Oid newrelid;
1179 Bitmapset *newcolumns = NULL;
1181 newpubrel = (PublicationRelInfo *) lfirst(newlc);
1182 newrelid = RelationGetRelid(newpubrel->relation);
1185 * If the new publication has column list, transform it to a
1186 * bitmap too.
1188 if (newpubrel->columns)
1190 ListCell *lc;
1192 foreach(lc, newpubrel->columns)
1194 char *colname = strVal(lfirst(lc));
1195 AttrNumber attnum = get_attnum(newrelid, colname);
1197 newcolumns = bms_add_member(newcolumns, attnum);
1202 * Check if any of the new set of relations matches with the
1203 * existing relations in the publication. Additionally, if the
1204 * relation has an associated WHERE clause, check the WHERE
1205 * expressions also match. Same for the column list. Drop the
1206 * rest.
1208 if (RelationGetRelid(newpubrel->relation) == oldrelid)
1210 if (equal(oldrelwhereclause, newpubrel->whereClause) &&
1211 bms_equal(oldcolumns, newcolumns))
1213 found = true;
1214 break;
1220 * Add the non-matched relations to a list so that they can be
1221 * dropped.
1223 if (!found)
1225 oldrel = palloc(sizeof(PublicationRelInfo));
1226 oldrel->whereClause = NULL;
1227 oldrel->columns = NIL;
1228 oldrel->relation = table_open(oldrelid,
1229 ShareUpdateExclusiveLock);
1230 delrels = lappend(delrels, oldrel);
1234 /* And drop them. */
1235 PublicationDropTables(pubid, delrels, true);
1238 * Don't bother calculating the difference for adding, we'll catch and
1239 * skip existing ones when doing catalog update.
1241 PublicationAddTables(pubid, rels, true, stmt);
1243 CloseTableList(delrels);
1246 CloseTableList(rels);
1250 * Alter the publication schemas.
1252 * Add or remove schemas to/from publication.
1254 static void
1255 AlterPublicationSchemas(AlterPublicationStmt *stmt,
1256 HeapTuple tup, List *schemaidlist)
1258 Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1261 * Nothing to do if no objects, except in SET: for that it is quite
1262 * possible that user has not specified any schemas in which case we need
1263 * to remove all the existing schemas.
1265 if (!schemaidlist && stmt->action != AP_SetObjects)
1266 return;
1269 * Schema lock is held until the publication is altered to prevent
1270 * concurrent schema deletion.
1272 LockSchemaList(schemaidlist);
1273 if (stmt->action == AP_AddObjects)
1275 ListCell *lc;
1276 List *reloids;
1278 reloids = GetPublicationRelations(pubform->oid, PUBLICATION_PART_ROOT);
1280 foreach(lc, reloids)
1282 HeapTuple coltuple;
1284 coltuple = SearchSysCache2(PUBLICATIONRELMAP,
1285 ObjectIdGetDatum(lfirst_oid(lc)),
1286 ObjectIdGetDatum(pubform->oid));
1288 if (!HeapTupleIsValid(coltuple))
1289 continue;
1292 * Disallow adding schema if column list is already part of the
1293 * publication. See CheckPubRelationColumnList.
1295 if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prattrs, NULL))
1296 ereport(ERROR,
1297 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1298 errmsg("cannot add schema to publication \"%s\"",
1299 stmt->pubname),
1300 errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication."));
1302 ReleaseSysCache(coltuple);
1305 PublicationAddSchemas(pubform->oid, schemaidlist, false, stmt);
1307 else if (stmt->action == AP_DropObjects)
1308 PublicationDropSchemas(pubform->oid, schemaidlist, false);
1309 else /* AP_SetObjects */
1311 List *oldschemaids = GetPublicationSchemas(pubform->oid);
1312 List *delschemas = NIL;
1314 /* Identify which schemas should be dropped */
1315 delschemas = list_difference_oid(oldschemaids, schemaidlist);
1318 * Schema lock is held until the publication is altered to prevent
1319 * concurrent schema deletion.
1321 LockSchemaList(delschemas);
1323 /* And drop them */
1324 PublicationDropSchemas(pubform->oid, delschemas, true);
1327 * Don't bother calculating the difference for adding, we'll catch and
1328 * skip existing ones when doing catalog update.
1330 PublicationAddSchemas(pubform->oid, schemaidlist, true, stmt);
1335 * Check if relations and schemas can be in a given publication and throw
1336 * appropriate error if not.
1338 static void
1339 CheckAlterPublication(AlterPublicationStmt *stmt, HeapTuple tup,
1340 List *tables, List *schemaidlist)
1342 Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup);
1344 if ((stmt->action == AP_AddObjects || stmt->action == AP_SetObjects) &&
1345 schemaidlist && !superuser())
1346 ereport(ERROR,
1347 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1348 errmsg("must be superuser to add or set schemas")));
1351 * Check that user is allowed to manipulate the publication tables in
1352 * schema
1354 if (schemaidlist && pubform->puballtables)
1355 ereport(ERROR,
1356 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1357 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1358 NameStr(pubform->pubname)),
1359 errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications.")));
1361 /* Check that user is allowed to manipulate the publication tables. */
1362 if (tables && pubform->puballtables)
1363 ereport(ERROR,
1364 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1365 errmsg("publication \"%s\" is defined as FOR ALL TABLES",
1366 NameStr(pubform->pubname)),
1367 errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications.")));
1371 * Alter the existing publication.
1373 * This is dispatcher function for AlterPublicationOptions,
1374 * AlterPublicationSchemas and AlterPublicationTables.
1376 void
1377 AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt)
1379 Relation rel;
1380 HeapTuple tup;
1381 Form_pg_publication pubform;
1383 rel = table_open(PublicationRelationId, RowExclusiveLock);
1385 tup = SearchSysCacheCopy1(PUBLICATIONNAME,
1386 CStringGetDatum(stmt->pubname));
1388 if (!HeapTupleIsValid(tup))
1389 ereport(ERROR,
1390 (errcode(ERRCODE_UNDEFINED_OBJECT),
1391 errmsg("publication \"%s\" does not exist",
1392 stmt->pubname)));
1394 pubform = (Form_pg_publication) GETSTRUCT(tup);
1396 /* must be owner */
1397 if (!object_ownercheck(PublicationRelationId, pubform->oid, GetUserId()))
1398 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1399 stmt->pubname);
1401 if (stmt->options)
1402 AlterPublicationOptions(pstate, stmt, rel, tup);
1403 else
1405 List *relations = NIL;
1406 List *schemaidlist = NIL;
1407 Oid pubid = pubform->oid;
1409 ObjectsInPublicationToOids(stmt->pubobjects, pstate, &relations,
1410 &schemaidlist);
1412 CheckAlterPublication(stmt, tup, relations, schemaidlist);
1414 heap_freetuple(tup);
1416 /* Lock the publication so nobody else can do anything with it. */
1417 LockDatabaseObject(PublicationRelationId, pubid, 0,
1418 AccessExclusiveLock);
1421 * It is possible that by the time we acquire the lock on publication,
1422 * concurrent DDL has removed it. We can test this by checking the
1423 * existence of publication. We get the tuple again to avoid the risk
1424 * of any publication option getting changed.
1426 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1427 if (!HeapTupleIsValid(tup))
1428 ereport(ERROR,
1429 errcode(ERRCODE_UNDEFINED_OBJECT),
1430 errmsg("publication \"%s\" does not exist",
1431 stmt->pubname));
1433 AlterPublicationTables(stmt, tup, relations, pstate->p_sourcetext,
1434 schemaidlist != NIL);
1435 AlterPublicationSchemas(stmt, tup, schemaidlist);
1438 /* Cleanup. */
1439 heap_freetuple(tup);
1440 table_close(rel, RowExclusiveLock);
1444 * Remove relation from publication by mapping OID.
1446 void
1447 RemovePublicationRelById(Oid proid)
1449 Relation rel;
1450 HeapTuple tup;
1451 Form_pg_publication_rel pubrel;
1452 List *relids = NIL;
1454 rel = table_open(PublicationRelRelationId, RowExclusiveLock);
1456 tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid));
1458 if (!HeapTupleIsValid(tup))
1459 elog(ERROR, "cache lookup failed for publication table %u",
1460 proid);
1462 pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
1465 * Invalidate relcache so that publication info is rebuilt.
1467 * For the partitioned tables, we must invalidate all partitions contained
1468 * in the respective partition hierarchies, not just the one explicitly
1469 * mentioned in the publication. This is required because we implicitly
1470 * publish the child tables when the parent table is published.
1472 relids = GetPubPartitionOptionRelations(relids, PUBLICATION_PART_ALL,
1473 pubrel->prrelid);
1475 InvalidatePublicationRels(relids);
1477 CatalogTupleDelete(rel, &tup->t_self);
1479 ReleaseSysCache(tup);
1481 table_close(rel, RowExclusiveLock);
1485 * Remove the publication by mapping OID.
1487 void
1488 RemovePublicationById(Oid pubid)
1490 Relation rel;
1491 HeapTuple tup;
1492 Form_pg_publication pubform;
1494 rel = table_open(PublicationRelationId, RowExclusiveLock);
1496 tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
1497 if (!HeapTupleIsValid(tup))
1498 elog(ERROR, "cache lookup failed for publication %u", pubid);
1500 pubform = (Form_pg_publication) GETSTRUCT(tup);
1502 /* Invalidate relcache so that publication info is rebuilt. */
1503 if (pubform->puballtables)
1504 CacheInvalidateRelcacheAll();
1506 CatalogTupleDelete(rel, &tup->t_self);
1508 ReleaseSysCache(tup);
1510 table_close(rel, RowExclusiveLock);
1514 * Remove schema from publication by mapping OID.
1516 void
1517 RemovePublicationSchemaById(Oid psoid)
1519 Relation rel;
1520 HeapTuple tup;
1521 List *schemaRels = NIL;
1522 Form_pg_publication_namespace pubsch;
1524 rel = table_open(PublicationNamespaceRelationId, RowExclusiveLock);
1526 tup = SearchSysCache1(PUBLICATIONNAMESPACE, ObjectIdGetDatum(psoid));
1528 if (!HeapTupleIsValid(tup))
1529 elog(ERROR, "cache lookup failed for publication schema %u", psoid);
1531 pubsch = (Form_pg_publication_namespace) GETSTRUCT(tup);
1534 * Invalidate relcache so that publication info is rebuilt. See
1535 * RemovePublicationRelById for why we need to consider all the
1536 * partitions.
1538 schemaRels = GetSchemaPublicationRelations(pubsch->pnnspid,
1539 PUBLICATION_PART_ALL);
1540 InvalidatePublicationRels(schemaRels);
1542 CatalogTupleDelete(rel, &tup->t_self);
1544 ReleaseSysCache(tup);
1546 table_close(rel, RowExclusiveLock);
1550 * Open relations specified by a PublicationTable list.
1551 * The returned tables are locked in ShareUpdateExclusiveLock mode in order to
1552 * add them to a publication.
1554 static List *
1555 OpenTableList(List *tables)
1557 List *relids = NIL;
1558 List *rels = NIL;
1559 ListCell *lc;
1560 List *relids_with_rf = NIL;
1561 List *relids_with_collist = NIL;
1564 * Open, share-lock, and check all the explicitly-specified relations
1566 foreach(lc, tables)
1568 PublicationTable *t = lfirst_node(PublicationTable, lc);
1569 bool recurse = t->relation->inh;
1570 Relation rel;
1571 Oid myrelid;
1572 PublicationRelInfo *pub_rel;
1574 /* Allow query cancel in case this takes a long time */
1575 CHECK_FOR_INTERRUPTS();
1577 rel = table_openrv(t->relation, ShareUpdateExclusiveLock);
1578 myrelid = RelationGetRelid(rel);
1581 * Filter out duplicates if user specifies "foo, foo".
1583 * Note that this algorithm is known to not be very efficient (O(N^2))
1584 * but given that it only works on list of tables given to us by user
1585 * it's deemed acceptable.
1587 if (list_member_oid(relids, myrelid))
1589 /* Disallow duplicate tables if there are any with row filters. */
1590 if (t->whereClause || list_member_oid(relids_with_rf, myrelid))
1591 ereport(ERROR,
1592 (errcode(ERRCODE_DUPLICATE_OBJECT),
1593 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1594 RelationGetRelationName(rel))));
1596 /* Disallow duplicate tables if there are any with column lists. */
1597 if (t->columns || list_member_oid(relids_with_collist, myrelid))
1598 ereport(ERROR,
1599 (errcode(ERRCODE_DUPLICATE_OBJECT),
1600 errmsg("conflicting or redundant column lists for table \"%s\"",
1601 RelationGetRelationName(rel))));
1603 table_close(rel, ShareUpdateExclusiveLock);
1604 continue;
1607 pub_rel = palloc(sizeof(PublicationRelInfo));
1608 pub_rel->relation = rel;
1609 pub_rel->whereClause = t->whereClause;
1610 pub_rel->columns = t->columns;
1611 rels = lappend(rels, pub_rel);
1612 relids = lappend_oid(relids, myrelid);
1614 if (t->whereClause)
1615 relids_with_rf = lappend_oid(relids_with_rf, myrelid);
1617 if (t->columns)
1618 relids_with_collist = lappend_oid(relids_with_collist, myrelid);
1621 * Add children of this rel, if requested, so that they too are added
1622 * to the publication. A partitioned table can't have any inheritance
1623 * children other than its partitions, which need not be explicitly
1624 * added to the publication.
1626 if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
1628 List *children;
1629 ListCell *child;
1631 children = find_all_inheritors(myrelid, ShareUpdateExclusiveLock,
1632 NULL);
1634 foreach(child, children)
1636 Oid childrelid = lfirst_oid(child);
1638 /* Allow query cancel in case this takes a long time */
1639 CHECK_FOR_INTERRUPTS();
1642 * Skip duplicates if user specified both parent and child
1643 * tables.
1645 if (list_member_oid(relids, childrelid))
1648 * We don't allow to specify row filter for both parent
1649 * and child table at the same time as it is not very
1650 * clear which one should be given preference.
1652 if (childrelid != myrelid &&
1653 (t->whereClause || list_member_oid(relids_with_rf, childrelid)))
1654 ereport(ERROR,
1655 (errcode(ERRCODE_DUPLICATE_OBJECT),
1656 errmsg("conflicting or redundant WHERE clauses for table \"%s\"",
1657 RelationGetRelationName(rel))));
1660 * We don't allow to specify column list for both parent
1661 * and child table at the same time as it is not very
1662 * clear which one should be given preference.
1664 if (childrelid != myrelid &&
1665 (t->columns || list_member_oid(relids_with_collist, childrelid)))
1666 ereport(ERROR,
1667 (errcode(ERRCODE_DUPLICATE_OBJECT),
1668 errmsg("conflicting or redundant column lists for table \"%s\"",
1669 RelationGetRelationName(rel))));
1671 continue;
1674 /* find_all_inheritors already got lock */
1675 rel = table_open(childrelid, NoLock);
1676 pub_rel = palloc(sizeof(PublicationRelInfo));
1677 pub_rel->relation = rel;
1678 /* child inherits WHERE clause from parent */
1679 pub_rel->whereClause = t->whereClause;
1681 /* child inherits column list from parent */
1682 pub_rel->columns = t->columns;
1683 rels = lappend(rels, pub_rel);
1684 relids = lappend_oid(relids, childrelid);
1686 if (t->whereClause)
1687 relids_with_rf = lappend_oid(relids_with_rf, childrelid);
1689 if (t->columns)
1690 relids_with_collist = lappend_oid(relids_with_collist, childrelid);
1695 list_free(relids);
1696 list_free(relids_with_rf);
1698 return rels;
1702 * Close all relations in the list.
1704 static void
1705 CloseTableList(List *rels)
1707 ListCell *lc;
1709 foreach(lc, rels)
1711 PublicationRelInfo *pub_rel;
1713 pub_rel = (PublicationRelInfo *) lfirst(lc);
1714 table_close(pub_rel->relation, NoLock);
1717 list_free_deep(rels);
1721 * Lock the schemas specified in the schema list in AccessShareLock mode in
1722 * order to prevent concurrent schema deletion.
1724 static void
1725 LockSchemaList(List *schemalist)
1727 ListCell *lc;
1729 foreach(lc, schemalist)
1731 Oid schemaid = lfirst_oid(lc);
1733 /* Allow query cancel in case this takes a long time */
1734 CHECK_FOR_INTERRUPTS();
1735 LockDatabaseObject(NamespaceRelationId, schemaid, 0, AccessShareLock);
1738 * It is possible that by the time we acquire the lock on schema,
1739 * concurrent DDL has removed it. We can test this by checking the
1740 * existence of schema.
1742 if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaid)))
1743 ereport(ERROR,
1744 errcode(ERRCODE_UNDEFINED_SCHEMA),
1745 errmsg("schema with OID %u does not exist", schemaid));
1750 * Add listed tables to the publication.
1752 static void
1753 PublicationAddTables(Oid pubid, List *rels, bool if_not_exists,
1754 AlterPublicationStmt *stmt)
1756 ListCell *lc;
1758 Assert(!stmt || !stmt->for_all_tables);
1760 foreach(lc, rels)
1762 PublicationRelInfo *pub_rel = (PublicationRelInfo *) lfirst(lc);
1763 Relation rel = pub_rel->relation;
1764 ObjectAddress obj;
1766 /* Must be owner of the table or superuser. */
1767 if (!object_ownercheck(RelationRelationId, RelationGetRelid(rel), GetUserId()))
1768 aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind),
1769 RelationGetRelationName(rel));
1771 obj = publication_add_relation(pubid, pub_rel, if_not_exists);
1772 if (stmt)
1774 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1775 (Node *) stmt);
1777 InvokeObjectPostCreateHook(PublicationRelRelationId,
1778 obj.objectId, 0);
1784 * Remove listed tables from the publication.
1786 static void
1787 PublicationDropTables(Oid pubid, List *rels, bool missing_ok)
1789 ObjectAddress obj;
1790 ListCell *lc;
1791 Oid prid;
1793 foreach(lc, rels)
1795 PublicationRelInfo *pubrel = (PublicationRelInfo *) lfirst(lc);
1796 Relation rel = pubrel->relation;
1797 Oid relid = RelationGetRelid(rel);
1799 if (pubrel->columns)
1800 ereport(ERROR,
1801 errcode(ERRCODE_SYNTAX_ERROR),
1802 errmsg("column list must not be specified in ALTER PUBLICATION ... DROP"));
1804 prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid,
1805 ObjectIdGetDatum(relid),
1806 ObjectIdGetDatum(pubid));
1807 if (!OidIsValid(prid))
1809 if (missing_ok)
1810 continue;
1812 ereport(ERROR,
1813 (errcode(ERRCODE_UNDEFINED_OBJECT),
1814 errmsg("relation \"%s\" is not part of the publication",
1815 RelationGetRelationName(rel))));
1818 if (pubrel->whereClause)
1819 ereport(ERROR,
1820 (errcode(ERRCODE_SYNTAX_ERROR),
1821 errmsg("cannot use a WHERE clause when removing a table from a publication")));
1823 ObjectAddressSet(obj, PublicationRelRelationId, prid);
1824 performDeletion(&obj, DROP_CASCADE, 0);
1829 * Add listed schemas to the publication.
1831 static void
1832 PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists,
1833 AlterPublicationStmt *stmt)
1835 ListCell *lc;
1837 Assert(!stmt || !stmt->for_all_tables);
1839 foreach(lc, schemas)
1841 Oid schemaid = lfirst_oid(lc);
1842 ObjectAddress obj;
1844 obj = publication_add_schema(pubid, schemaid, if_not_exists);
1845 if (stmt)
1847 EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress,
1848 (Node *) stmt);
1850 InvokeObjectPostCreateHook(PublicationNamespaceRelationId,
1851 obj.objectId, 0);
1857 * Remove listed schemas from the publication.
1859 static void
1860 PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok)
1862 ObjectAddress obj;
1863 ListCell *lc;
1864 Oid psid;
1866 foreach(lc, schemas)
1868 Oid schemaid = lfirst_oid(lc);
1870 psid = GetSysCacheOid2(PUBLICATIONNAMESPACEMAP,
1871 Anum_pg_publication_namespace_oid,
1872 ObjectIdGetDatum(schemaid),
1873 ObjectIdGetDatum(pubid));
1874 if (!OidIsValid(psid))
1876 if (missing_ok)
1877 continue;
1879 ereport(ERROR,
1880 (errcode(ERRCODE_UNDEFINED_OBJECT),
1881 errmsg("tables from schema \"%s\" are not part of the publication",
1882 get_namespace_name(schemaid))));
1885 ObjectAddressSet(obj, PublicationNamespaceRelationId, psid);
1886 performDeletion(&obj, DROP_CASCADE, 0);
1891 * Internal workhorse for changing a publication owner
1893 static void
1894 AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
1896 Form_pg_publication form;
1898 form = (Form_pg_publication) GETSTRUCT(tup);
1900 if (form->pubowner == newOwnerId)
1901 return;
1903 if (!superuser())
1905 AclResult aclresult;
1907 /* Must be owner */
1908 if (!object_ownercheck(PublicationRelationId, form->oid, GetUserId()))
1909 aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_PUBLICATION,
1910 NameStr(form->pubname));
1912 /* Must be able to become new owner */
1913 check_is_member_of_role(GetUserId(), newOwnerId);
1915 /* New owner must have CREATE privilege on database */
1916 aclresult = pg_database_aclcheck(MyDatabaseId, newOwnerId, ACL_CREATE);
1917 if (aclresult != ACLCHECK_OK)
1918 aclcheck_error(aclresult, OBJECT_DATABASE,
1919 get_database_name(MyDatabaseId));
1921 if (form->puballtables && !superuser_arg(newOwnerId))
1922 ereport(ERROR,
1923 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1924 errmsg("permission denied to change owner of publication \"%s\"",
1925 NameStr(form->pubname)),
1926 errhint("The owner of a FOR ALL TABLES publication must be a superuser.")));
1928 if (!superuser_arg(newOwnerId) && is_schema_publication(form->oid))
1929 ereport(ERROR,
1930 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1931 errmsg("permission denied to change owner of publication \"%s\"",
1932 NameStr(form->pubname)),
1933 errhint("The owner of a FOR TABLES IN SCHEMA publication must be a superuser.")));
1936 form->pubowner = newOwnerId;
1937 CatalogTupleUpdate(rel, &tup->t_self, tup);
1939 /* Update owner dependency reference */
1940 changeDependencyOnOwner(PublicationRelationId,
1941 form->oid,
1942 newOwnerId);
1944 InvokeObjectPostAlterHook(PublicationRelationId,
1945 form->oid, 0);
1949 * Change publication owner -- by name
1951 ObjectAddress
1952 AlterPublicationOwner(const char *name, Oid newOwnerId)
1954 Oid subid;
1955 HeapTuple tup;
1956 Relation rel;
1957 ObjectAddress address;
1958 Form_pg_publication pubform;
1960 rel = table_open(PublicationRelationId, RowExclusiveLock);
1962 tup = SearchSysCacheCopy1(PUBLICATIONNAME, CStringGetDatum(name));
1964 if (!HeapTupleIsValid(tup))
1965 ereport(ERROR,
1966 (errcode(ERRCODE_UNDEFINED_OBJECT),
1967 errmsg("publication \"%s\" does not exist", name)));
1969 pubform = (Form_pg_publication) GETSTRUCT(tup);
1970 subid = pubform->oid;
1972 AlterPublicationOwner_internal(rel, tup, newOwnerId);
1974 ObjectAddressSet(address, PublicationRelationId, subid);
1976 heap_freetuple(tup);
1978 table_close(rel, RowExclusiveLock);
1980 return address;
1984 * Change publication owner -- by OID
1986 void
1987 AlterPublicationOwner_oid(Oid subid, Oid newOwnerId)
1989 HeapTuple tup;
1990 Relation rel;
1992 rel = table_open(PublicationRelationId, RowExclusiveLock);
1994 tup = SearchSysCacheCopy1(PUBLICATIONOID, ObjectIdGetDatum(subid));
1996 if (!HeapTupleIsValid(tup))
1997 ereport(ERROR,
1998 (errcode(ERRCODE_UNDEFINED_OBJECT),
1999 errmsg("publication with OID %u does not exist", subid)));
2001 AlterPublicationOwner_internal(rel, tup, newOwnerId);
2003 heap_freetuple(tup);
2005 table_close(rel, RowExclusiveLock);