Update copyright for 2022
[pgsql.git] / src / test / modules / worker_spi / worker_spi.c
blob05ced63780e6b11fd2f6187152900c0ddb52e7cb
1 /* -------------------------------------------------------------------------
3 * worker_spi.c
4 * Sample background worker code that demonstrates various coding
5 * patterns: establishing a database connection; starting and committing
6 * transactions; using GUC variables, and heeding SIGHUP to reread
7 * the configuration file; reporting to pg_stat_activity; using the
8 * process latch to sleep and exit in case of postmaster death.
10 * This code connects to a database, creates a schema and table, and summarizes
11 * the numbers contained therein. To see it working, insert an initial row
12 * with "total" type and some initial value; then insert some other rows with
13 * "delta" type. Delta rows will be deleted by this worker and their values
14 * aggregated into the total.
16 * Copyright (c) 2013-2022, PostgreSQL Global Development Group
18 * IDENTIFICATION
19 * src/test/modules/worker_spi/worker_spi.c
21 * -------------------------------------------------------------------------
23 #include "postgres.h"
25 /* These are always necessary for a bgworker */
26 #include "miscadmin.h"
27 #include "postmaster/bgworker.h"
28 #include "postmaster/interrupt.h"
29 #include "storage/ipc.h"
30 #include "storage/latch.h"
31 #include "storage/lwlock.h"
32 #include "storage/proc.h"
33 #include "storage/shmem.h"
35 /* these headers are used by this particular worker's code */
36 #include "access/xact.h"
37 #include "executor/spi.h"
38 #include "fmgr.h"
39 #include "lib/stringinfo.h"
40 #include "pgstat.h"
41 #include "utils/builtins.h"
42 #include "utils/snapmgr.h"
43 #include "tcop/utility.h"
45 PG_MODULE_MAGIC;
47 PG_FUNCTION_INFO_V1(worker_spi_launch);
49 void _PG_init(void);
50 void worker_spi_main(Datum) pg_attribute_noreturn();
52 /* GUC variables */
53 static int worker_spi_naptime = 10;
54 static int worker_spi_total_workers = 2;
55 static char *worker_spi_database = NULL;
/*
 * Identifies the table a worker operates on.  Both strings are stored
 * unquoted by worker_spi_main() until initialize_worker_spi() has run, and
 * are replaced by their quote_identifier() forms afterwards.
 */
