1 /* -------------------------------------------------------------------------
4 * Sample background worker code that demonstrates various coding
5 * patterns: establishing a database connection; starting and committing
6 * transactions; using GUC variables, and heeding SIGHUP to reread
7 * the configuration file; reporting to pg_stat_activity; using the
8 * process latch to sleep and exit in case of postmaster death.
10 * This code connects to a database, creates a schema and table, and summarizes
11 * the numbers contained therein. To see it working, insert an initial value
12 * with "total" type and some initial value; then insert some other rows with
13 * "delta" type. Delta rows will be deleted by this worker and their values
14 * aggregated into the total.
16 * Copyright (c) 2013-2022, PostgreSQL Global Development Group
19 * src/test/modules/worker_spi/worker_spi.c
21 * -------------------------------------------------------------------------
25 /* These are always necessary for a bgworker */
26 #include "miscadmin.h"
27 #include "postmaster/bgworker.h"
28 #include "postmaster/interrupt.h"
29 #include "storage/ipc.h"
30 #include "storage/latch.h"
31 #include "storage/lwlock.h"
32 #include "storage/proc.h"
33 #include "storage/shmem.h"
35 /* these headers are used by this particular worker's code */
36 #include "access/xact.h"
37 #include "executor/spi.h"
39 #include "lib/stringinfo.h"
41 #include "utils/builtins.h"
42 #include "utils/snapmgr.h"
43 #include "tcop/utility.h"
47 PG_FUNCTION_INFO_V1(worker_spi_launch
);
50 void worker_spi_main(Datum
) pg_attribute_noreturn();
53 static int worker_spi_naptime
= 10;
54 static int worker_spi_total_workers
= 2;
55 static char *worker_spi_database
= NULL
;
58 typedef struct worktable
65 * Initialize workspace for a worker process: create the schema if it doesn't
69 initialize_worker_spi(worktable
*table
)
76 SetCurrentStatementStartTimestamp();
77 StartTransactionCommand();
79 PushActiveSnapshot(GetTransactionSnapshot());
80 pgstat_report_activity(STATE_RUNNING
, "initializing worker_spi schema");
82 /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
84 appendStringInfo(&buf
, "select count(*) from pg_namespace where nspname = '%s'",
87 debug_query_string
= buf
.data
;
88 ret
= SPI_execute(buf
.data
, true, 0);
89 if (ret
!= SPI_OK_SELECT
)
90 elog(FATAL
, "SPI_execute failed: error code %d", ret
);
92 if (SPI_processed
!= 1)
93 elog(FATAL
, "not a singleton result");
95 ntup
= DatumGetInt64(SPI_getbinval(SPI_tuptable
->vals
[0],
96 SPI_tuptable
->tupdesc
,
99 elog(FATAL
, "null result");
103 debug_query_string
= NULL
;
104 resetStringInfo(&buf
);
105 appendStringInfo(&buf
,
106 "CREATE SCHEMA \"%s\" "
107 "CREATE TABLE \"%s\" ("
108 " type text CHECK (type IN ('total', 'delta')), "
110 "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
111 "WHERE type = 'total'",
112 table
->schema
, table
->name
, table
->name
, table
->name
);
114 /* set statement start time */
115 SetCurrentStatementStartTimestamp();
117 debug_query_string
= buf
.data
;
118 ret
= SPI_execute(buf
.data
, false, 0);
120 if (ret
!= SPI_OK_UTILITY
)
121 elog(FATAL
, "failed to create my schema");
123 debug_query_string
= NULL
; /* rest is not statement-specific */
128 CommitTransactionCommand();
129 debug_query_string
= NULL
;
130 pgstat_report_activity(STATE_IDLE
, NULL
);
134 worker_spi_main(Datum main_arg
)
136 int index
= DatumGetInt32(main_arg
);
141 table
= palloc(sizeof(worktable
));
142 sprintf(name
, "schema%d", index
);
143 table
->schema
= pstrdup(name
);
144 table
->name
= pstrdup("counted");
146 /* Establish signal handlers before unblocking signals. */
147 pqsignal(SIGHUP
, SignalHandlerForConfigReload
);
148 pqsignal(SIGTERM
, die
);
150 /* We're now ready to receive signals */
151 BackgroundWorkerUnblockSignals();
153 /* Connect to our database */
154 BackgroundWorkerInitializeConnection(worker_spi_database
, NULL
, 0);
156 elog(LOG
, "%s initialized with %s.%s",
157 MyBgworkerEntry
->bgw_name
, table
->schema
, table
->name
);
158 initialize_worker_spi(table
);
161 * Quote identifiers passed to us. Note that this must be done after
162 * initialize_worker_spi, because that routine assumes the names are not
165 * Note some memory might be leaked here.
167 table
->schema
= quote_identifier(table
->schema
);
168 table
->name
= quote_identifier(table
->name
);
170 initStringInfo(&buf
);
171 appendStringInfo(&buf
,
172 "WITH deleted AS (DELETE "
174 "WHERE type = 'delta' RETURNING value), "
175 "total AS (SELECT coalesce(sum(value), 0) as sum "
178 "SET value = %s.value + total.sum "
179 "FROM total WHERE type = 'total' "
180 "RETURNING %s.value",
181 table
->schema
, table
->name
,
182 table
->schema
, table
->name
,
187 * Main loop: do this until SIGTERM is received and processed by
195 * Background workers mustn't call usleep() or any direct equivalent:
196 * instead, they may wait on their process latch, which sleeps as
197 * necessary, but is awakened if postmaster dies. That way the
198 * background process goes away immediately in an emergency.
200 (void) WaitLatch(MyLatch
,
201 WL_LATCH_SET
| WL_TIMEOUT
| WL_EXIT_ON_PM_DEATH
,
202 worker_spi_naptime
* 1000L,
206 CHECK_FOR_INTERRUPTS();
209 * In case of a SIGHUP, just reload the configuration.
211 if (ConfigReloadPending
)
213 ConfigReloadPending
= false;
214 ProcessConfigFile(PGC_SIGHUP
);
218 * Start a transaction on which we can run queries. Note that each
219 * StartTransactionCommand() call should be preceded by a
220 * SetCurrentStatementStartTimestamp() call, which sets both the time
221 * for the statement we're about the run, and also the transaction
222 * start time. Also, each other query sent to SPI should probably be
223 * preceded by SetCurrentStatementStartTimestamp(), so that statement
224 * start time is always up to date.
226 * The SPI_connect() call lets us run queries through the SPI manager,
227 * and the PushActiveSnapshot() call creates an "active" snapshot
228 * which is necessary for queries to have MVCC data to work on.
230 * The pgstat_report_activity() call makes our activity visible
231 * through the pgstat views.
233 SetCurrentStatementStartTimestamp();
234 StartTransactionCommand();
236 PushActiveSnapshot(GetTransactionSnapshot());
237 debug_query_string
= buf
.data
;
238 pgstat_report_activity(STATE_RUNNING
, buf
.data
);
240 /* We can now execute queries via SPI */
241 ret
= SPI_execute(buf
.data
, false, 0);
243 if (ret
!= SPI_OK_UPDATE_RETURNING
)
244 elog(FATAL
, "cannot select from table %s.%s: error code %d",
245 table
->schema
, table
->name
, ret
);
247 if (SPI_processed
> 0)
252 val
= DatumGetInt32(SPI_getbinval(SPI_tuptable
->vals
[0],
253 SPI_tuptable
->tupdesc
,
256 elog(LOG
, "%s: count in %s.%s is now %d",
257 MyBgworkerEntry
->bgw_name
,
258 table
->schema
, table
->name
, val
);
262 * And finish our transaction.
266 CommitTransactionCommand();
267 debug_query_string
= NULL
;
268 pgstat_report_stat(false);
269 pgstat_report_activity(STATE_IDLE
, NULL
);
276 * Entrypoint of this module.
278 * We register more than one worker process here, to demonstrate how that can
284 BackgroundWorker worker
;
286 /* get the configuration */
287 DefineCustomIntVariable("worker_spi.naptime",
288 "Duration between each check (in seconds).",
300 if (!process_shared_preload_libraries_in_progress
)
303 DefineCustomIntVariable("worker_spi.total_workers",
304 "Number of workers.",
306 &worker_spi_total_workers
,
316 DefineCustomStringVariable("worker_spi.database",
317 "Database to connect to.",
319 &worker_spi_database
,
325 EmitWarningsOnPlaceholders("worker_spi");
327 /* set up common data for all our workers */
328 memset(&worker
, 0, sizeof(worker
));
329 worker
.bgw_flags
= BGWORKER_SHMEM_ACCESS
|
330 BGWORKER_BACKEND_DATABASE_CONNECTION
;
331 worker
.bgw_start_time
= BgWorkerStart_RecoveryFinished
;
332 worker
.bgw_restart_time
= BGW_NEVER_RESTART
;
333 sprintf(worker
.bgw_library_name
, "worker_spi");
334 sprintf(worker
.bgw_function_name
, "worker_spi_main");
335 worker
.bgw_notify_pid
= 0;
338 * Now fill in worker-specific data, and do the actual registrations.
340 for (int i
= 1; i
<= worker_spi_total_workers
; i
++)
342 snprintf(worker
.bgw_name
, BGW_MAXLEN
, "worker_spi worker %d", i
);
343 snprintf(worker
.bgw_type
, BGW_MAXLEN
, "worker_spi");
344 worker
.bgw_main_arg
= Int32GetDatum(i
);
346 RegisterBackgroundWorker(&worker
);
351 * Dynamically launch an SPI worker.
354 worker_spi_launch(PG_FUNCTION_ARGS
)
356 int32 i
= PG_GETARG_INT32(0);
357 BackgroundWorker worker
;
358 BackgroundWorkerHandle
*handle
;
359 BgwHandleStatus status
;
362 memset(&worker
, 0, sizeof(worker
));
363 worker
.bgw_flags
= BGWORKER_SHMEM_ACCESS
|
364 BGWORKER_BACKEND_DATABASE_CONNECTION
;
365 worker
.bgw_start_time
= BgWorkerStart_RecoveryFinished
;
366 worker
.bgw_restart_time
= BGW_NEVER_RESTART
;
367 sprintf(worker
.bgw_library_name
, "worker_spi");
368 sprintf(worker
.bgw_function_name
, "worker_spi_main");
369 snprintf(worker
.bgw_name
, BGW_MAXLEN
, "worker_spi worker %d", i
);
370 snprintf(worker
.bgw_type
, BGW_MAXLEN
, "worker_spi");
371 worker
.bgw_main_arg
= Int32GetDatum(i
);
372 /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
373 worker
.bgw_notify_pid
= MyProcPid
;
375 if (!RegisterDynamicBackgroundWorker(&worker
, &handle
))
378 status
= WaitForBackgroundWorkerStartup(handle
, &pid
);
380 if (status
== BGWH_STOPPED
)
382 (errcode(ERRCODE_INSUFFICIENT_RESOURCES
),
383 errmsg("could not start background process"),
384 errhint("More details may be available in the server log.")));
385 if (status
== BGWH_POSTMASTER_DIED
)
387 (errcode(ERRCODE_INSUFFICIENT_RESOURCES
),
388 errmsg("cannot start background processes without postmaster"),
389 errhint("Kill all remaining database processes and restart the database.")));
390 Assert(status
== BGWH_STARTED
);
392 PG_RETURN_INT32(pid
);