Update copyright for 2022
[pgsql.git] / src / backend / catalog / pg_subscription.c
blobca65a8bd201a326d40daa5eb60ae5b24f66d83b0
1 /*-------------------------------------------------------------------------
3 * pg_subscription.c
4 * replication subscriptions
6 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
9 * IDENTIFICATION
10 * src/backend/catalog/pg_subscription.c
12 *-------------------------------------------------------------------------
15 #include "postgres.h"
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/tableam.h"
21 #include "access/xact.h"
22 #include "catalog/indexing.h"
23 #include "catalog/pg_subscription.h"
24 #include "catalog/pg_subscription_rel.h"
25 #include "catalog/pg_type.h"
26 #include "miscadmin.h"
27 #include "nodes/makefuncs.h"
28 #include "storage/lmgr.h"
29 #include "utils/array.h"
30 #include "utils/builtins.h"
31 #include "utils/fmgroids.h"
32 #include "utils/lsyscache.h"
33 #include "utils/pg_lsn.h"
34 #include "utils/rel.h"
35 #include "utils/syscache.h"
37 static List *textarray_to_stringlist(ArrayType *textarray);
40 * Fetch the subscription from the syscache.
42 Subscription *
43 GetSubscription(Oid subid, bool missing_ok)
45 HeapTuple tup;
46 Subscription *sub;
47 Form_pg_subscription subform;
48 Datum datum;
49 bool isnull;
51 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
53 if (!HeapTupleIsValid(tup))
55 if (missing_ok)
56 return NULL;
58 elog(ERROR, "cache lookup failed for subscription %u", subid);
61 subform = (Form_pg_subscription) GETSTRUCT(tup);
63 sub = (Subscription *) palloc(sizeof(Subscription));
64 sub->oid = subid;
65 sub->dbid = subform->subdbid;
66 sub->name = pstrdup(NameStr(subform->subname));
67 sub->owner = subform->subowner;
68 sub->enabled = subform->subenabled;
69 sub->binary = subform->subbinary;
70 sub->stream = subform->substream;
71 sub->twophasestate = subform->subtwophasestate;
73 /* Get conninfo */
74 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
75 tup,
76 Anum_pg_subscription_subconninfo,
77 &isnull);
78 Assert(!isnull);
79 sub->conninfo = TextDatumGetCString(datum);
81 /* Get slotname */
82 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
83 tup,
84 Anum_pg_subscription_subslotname,
85 &isnull);
86 if (!isnull)
87 sub->slotname = pstrdup(NameStr(*DatumGetName(datum)));
88 else
89 sub->slotname = NULL;
91 /* Get synccommit */
92 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
93 tup,
94 Anum_pg_subscription_subsynccommit,
95 &isnull);
96 Assert(!isnull);
97 sub->synccommit = TextDatumGetCString(datum);
99 /* Get publications */
100 datum = SysCacheGetAttr(SUBSCRIPTIONOID,
101 tup,
102 Anum_pg_subscription_subpublications,
103 &isnull);
104 Assert(!isnull);
105 sub->publications = textarray_to_stringlist(DatumGetArrayTypeP(datum));
107 ReleaseSysCache(tup);
109 return sub;
113 * Return number of subscriptions defined in given database.
114 * Used by dropdb() to check if database can indeed be dropped.
117 CountDBSubscriptions(Oid dbid)
119 int nsubs = 0;
120 Relation rel;
121 ScanKeyData scankey;
122 SysScanDesc scan;
123 HeapTuple tup;
125 rel = table_open(SubscriptionRelationId, RowExclusiveLock);
127 ScanKeyInit(&scankey,
128 Anum_pg_subscription_subdbid,
129 BTEqualStrategyNumber, F_OIDEQ,
130 ObjectIdGetDatum(dbid));
132 scan = systable_beginscan(rel, InvalidOid, false,
133 NULL, 1, &scankey);
135 while (HeapTupleIsValid(tup = systable_getnext(scan)))
136 nsubs++;
138 systable_endscan(scan);
140 table_close(rel, NoLock);
142 return nsubs;
146 * Free memory allocated by subscription struct.
148 void
149 FreeSubscription(Subscription *sub)
151 pfree(sub->name);
152 pfree(sub->conninfo);
153 if (sub->slotname)
154 pfree(sub->slotname);
155 list_free_deep(sub->publications);
156 pfree(sub);
160 * get_subscription_oid - given a subscription name, look up the OID
162 * If missing_ok is false, throw an error if name not found. If true, just
163 * return InvalidOid.
166 get_subscription_oid(const char *subname, bool missing_ok)
168 Oid oid;
170 oid = GetSysCacheOid2(SUBSCRIPTIONNAME, Anum_pg_subscription_oid,
171 MyDatabaseId, CStringGetDatum(subname));
172 if (!OidIsValid(oid) && !missing_ok)
173 ereport(ERROR,
174 (errcode(ERRCODE_UNDEFINED_OBJECT),
175 errmsg("subscription \"%s\" does not exist", subname)));
176 return oid;
180 * get_subscription_name - given a subscription OID, look up the name
182 * If missing_ok is false, throw an error if name not found. If true, just
183 * return NULL.
185 char *
186 get_subscription_name(Oid subid, bool missing_ok)
188 HeapTuple tup;
189 char *subname;
190 Form_pg_subscription subform;
192 tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
194 if (!HeapTupleIsValid(tup))
196 if (!missing_ok)
197 elog(ERROR, "cache lookup failed for subscription %u", subid);
198 return NULL;
201 subform = (Form_pg_subscription) GETSTRUCT(tup);
202 subname = pstrdup(NameStr(subform->subname));
204 ReleaseSysCache(tup);
206 return subname;
210 * Convert text array to list of strings.
212 * Note: the resulting list of strings is pallocated here.
214 static List *
215 textarray_to_stringlist(ArrayType *textarray)
217 Datum *elems;
218 int nelems,
220 List *res = NIL;
222 deconstruct_array(textarray,
223 TEXTOID, -1, false, TYPALIGN_INT,
224 &elems, NULL, &nelems);
226 if (nelems == 0)
227 return NIL;
229 for (i = 0; i < nelems; i++)
230 res = lappend(res, makeString(TextDatumGetCString(elems[i])));
232 return res;
236 * Add new state record for a subscription table.
238 void
239 AddSubscriptionRelState(Oid subid, Oid relid, char state,
240 XLogRecPtr sublsn)
242 Relation rel;
243 HeapTuple tup;
244 bool nulls[Natts_pg_subscription_rel];
245 Datum values[Natts_pg_subscription_rel];
247 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
249 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
251 /* Try finding existing mapping. */
252 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
253 ObjectIdGetDatum(relid),
254 ObjectIdGetDatum(subid));
255 if (HeapTupleIsValid(tup))
256 elog(ERROR, "subscription table %u in subscription %u already exists",
257 relid, subid);
259 /* Form the tuple. */
260 memset(values, 0, sizeof(values));
261 memset(nulls, false, sizeof(nulls));
262 values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
263 values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
264 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
265 if (sublsn != InvalidXLogRecPtr)
266 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
267 else
268 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
270 tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
272 /* Insert tuple into catalog. */
273 CatalogTupleInsert(rel, tup);
275 heap_freetuple(tup);
277 /* Cleanup. */
278 table_close(rel, NoLock);
282 * Update the state of a subscription table.
284 void
285 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
286 XLogRecPtr sublsn)
288 Relation rel;
289 HeapTuple tup;
290 bool nulls[Natts_pg_subscription_rel];
291 Datum values[Natts_pg_subscription_rel];
292 bool replaces[Natts_pg_subscription_rel];
294 LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
296 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
298 /* Try finding existing mapping. */
299 tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP,
300 ObjectIdGetDatum(relid),
301 ObjectIdGetDatum(subid));
302 if (!HeapTupleIsValid(tup))
303 elog(ERROR, "subscription table %u in subscription %u does not exist",
304 relid, subid);
306 /* Update the tuple. */
307 memset(values, 0, sizeof(values));
308 memset(nulls, false, sizeof(nulls));
309 memset(replaces, false, sizeof(replaces));
311 replaces[Anum_pg_subscription_rel_srsubstate - 1] = true;
312 values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
314 replaces[Anum_pg_subscription_rel_srsublsn - 1] = true;
315 if (sublsn != InvalidXLogRecPtr)
316 values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
317 else
318 nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
320 tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
321 replaces);
323 /* Update the catalog. */
324 CatalogTupleUpdate(rel, &tup->t_self, tup);
326 /* Cleanup. */
327 table_close(rel, NoLock);
331 * Get state of subscription table.
333 * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
335 char
336 GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
338 HeapTuple tup;
339 char substate;
340 bool isnull;
341 Datum d;
342 Relation rel;
345 * This is to avoid the race condition with AlterSubscription which tries
346 * to remove this relstate.
348 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
350 /* Try finding the mapping. */
351 tup = SearchSysCache2(SUBSCRIPTIONRELMAP,
352 ObjectIdGetDatum(relid),
353 ObjectIdGetDatum(subid));
355 if (!HeapTupleIsValid(tup))
357 table_close(rel, AccessShareLock);
358 *sublsn = InvalidXLogRecPtr;
359 return SUBREL_STATE_UNKNOWN;
362 /* Get the state. */
363 substate = ((Form_pg_subscription_rel) GETSTRUCT(tup))->srsubstate;
365 /* Get the LSN */
366 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
367 Anum_pg_subscription_rel_srsublsn, &isnull);
368 if (isnull)
369 *sublsn = InvalidXLogRecPtr;
370 else
371 *sublsn = DatumGetLSN(d);
373 /* Cleanup */
374 ReleaseSysCache(tup);
376 table_close(rel, AccessShareLock);
378 return substate;
382 * Drop subscription relation mapping. These can be for a particular
383 * subscription, or for a particular relation, or both.
385 void
386 RemoveSubscriptionRel(Oid subid, Oid relid)
388 Relation rel;
389 TableScanDesc scan;
390 ScanKeyData skey[2];
391 HeapTuple tup;
392 int nkeys = 0;
394 rel = table_open(SubscriptionRelRelationId, RowExclusiveLock);
396 if (OidIsValid(subid))
398 ScanKeyInit(&skey[nkeys++],
399 Anum_pg_subscription_rel_srsubid,
400 BTEqualStrategyNumber,
401 F_OIDEQ,
402 ObjectIdGetDatum(subid));
405 if (OidIsValid(relid))
407 ScanKeyInit(&skey[nkeys++],
408 Anum_pg_subscription_rel_srrelid,
409 BTEqualStrategyNumber,
410 F_OIDEQ,
411 ObjectIdGetDatum(relid));
414 /* Do the search and delete what we found. */
415 scan = table_beginscan_catalog(rel, nkeys, skey);
416 while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
418 Form_pg_subscription_rel subrel;
420 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
423 * We don't allow to drop the relation mapping when the table
424 * synchronization is in progress unless the caller updates the
425 * corresponding subscription as well. This is to ensure that we don't
426 * leave tablesync slots or origins in the system when the
427 * corresponding table is dropped.
429 if (!OidIsValid(subid) && subrel->srsubstate != SUBREL_STATE_READY)
431 ereport(ERROR,
432 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
433 errmsg("could not drop relation mapping for subscription \"%s\"",
434 get_subscription_name(subrel->srsubid, false)),
435 errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
436 get_rel_name(relid), subrel->srsubstate),
439 * translator: first %s is a SQL ALTER command and second %s is a
440 * SQL DROP command
442 errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
443 "ALTER SUBSCRIPTION ... ENABLE",
444 "DROP SUBSCRIPTION ...")));
447 CatalogTupleDelete(rel, &tup->t_self);
449 table_endscan(scan);
451 table_close(rel, RowExclusiveLock);
455 * Does the subscription have any relations?
457 * Use this function only to know true/false, and when you have no need for the
458 * List returned by GetSubscriptionRelations.
460 bool
461 HasSubscriptionRelations(Oid subid)
463 Relation rel;
464 ScanKeyData skey[1];
465 SysScanDesc scan;
466 bool has_subrels;
468 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
470 ScanKeyInit(&skey[0],
471 Anum_pg_subscription_rel_srsubid,
472 BTEqualStrategyNumber, F_OIDEQ,
473 ObjectIdGetDatum(subid));
475 scan = systable_beginscan(rel, InvalidOid, false,
476 NULL, 1, skey);
478 /* If even a single tuple exists then the subscription has tables. */
479 has_subrels = HeapTupleIsValid(systable_getnext(scan));
481 /* Cleanup */
482 systable_endscan(scan);
483 table_close(rel, AccessShareLock);
485 return has_subrels;
489 * Get all relations for subscription.
491 * Returned list is palloc'ed in current memory context.
493 List *
494 GetSubscriptionRelations(Oid subid)
496 List *res = NIL;
497 Relation rel;
498 HeapTuple tup;
499 ScanKeyData skey[1];
500 SysScanDesc scan;
502 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
504 ScanKeyInit(&skey[0],
505 Anum_pg_subscription_rel_srsubid,
506 BTEqualStrategyNumber, F_OIDEQ,
507 ObjectIdGetDatum(subid));
509 scan = systable_beginscan(rel, InvalidOid, false,
510 NULL, 1, skey);
512 while (HeapTupleIsValid(tup = systable_getnext(scan)))
514 Form_pg_subscription_rel subrel;
515 SubscriptionRelState *relstate;
516 Datum d;
517 bool isnull;
519 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
521 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
522 relstate->relid = subrel->srrelid;
523 relstate->state = subrel->srsubstate;
524 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
525 Anum_pg_subscription_rel_srsublsn, &isnull);
526 if (isnull)
527 relstate->lsn = InvalidXLogRecPtr;
528 else
529 relstate->lsn = DatumGetLSN(d);
531 res = lappend(res, relstate);
534 /* Cleanup */
535 systable_endscan(scan);
536 table_close(rel, AccessShareLock);
538 return res;
542 * Get all relations for subscription that are not in a ready state.
544 * Returned list is palloc'ed in current memory context.
546 List *
547 GetSubscriptionNotReadyRelations(Oid subid)
549 List *res = NIL;
550 Relation rel;
551 HeapTuple tup;
552 int nkeys = 0;
553 ScanKeyData skey[2];
554 SysScanDesc scan;
556 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
558 ScanKeyInit(&skey[nkeys++],
559 Anum_pg_subscription_rel_srsubid,
560 BTEqualStrategyNumber, F_OIDEQ,
561 ObjectIdGetDatum(subid));
563 ScanKeyInit(&skey[nkeys++],
564 Anum_pg_subscription_rel_srsubstate,
565 BTEqualStrategyNumber, F_CHARNE,
566 CharGetDatum(SUBREL_STATE_READY));
568 scan = systable_beginscan(rel, InvalidOid, false,
569 NULL, nkeys, skey);
571 while (HeapTupleIsValid(tup = systable_getnext(scan)))
573 Form_pg_subscription_rel subrel;
574 SubscriptionRelState *relstate;
575 Datum d;
576 bool isnull;
578 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
580 relstate = (SubscriptionRelState *) palloc(sizeof(SubscriptionRelState));
581 relstate->relid = subrel->srrelid;
582 relstate->state = subrel->srsubstate;
583 d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
584 Anum_pg_subscription_rel_srsublsn, &isnull);
585 if (isnull)
586 relstate->lsn = InvalidXLogRecPtr;
587 else
588 relstate->lsn = DatumGetLSN(d);
590 res = lappend(res, relstate);
593 /* Cleanup */
594 systable_endscan(scan);
595 table_close(rel, AccessShareLock);
597 return res;