/*-------------------------------------------------------------------------
 *
 * pg_subscription.c
 *		replication subscriptions
 *
 * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *		src/backend/catalog/pg_subscription.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "storage/lmgr.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/syscache.h"

37 static List
*textarray_to_stringlist(ArrayType
*textarray
);
40 * Fetch the subscription from the syscache.
43 GetSubscription(Oid subid
, bool missing_ok
)
47 Form_pg_subscription subform
;
51 tup
= SearchSysCache1(SUBSCRIPTIONOID
, ObjectIdGetDatum(subid
));
53 if (!HeapTupleIsValid(tup
))
58 elog(ERROR
, "cache lookup failed for subscription %u", subid
);
61 subform
= (Form_pg_subscription
) GETSTRUCT(tup
);
63 sub
= (Subscription
*) palloc(sizeof(Subscription
));
65 sub
->dbid
= subform
->subdbid
;
66 sub
->name
= pstrdup(NameStr(subform
->subname
));
67 sub
->owner
= subform
->subowner
;
68 sub
->enabled
= subform
->subenabled
;
69 sub
->binary
= subform
->subbinary
;
70 sub
->stream
= subform
->substream
;
71 sub
->twophasestate
= subform
->subtwophasestate
;
74 datum
= SysCacheGetAttr(SUBSCRIPTIONOID
,
76 Anum_pg_subscription_subconninfo
,
79 sub
->conninfo
= TextDatumGetCString(datum
);
82 datum
= SysCacheGetAttr(SUBSCRIPTIONOID
,
84 Anum_pg_subscription_subslotname
,
87 sub
->slotname
= pstrdup(NameStr(*DatumGetName(datum
)));
92 datum
= SysCacheGetAttr(SUBSCRIPTIONOID
,
94 Anum_pg_subscription_subsynccommit
,
97 sub
->synccommit
= TextDatumGetCString(datum
);
99 /* Get publications */
100 datum
= SysCacheGetAttr(SUBSCRIPTIONOID
,
102 Anum_pg_subscription_subpublications
,
105 sub
->publications
= textarray_to_stringlist(DatumGetArrayTypeP(datum
));
107 ReleaseSysCache(tup
);
113 * Return number of subscriptions defined in given database.
114 * Used by dropdb() to check if database can indeed be dropped.
117 CountDBSubscriptions(Oid dbid
)
125 rel
= table_open(SubscriptionRelationId
, RowExclusiveLock
);
127 ScanKeyInit(&scankey
,
128 Anum_pg_subscription_subdbid
,
129 BTEqualStrategyNumber
, F_OIDEQ
,
130 ObjectIdGetDatum(dbid
));
132 scan
= systable_beginscan(rel
, InvalidOid
, false,
135 while (HeapTupleIsValid(tup
= systable_getnext(scan
)))
138 systable_endscan(scan
);
140 table_close(rel
, NoLock
);
146 * Free memory allocated by subscription struct.
149 FreeSubscription(Subscription
*sub
)
152 pfree(sub
->conninfo
);
154 pfree(sub
->slotname
);
155 list_free_deep(sub
->publications
);
160 * get_subscription_oid - given a subscription name, look up the OID
162 * If missing_ok is false, throw an error if name not found. If true, just
166 get_subscription_oid(const char *subname
, bool missing_ok
)
170 oid
= GetSysCacheOid2(SUBSCRIPTIONNAME
, Anum_pg_subscription_oid
,
171 MyDatabaseId
, CStringGetDatum(subname
));
172 if (!OidIsValid(oid
) && !missing_ok
)
174 (errcode(ERRCODE_UNDEFINED_OBJECT
),
175 errmsg("subscription \"%s\" does not exist", subname
)));
180 * get_subscription_name - given a subscription OID, look up the name
182 * If missing_ok is false, throw an error if name not found. If true, just
186 get_subscription_name(Oid subid
, bool missing_ok
)
190 Form_pg_subscription subform
;
192 tup
= SearchSysCache1(SUBSCRIPTIONOID
, ObjectIdGetDatum(subid
));
194 if (!HeapTupleIsValid(tup
))
197 elog(ERROR
, "cache lookup failed for subscription %u", subid
);
201 subform
= (Form_pg_subscription
) GETSTRUCT(tup
);
202 subname
= pstrdup(NameStr(subform
->subname
));
204 ReleaseSysCache(tup
);
210 * Convert text array to list of strings.
212 * Note: the resulting list of strings is pallocated here.
215 textarray_to_stringlist(ArrayType
*textarray
)
222 deconstruct_array(textarray
,
223 TEXTOID
, -1, false, TYPALIGN_INT
,
224 &elems
, NULL
, &nelems
);
229 for (i
= 0; i
< nelems
; i
++)
230 res
= lappend(res
, makeString(TextDatumGetCString(elems
[i
])));
236 * Add new state record for a subscription table.
239 AddSubscriptionRelState(Oid subid
, Oid relid
, char state
,
244 bool nulls
[Natts_pg_subscription_rel
];
245 Datum values
[Natts_pg_subscription_rel
];
247 LockSharedObject(SubscriptionRelationId
, subid
, 0, AccessShareLock
);
249 rel
= table_open(SubscriptionRelRelationId
, RowExclusiveLock
);
251 /* Try finding existing mapping. */
252 tup
= SearchSysCacheCopy2(SUBSCRIPTIONRELMAP
,
253 ObjectIdGetDatum(relid
),
254 ObjectIdGetDatum(subid
));
255 if (HeapTupleIsValid(tup
))
256 elog(ERROR
, "subscription table %u in subscription %u already exists",
259 /* Form the tuple. */
260 memset(values
, 0, sizeof(values
));
261 memset(nulls
, false, sizeof(nulls
));
262 values
[Anum_pg_subscription_rel_srsubid
- 1] = ObjectIdGetDatum(subid
);
263 values
[Anum_pg_subscription_rel_srrelid
- 1] = ObjectIdGetDatum(relid
);
264 values
[Anum_pg_subscription_rel_srsubstate
- 1] = CharGetDatum(state
);
265 if (sublsn
!= InvalidXLogRecPtr
)
266 values
[Anum_pg_subscription_rel_srsublsn
- 1] = LSNGetDatum(sublsn
);
268 nulls
[Anum_pg_subscription_rel_srsublsn
- 1] = true;
270 tup
= heap_form_tuple(RelationGetDescr(rel
), values
, nulls
);
272 /* Insert tuple into catalog. */
273 CatalogTupleInsert(rel
, tup
);
278 table_close(rel
, NoLock
);
282 * Update the state of a subscription table.
285 UpdateSubscriptionRelState(Oid subid
, Oid relid
, char state
,
290 bool nulls
[Natts_pg_subscription_rel
];
291 Datum values
[Natts_pg_subscription_rel
];
292 bool replaces
[Natts_pg_subscription_rel
];
294 LockSharedObject(SubscriptionRelationId
, subid
, 0, AccessShareLock
);
296 rel
= table_open(SubscriptionRelRelationId
, RowExclusiveLock
);
298 /* Try finding existing mapping. */
299 tup
= SearchSysCacheCopy2(SUBSCRIPTIONRELMAP
,
300 ObjectIdGetDatum(relid
),
301 ObjectIdGetDatum(subid
));
302 if (!HeapTupleIsValid(tup
))
303 elog(ERROR
, "subscription table %u in subscription %u does not exist",
306 /* Update the tuple. */
307 memset(values
, 0, sizeof(values
));
308 memset(nulls
, false, sizeof(nulls
));
309 memset(replaces
, false, sizeof(replaces
));
311 replaces
[Anum_pg_subscription_rel_srsubstate
- 1] = true;
312 values
[Anum_pg_subscription_rel_srsubstate
- 1] = CharGetDatum(state
);
314 replaces
[Anum_pg_subscription_rel_srsublsn
- 1] = true;
315 if (sublsn
!= InvalidXLogRecPtr
)
316 values
[Anum_pg_subscription_rel_srsublsn
- 1] = LSNGetDatum(sublsn
);
318 nulls
[Anum_pg_subscription_rel_srsublsn
- 1] = true;
320 tup
= heap_modify_tuple(tup
, RelationGetDescr(rel
), values
, nulls
,
323 /* Update the catalog. */
324 CatalogTupleUpdate(rel
, &tup
->t_self
, tup
);
327 table_close(rel
, NoLock
);
331 * Get state of subscription table.
333 * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription.
336 GetSubscriptionRelState(Oid subid
, Oid relid
, XLogRecPtr
*sublsn
)
345 * This is to avoid the race condition with AlterSubscription which tries
346 * to remove this relstate.
348 rel
= table_open(SubscriptionRelRelationId
, AccessShareLock
);
350 /* Try finding the mapping. */
351 tup
= SearchSysCache2(SUBSCRIPTIONRELMAP
,
352 ObjectIdGetDatum(relid
),
353 ObjectIdGetDatum(subid
));
355 if (!HeapTupleIsValid(tup
))
357 table_close(rel
, AccessShareLock
);
358 *sublsn
= InvalidXLogRecPtr
;
359 return SUBREL_STATE_UNKNOWN
;
363 substate
= ((Form_pg_subscription_rel
) GETSTRUCT(tup
))->srsubstate
;
366 d
= SysCacheGetAttr(SUBSCRIPTIONRELMAP
, tup
,
367 Anum_pg_subscription_rel_srsublsn
, &isnull
);
369 *sublsn
= InvalidXLogRecPtr
;
371 *sublsn
= DatumGetLSN(d
);
374 ReleaseSysCache(tup
);
376 table_close(rel
, AccessShareLock
);
382 * Drop subscription relation mapping. These can be for a particular
383 * subscription, or for a particular relation, or both.
386 RemoveSubscriptionRel(Oid subid
, Oid relid
)
394 rel
= table_open(SubscriptionRelRelationId
, RowExclusiveLock
);
396 if (OidIsValid(subid
))
398 ScanKeyInit(&skey
[nkeys
++],
399 Anum_pg_subscription_rel_srsubid
,
400 BTEqualStrategyNumber
,
402 ObjectIdGetDatum(subid
));
405 if (OidIsValid(relid
))
407 ScanKeyInit(&skey
[nkeys
++],
408 Anum_pg_subscription_rel_srrelid
,
409 BTEqualStrategyNumber
,
411 ObjectIdGetDatum(relid
));
414 /* Do the search and delete what we found. */
415 scan
= table_beginscan_catalog(rel
, nkeys
, skey
);
416 while (HeapTupleIsValid(tup
= heap_getnext(scan
, ForwardScanDirection
)))
418 Form_pg_subscription_rel subrel
;
420 subrel
= (Form_pg_subscription_rel
) GETSTRUCT(tup
);
423 * We don't allow to drop the relation mapping when the table
424 * synchronization is in progress unless the caller updates the
425 * corresponding subscription as well. This is to ensure that we don't
426 * leave tablesync slots or origins in the system when the
427 * corresponding table is dropped.
429 if (!OidIsValid(subid
) && subrel
->srsubstate
!= SUBREL_STATE_READY
)
432 (errcode(ERRCODE_INVALID_PARAMETER_VALUE
),
433 errmsg("could not drop relation mapping for subscription \"%s\"",
434 get_subscription_name(subrel
->srsubid
, false)),
435 errdetail("Table synchronization for relation \"%s\" is in progress and is in state \"%c\".",
436 get_rel_name(relid
), subrel
->srsubstate
),
439 * translator: first %s is a SQL ALTER command and second %s is a
442 errhint("Use %s to enable subscription if not already enabled or use %s to drop the subscription.",
443 "ALTER SUBSCRIPTION ... ENABLE",
444 "DROP SUBSCRIPTION ...")));
447 CatalogTupleDelete(rel
, &tup
->t_self
);
451 table_close(rel
, RowExclusiveLock
);
455 * Does the subscription have any relations?
457 * Use this function only to know true/false, and when you have no need for the
458 * List returned by GetSubscriptionRelations.
461 HasSubscriptionRelations(Oid subid
)
468 rel
= table_open(SubscriptionRelRelationId
, AccessShareLock
);
470 ScanKeyInit(&skey
[0],
471 Anum_pg_subscription_rel_srsubid
,
472 BTEqualStrategyNumber
, F_OIDEQ
,
473 ObjectIdGetDatum(subid
));
475 scan
= systable_beginscan(rel
, InvalidOid
, false,
478 /* If even a single tuple exists then the subscription has tables. */
479 has_subrels
= HeapTupleIsValid(systable_getnext(scan
));
482 systable_endscan(scan
);
483 table_close(rel
, AccessShareLock
);
489 * Get all relations for subscription.
491 * Returned list is palloc'ed in current memory context.
494 GetSubscriptionRelations(Oid subid
)
502 rel
= table_open(SubscriptionRelRelationId
, AccessShareLock
);
504 ScanKeyInit(&skey
[0],
505 Anum_pg_subscription_rel_srsubid
,
506 BTEqualStrategyNumber
, F_OIDEQ
,
507 ObjectIdGetDatum(subid
));
509 scan
= systable_beginscan(rel
, InvalidOid
, false,
512 while (HeapTupleIsValid(tup
= systable_getnext(scan
)))
514 Form_pg_subscription_rel subrel
;
515 SubscriptionRelState
*relstate
;
519 subrel
= (Form_pg_subscription_rel
) GETSTRUCT(tup
);
521 relstate
= (SubscriptionRelState
*) palloc(sizeof(SubscriptionRelState
));
522 relstate
->relid
= subrel
->srrelid
;
523 relstate
->state
= subrel
->srsubstate
;
524 d
= SysCacheGetAttr(SUBSCRIPTIONRELMAP
, tup
,
525 Anum_pg_subscription_rel_srsublsn
, &isnull
);
527 relstate
->lsn
= InvalidXLogRecPtr
;
529 relstate
->lsn
= DatumGetLSN(d
);
531 res
= lappend(res
, relstate
);
535 systable_endscan(scan
);
536 table_close(rel
, AccessShareLock
);
542 * Get all relations for subscription that are not in a ready state.
544 * Returned list is palloc'ed in current memory context.
547 GetSubscriptionNotReadyRelations(Oid subid
)
556 rel
= table_open(SubscriptionRelRelationId
, AccessShareLock
);
558 ScanKeyInit(&skey
[nkeys
++],
559 Anum_pg_subscription_rel_srsubid
,
560 BTEqualStrategyNumber
, F_OIDEQ
,
561 ObjectIdGetDatum(subid
));
563 ScanKeyInit(&skey
[nkeys
++],
564 Anum_pg_subscription_rel_srsubstate
,
565 BTEqualStrategyNumber
, F_CHARNE
,
566 CharGetDatum(SUBREL_STATE_READY
));
568 scan
= systable_beginscan(rel
, InvalidOid
, false,
571 while (HeapTupleIsValid(tup
= systable_getnext(scan
)))
573 Form_pg_subscription_rel subrel
;
574 SubscriptionRelState
*relstate
;
578 subrel
= (Form_pg_subscription_rel
) GETSTRUCT(tup
);
580 relstate
= (SubscriptionRelState
*) palloc(sizeof(SubscriptionRelState
));
581 relstate
->relid
= subrel
->srrelid
;
582 relstate
->state
= subrel
->srsubstate
;
583 d
= SysCacheGetAttr(SUBSCRIPTIONRELMAP
, tup
,
584 Anum_pg_subscription_rel_srsublsn
, &isnull
);
586 relstate
->lsn
= InvalidXLogRecPtr
;
588 relstate
->lsn
= DatumGetLSN(d
);
590 res
= lappend(res
, relstate
);
594 systable_endscan(scan
);
595 table_close(rel
, AccessShareLock
);