/*-------------------------------------------------------------------------
 *
 * libpq-be-fe-helpers.h
 *	  Helper functions for using libpq in extensions
 *
 * Code built directly into the backend is not allowed to link to libpq
 * directly. Extension code is allowed to use libpq, however. Extension code
 * using libpq has to be careful not to block inside libpq, otherwise
 * interrupts will not be processed, leading to issues like unresolvable
 * deadlocks. Backend code also needs to take care to acquire/release an
 * external fd for the connection, otherwise fd.c's accounting of fd's is
 * broken.
 *
 * This file provides helper functions to make it easier to comply with these
 * rules. It is a header only library as it needs to be linked into each
 * extension using libpq, and it seems too small to be worth adding a
 * dedicated static library for.
 *
 * TODO: For historical reasons the connections established here are not put
 * into non-blocking mode. That can lead to blocking even when only the async
 * libpq functions are used. This should be fixed.
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/include/libpq/libpq-be-fe-helpers.h
 *
 *-------------------------------------------------------------------------
 */
#ifndef LIBPQ_BE_FE_HELPERS_H
#define LIBPQ_BE_FE_HELPERS_H

/*
 * Despite the name, BUILDING_DLL is set only when building code that is
 * directly part of the backend, which is also where libpq isn't allowed to
 * be used. Obviously this doesn't protect against libpq-fe.h getting included
 * otherwise, but perhaps still protects against a few mistakes...
 */
#ifdef BUILDING_DLL
#error "libpq may not be used in code directly built into the backend"
#endif

#include "libpq-fe.h"
#include "miscadmin.h"
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/timestamp.h"
#include "utils/wait_event.h"

51 static inline void libpqsrv_connect_prepare(void);
52 static inline void libpqsrv_connect_internal(PGconn
*conn
, uint32 wait_event_info
);
53 static inline PGresult
*libpqsrv_get_result_last(PGconn
*conn
, uint32 wait_event_info
);
54 static inline PGresult
*libpqsrv_get_result(PGconn
*conn
, uint32 wait_event_info
);
58 * PQconnectdb() wrapper that reserves a file descriptor and processes
59 * interrupts during connection establishment.
61 * Throws an error if AcquireExternalFD() fails, but does not throw if
62 * connection establishment itself fails. Callers need to use PQstatus() to
63 * check if connection establishment succeeded.
65 static inline PGconn
*
66 libpqsrv_connect(const char *conninfo
, uint32 wait_event_info
)
70 libpqsrv_connect_prepare();
72 conn
= PQconnectStart(conninfo
);
74 libpqsrv_connect_internal(conn
, wait_event_info
);
80 * Like libpqsrv_connect(), except that this is a wrapper for
81 * PQconnectdbParams().
83 static inline PGconn
*
84 libpqsrv_connect_params(const char *const *keywords
,
85 const char *const *values
,
87 uint32 wait_event_info
)
91 libpqsrv_connect_prepare();
93 conn
= PQconnectStartParams(keywords
, values
, expand_dbname
);
95 libpqsrv_connect_internal(conn
, wait_event_info
);
101 * PQfinish() wrapper that additionally releases the reserved file descriptor.
103 * It is allowed to call this with a NULL pgconn iff NULL was returned by
107 libpqsrv_disconnect(PGconn
*conn
)
110 * If no connection was established, we haven't reserved an FD for it (or
111 * already released it). This rule makes it easier to write PG_CATCH()
112 * handlers for this facility's users.
114 * See also libpqsrv_connect_internal().

/* internal helper functions follow */