typedef struct worktable
{
	const char *schema;			/* schema containing the table */
	const char *name;			/* name of the table inside that schema */
} worktable;
65 * Initialize workspace for a worker process: create the schema if it doesn't
66 * already exist.
68 static void
69 initialize_worker_spi(worktable *table)
71 int ret;
72 int ntup;
73 bool isnull;
74 StringInfoData buf;
76 SetCurrentStatementStartTimestamp();
77 StartTransactionCommand();
78 SPI_connect();
79 PushActiveSnapshot(GetTransactionSnapshot());
80 pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
82 /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
83 initStringInfo(&buf);
84 appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
85 table->schema);
87 debug_query_string = buf.data;
88 ret = SPI_execute(buf.data, true, 0);
89 if (ret != SPI_OK_SELECT)
90 elog(FATAL, "SPI_execute failed: error code %d", ret);
92 if (SPI_processed != 1)
93 elog(FATAL, "not a singleton result");
95 ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
96 SPI_tuptable->tupdesc,
97 1, &isnull));
98 if (isnull)
99 elog(FATAL, "null result");
101 if (ntup == 0)
103 debug_query_string = NULL;
104 resetStringInfo(&buf);
105 appendStringInfo(&buf,
106 "CREATE SCHEMA \"%s\" "
107 "CREATE TABLE \"%s\" ("
108 " type text CHECK (type IN ('total', 'delta')), "
109 " value integer)"
110 "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
111 "WHERE type = 'total'",
112 table->schema, table->name, table->name, table->name);
114 /* set statement start time */
115 SetCurrentStatementStartTimestamp();
117 debug_query_string = buf.data;
118 ret = SPI_execute(buf.data, false, 0);
120 if (ret != SPI_OK_UTILITY)
121 elog(FATAL, "failed to create my schema");
123 debug_query_string = NULL; /* rest is not statement-specific */
126 SPI_finish();
127 PopActiveSnapshot();
128 CommitTransactionCommand();
129 debug_query_string = NULL;
130 pgstat_report_activity(STATE_IDLE, NULL);
133 void
134 worker_spi_main(Datum main_arg)
136 int index = DatumGetInt32(main_arg);
137 worktable *table;
138 StringInfoData buf;
139 char name[20];
141 table = palloc(sizeof(worktable));
142 sprintf(name, "schema%d", index);
143 table->schema = pstrdup(name);
144 table->name = pstrdup("counted");
146 /* Establish signal handlers before unblocking signals. */
147 pqsignal(SIGHUP, SignalHandlerForConfigReload);
148 pqsignal(SIGTERM, die);
150 /* We're now ready to receive signals */
151 BackgroundWorkerUnblockSignals();
153 /* Connect to our database */
154 BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
156 elog(LOG, "%s initialized with %s.%s",
157 MyBgworkerEntry->bgw_name, table->schema, table->name);
158 initialize_worker_spi(table);
161 * Quote identifiers passed to us. Note that this must be done after
162 * initialize_worker_spi, because that routine assumes the names are not
163 * quoted.
165 * Note some memory might be leaked here.
167 table->schema = quote_identifier(table->schema);
168 table->name = quote_identifier(table->name);
170 initStringInfo(&buf);
171 appendStringInfo(&buf,
172 "WITH deleted AS (DELETE "
173 "FROM %s.%s "
174 "WHERE type = 'delta' RETURNING value), "
175 "total AS (SELECT coalesce(sum(value), 0) as sum "
176 "FROM deleted) "
177 "UPDATE %s.%s "
178 "SET value = %s.value + total.sum "
179 "FROM total WHERE type = 'total' "
180 "RETURNING %s.value",
181 table->schema, table->name,
182 table->schema, table->name,
183 table->name,
184 table->name);
187 * Main loop: do this until SIGTERM is received and processed by
188 * ProcessInterrupts.
190 for (;;)
192 int ret;
195 * Background workers mustn't call usleep() or any direct equivalent:
196 * instead, they may wait on their process latch, which sleeps as
197 * necessary, but is awakened if postmaster dies. That way the
198 * background process goes away immediately in an emergency.
200 (void) WaitLatch(MyLatch,
201 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
202 worker_spi_naptime * 1000L,
203 PG_WAIT_EXTENSION);
204 ResetLatch(MyLatch);
206 CHECK_FOR_INTERRUPTS();
209 * In case of a SIGHUP, just reload the configuration.
211 if (ConfigReloadPending)
213 ConfigReloadPending = false;
214 ProcessConfigFile(PGC_SIGHUP);
218 * Start a transaction on which we can run queries. Note that each
219 * StartTransactionCommand() call should be preceded by a
220 * SetCurrentStatementStartTimestamp() call, which sets both the time
221 * for the statement we're about the run, and also the transaction
222 * start time. Also, each other query sent to SPI should probably be
223 * preceded by SetCurrentStatementStartTimestamp(), so that statement
224 * start time is always up to date.
226 * The SPI_connect() call lets us run queries through the SPI manager,
227 * and the PushActiveSnapshot() call creates an "active" snapshot
228 * which is necessary for queries to have MVCC data to work on.
230 * The pgstat_report_activity() call makes our activity visible
231 * through the pgstat views.
233 SetCurrentStatementStartTimestamp();
234 StartTransactionCommand();
235 SPI_connect();
236 PushActiveSnapshot(GetTransactionSnapshot());
237 debug_query_string = buf.data;
238 pgstat_report_activity(STATE_RUNNING, buf.data);
240 /* We can now execute queries via SPI */
241 ret = SPI_execute(buf.data, false, 0);
243 if (ret != SPI_OK_UPDATE_RETURNING)
244 elog(FATAL, "cannot select from table %s.%s: error code %d",
245 table->schema, table->name, ret);
247 if (SPI_processed > 0)
249 bool isnull;
250 int32 val;
252 val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
253 SPI_tuptable->tupdesc,
254 1, &isnull));
255 if (!isnull)
256 elog(LOG, "%s: count in %s.%s is now %d",
257 MyBgworkerEntry->bgw_name,
258 table->schema, table->name, val);
262 * And finish our transaction.
264 SPI_finish();
265 PopActiveSnapshot();
266 CommitTransactionCommand();
267 debug_query_string = NULL;
268 pgstat_report_stat(false);
269 pgstat_report_activity(STATE_IDLE, NULL);
272 /* Not reachable */
276 * Entrypoint of this module.
278 * We register more than one worker process here, to demonstrate how that can
279 * be done.
281 void
282 _PG_init(void)
284 BackgroundWorker worker;
286 /* get the configuration */
287 DefineCustomIntVariable("worker_spi.naptime",
288 "Duration between each check (in seconds).",
289 NULL,
290 &worker_spi_naptime,
293 INT_MAX,
294 PGC_SIGHUP,
296 NULL,
297 NULL,
298 NULL);
300 if (!process_shared_preload_libraries_in_progress)
301 return;
303 DefineCustomIntVariable("worker_spi.total_workers",
304 "Number of workers.",
305 NULL,
306 &worker_spi_total_workers,
309 100,
310 PGC_POSTMASTER,
312 NULL,
313 NULL,
314 NULL);
316 DefineCustomStringVariable("worker_spi.database",
317 "Database to connect to.",
318 NULL,
319 &worker_spi_database,
320 "postgres",
321 PGC_POSTMASTER,
323 NULL, NULL, NULL);
325 EmitWarningsOnPlaceholders("worker_spi");
327 /* set up common data for all our workers */
328 memset(&worker, 0, sizeof(worker));
329 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
330 BGWORKER_BACKEND_DATABASE_CONNECTION;
331 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
332 worker.bgw_restart_time = BGW_NEVER_RESTART;
333 sprintf(worker.bgw_library_name, "worker_spi");
334 sprintf(worker.bgw_function_name, "worker_spi_main");
335 worker.bgw_notify_pid = 0;
338 * Now fill in worker-specific data, and do the actual registrations.
340 for (int i = 1; i <= worker_spi_total_workers; i++)
342 snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
343 snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
344 worker.bgw_main_arg = Int32GetDatum(i);
346 RegisterBackgroundWorker(&worker);
351 * Dynamically launch an SPI worker.
353 Datum
354 worker_spi_launch(PG_FUNCTION_ARGS)
356 int32 i = PG_GETARG_INT32(0);
357 BackgroundWorker worker;
358 BackgroundWorkerHandle *handle;
359 BgwHandleStatus status;
360 pid_t pid;
362 memset(&worker, 0, sizeof(worker));
363 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
364 BGWORKER_BACKEND_DATABASE_CONNECTION;
365 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
366 worker.bgw_restart_time = BGW_NEVER_RESTART;
367 sprintf(worker.bgw_library_name, "worker_spi");
368 sprintf(worker.bgw_function_name, "worker_spi_main");
369 snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
370 snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
371 worker.bgw_main_arg = Int32GetDatum(i);
372 /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
373 worker.bgw_notify_pid = MyProcPid;
375 if (!RegisterDynamicBackgroundWorker(&worker, &handle))
376 PG_RETURN_NULL();
378 status = WaitForBackgroundWorkerStartup(handle, &pid);
380 if (status == BGWH_STOPPED)
381 ereport(ERROR,
382 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
383 errmsg("could not start background process"),
384 errhint("More details may be available in the server log.")));
385 if (status == BGWH_POSTMASTER_DIED)
386 ereport(ERROR,
387 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
388 errmsg("cannot start background processes without postmaster"),
389 errhint("Kill all remaining database processes and restart the database.")));
390 Assert(status == BGWH_STARTED);
392 PG_RETURN_INT32(pid);