128 * Helper function for all connection establishment functions.
131 libpqsrv_connect_prepare(void)
134 * We must obey fd.c's limit on non-virtual file descriptors. Assume that
135 * a PGconn represents one long-lived FD. (Doing this here also ensures
136 * that VFDs are closed if needed to make room.)
138 if (!AcquireExternalFD())
140 #ifndef WIN32 /* can't write #if within ereport() macro */
142 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION
),
143 errmsg("could not establish connection"),
144 errdetail("There are too many open files on the local server."),
145 errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
148 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION
),
149 errmsg("could not establish connection"),
150 errdetail("There are too many open files on the local server."),
151 errhint("Raise the server's max_files_per_process setting.")));
157 * Helper function for all connection establishment functions.
160 libpqsrv_connect_internal(PGconn
*conn
, uint32 wait_event_info
)
163 * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
173 * Can't wait without a socket. Note that we don't want to close the libpq
174 * connection yet, so callers can emit a useful error.
176 if (PQstatus(conn
) == CONNECTION_BAD
)
180 * WaitLatchOrSocket() can conceivably fail, handle that case here instead
181 * of requiring all callers to do so.
185 PostgresPollingStatusType status
;
188 * Poll connection until we have OK or FAILED status.
190 * Per spec for PQconnectPoll, first wait till socket is write-ready.
192 status
= PGRES_POLLING_WRITING
;
193 while (status
!= PGRES_POLLING_OK
&& status
!= PGRES_POLLING_FAILED
)
198 if (status
== PGRES_POLLING_READING
)
199 io_flag
= WL_SOCKET_READABLE
;
203 * Windows needs a different test while waiting for
206 else if (PQstatus(conn
) == CONNECTION_STARTED
)
207 io_flag
= WL_SOCKET_CONNECTED
;
210 io_flag
= WL_SOCKET_WRITEABLE
;
212 rc
= WaitLatchOrSocket(MyLatch
,
213 WL_EXIT_ON_PM_DEATH
| WL_LATCH_SET
| io_flag
,
219 if (rc
& WL_LATCH_SET
)
222 CHECK_FOR_INTERRUPTS();
225 /* If socket is ready, advance the libpq state machine */
227 status
= PQconnectPoll(conn
);
233 * If an error is thrown here, the callers won't call
234 * libpqsrv_disconnect() with a conn, so release resources
246 * PQexec() wrapper that processes interrupts.
248 * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
249 * interrupts while pushing the query text to the server. Consider that
250 * setting if query strings can be long relative to TCP buffer size.
252 * This has the preconditions of PQsendQuery(), not those of PQexec(). Most
253 * notably, PQexec() would silently discard any prior query results.
255 static inline PGresult
*
256 libpqsrv_exec(PGconn
*conn
, const char *query
, uint32 wait_event_info
)
258 if (!PQsendQuery(conn
, query
))
260 return libpqsrv_get_result_last(conn
, wait_event_info
);
264 * PQexecParams() wrapper that processes interrupts.
266 * See notes at libpqsrv_exec().
268 static inline PGresult
*
269 libpqsrv_exec_params(PGconn
*conn
,
272 const Oid
*paramTypes
,
273 const char *const *paramValues
,
274 const int *paramLengths
,
275 const int *paramFormats
,
277 uint32 wait_event_info
)
279 if (!PQsendQueryParams(conn
, command
, nParams
, paramTypes
, paramValues
,
280 paramLengths
, paramFormats
, resultFormat
))
282 return libpqsrv_get_result_last(conn
, wait_event_info
);
286 * Like PQexec(), loop over PQgetResult() until it returns NULL or another
287 * terminal state. Return the last non-NULL result or the terminal state.
289 static inline PGresult
*
290 libpqsrv_get_result_last(PGconn
*conn
, uint32 wait_event_info
)
292 PGresult
*volatile lastResult
= NULL
;
294 /* In what follows, do not leak any PGresults on an error. */
299 /* Wait for, and collect, the next PGresult. */
302 result
= libpqsrv_get_result(conn
, wait_event_info
);
304 break; /* query is complete, or failure */
307 * Emulate PQexec()'s behavior of returning the last result when
313 if (PQresultStatus(lastResult
) == PGRES_COPY_IN
||
314 PQresultStatus(lastResult
) == PGRES_COPY_OUT
||
315 PQresultStatus(lastResult
) == PGRES_COPY_BOTH
||
316 PQstatus(conn
) == CONNECTION_BAD
)
331 * Perform the equivalent of PQgetResult(), but watch for interrupts.
333 static inline PGresult
*
334 libpqsrv_get_result(PGconn
*conn
, uint32 wait_event_info
)
337 * Collect data until PQgetResult is ready to get the result without
340 while (PQisBusy(conn
))
344 rc
= WaitLatchOrSocket(MyLatch
,
345 WL_EXIT_ON_PM_DEATH
| WL_LATCH_SET
|
352 if (rc
& WL_LATCH_SET
)
355 CHECK_FOR_INTERRUPTS();
358 /* Consume whatever data is available from the socket */
359 if (PQconsumeInput(conn
) == 0)
361 /* trouble; expect PQgetResult() to return NULL */
366 /* Now we can collect and return the next PGresult */
367 return PQgetResult(conn
);
371 * Submit a cancel request to the given connection, waiting only until
374 * We sleep interruptibly until we receive confirmation that the cancel
375 * request has been accepted, and if it is, return NULL; if the cancel
376 * request fails, return an error message string (which is not to be
379 * For other problems (to wit: OOM when strdup'ing an error message from
380 * libpq), this function can ereport(ERROR).
382 * Note: this function leaks a string's worth of memory when reporting
383 * libpq errors. Make sure to call it in a transient memory context.
385 static inline const char *
386 libpqsrv_cancel(PGconn
*conn
, TimestampTz endtime
)
388 PGcancelConn
*cancel_conn
;
389 const char *error
= NULL
;
391 cancel_conn
= PQcancelCreate(conn
);
392 if (cancel_conn
== NULL
)
393 return "out of memory";
395 /* In what follows, do not leak any PGcancelConn on any errors. */
399 if (!PQcancelStart(cancel_conn
))
401 error
= pchomp(PQcancelErrorMessage(cancel_conn
));
407 PostgresPollingStatusType pollres
;
410 int waitEvents
= WL_LATCH_SET
| WL_TIMEOUT
| WL_EXIT_ON_PM_DEATH
;
412 pollres
= PQcancelPoll(cancel_conn
);
413 if (pollres
== PGRES_POLLING_OK
)
414 break; /* success! */
416 /* If timeout has expired, give up, else get sleep time. */
417 now
= GetCurrentTimestamp();
418 cur_timeout
= TimestampDifferenceMilliseconds(now
, endtime
);
419 if (cur_timeout
<= 0)
421 error
= "cancel request timed out";
427 case PGRES_POLLING_READING
:
428 waitEvents
|= WL_SOCKET_READABLE
;
430 case PGRES_POLLING_WRITING
:
431 waitEvents
|= WL_SOCKET_WRITEABLE
;
434 error
= pchomp(PQcancelErrorMessage(cancel_conn
));
438 /* Sleep until there's something to do */
439 WaitLatchOrSocket(MyLatch
, waitEvents
, PQcancelSocket(cancel_conn
),
440 cur_timeout
, PG_WAIT_CLIENT
);
444 CHECK_FOR_INTERRUPTS();
450 PQcancelFinish(cancel_conn
);
#endif							/* LIBPQ_BE_FE_HELPERS_H */