doc: clarify that pg_global can _only_ be used for system tables.
[pgsql.git] / src / fe_utils / parallel_slot.c
blob2be83b70f68c82d0887f250dd92d8bd4b63401eb
1 /*-------------------------------------------------------------------------
3 * parallel_slot.c
4 * Parallel support for front-end parallel database connections
7 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
10 * src/fe_utils/parallel_slot.c
12 *-------------------------------------------------------------------------
15 #if defined(WIN32) && FD_SETSIZE < 1024
16 #error FD_SETSIZE needs to have been increased
17 #endif
19 #include "postgres_fe.h"
21 #include <sys/select.h>
23 #include "common/logging.h"
24 #include "fe_utils/cancel.h"
25 #include "fe_utils/parallel_slot.h"
26 #include "fe_utils/query_utils.h"
28 #define ERRCODE_UNDEFINED_TABLE "42P01"
30 static int select_loop(int maxFd, fd_set *workerset);
31 static bool processQueryResult(ParallelSlot *slot, PGresult *result);
34 * Process (and delete) a query result. Returns true if there's no problem,
35 * false otherwise. It's up to the handler to decide what constitutes a
36 * problem.
38 static bool
39 processQueryResult(ParallelSlot *slot, PGresult *result)
41 Assert(slot->handler != NULL);
43 /* On failure, the handler should return NULL after freeing the result */
44 if (!slot->handler(result, slot->connection, slot->handler_context))
45 return false;
47 /* Ok, we have to free it ourself */
48 PQclear(result);
49 return true;
53 * Consume all the results generated for the given connection until
54 * nothing remains. If at least one error is encountered, return false.
55 * Note that this will block if the connection is busy.
57 static bool
58 consumeQueryResult(ParallelSlot *slot)
60 bool ok = true;
61 PGresult *result;
63 SetCancelConn(slot->connection);
64 while ((result = PQgetResult(slot->connection)) != NULL)
66 if (!processQueryResult(slot, result))
67 ok = false;
69 ResetCancelConn();
70 return ok;
74 * Wait until a file descriptor from the given set becomes readable.
76 * Returns the number of ready descriptors, or -1 on failure (including
77 * getting a cancel request).
79 static int
80 select_loop(int maxFd, fd_set *workerset)
82 int i;
83 fd_set saveSet = *workerset;
85 if (CancelRequested)
86 return -1;
88 for (;;)
91 * On Windows, we need to check once in a while for cancel requests;
92 * on other platforms we rely on select() returning when interrupted.
94 struct timeval *tvp;
95 #ifdef WIN32
96 struct timeval tv = {0, 1000000};
98 tvp = &tv;
99 #else
100 tvp = NULL;
101 #endif
103 *workerset = saveSet;
104 i = select(maxFd + 1, workerset, NULL, NULL, tvp);
106 #ifdef WIN32
107 if (i == SOCKET_ERROR)
109 i = -1;
111 if (WSAGetLastError() == WSAEINTR)
112 errno = EINTR;
114 #endif
116 if (i < 0 && errno == EINTR)
117 continue; /* ignore this */
118 if (i < 0 || CancelRequested)
119 return -1; /* but not this */
120 if (i == 0)
121 continue; /* timeout (Win32 only) */
122 break;
125 return i;
129 * Return the offset of a suitable idle slot, or -1 if none are available. If
130 * the given dbname is not null, only idle slots connected to the given
131 * database are considered suitable, otherwise all idle connected slots are
132 * considered suitable.
134 static int
135 find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
137 int i;
139 for (i = 0; i < sa->numslots; i++)
141 if (sa->slots[i].inUse)
142 continue;
144 if (sa->slots[i].connection == NULL)
145 continue;
147 if (dbname == NULL ||
148 strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
149 return i;
151 return -1;
155 * Return the offset of the first slot without a database connection, or -1 if
156 * all slots are connected.
158 static int
159 find_unconnected_slot(const ParallelSlotArray *sa)
161 int i;
163 for (i = 0; i < sa->numslots; i++)
165 if (sa->slots[i].inUse)
166 continue;
168 if (sa->slots[i].connection == NULL)
169 return i;
172 return -1;
176 * Return the offset of the first idle slot, or -1 if all slots are busy.
178 static int
179 find_any_idle_slot(const ParallelSlotArray *sa)
181 int i;
183 for (i = 0; i < sa->numslots; i++)
184 if (!sa->slots[i].inUse)
185 return i;
187 return -1;
191 * Wait for any slot's connection to have query results, consume the results,
192 * and update the slot's status as appropriate. Returns true on success,
193 * false on cancellation, on error, or if no slots are connected.
195 static bool
196 wait_on_slots(ParallelSlotArray *sa)
198 int i;
199 fd_set slotset;
200 int maxFd = 0;
201 PGconn *cancelconn = NULL;
203 /* We must reconstruct the fd_set for each call to select_loop */
204 FD_ZERO(&slotset);
206 for (i = 0; i < sa->numslots; i++)
208 int sock;
210 /* We shouldn't get here if we still have slots without connections */
211 Assert(sa->slots[i].connection != NULL);
213 sock = PQsocket(sa->slots[i].connection);
216 * We don't really expect any connections to lose their sockets after
217 * startup, but just in case, cope by ignoring them.
219 if (sock < 0)
220 continue;
222 /* Keep track of the first valid connection we see. */
223 if (cancelconn == NULL)
224 cancelconn = sa->slots[i].connection;
226 FD_SET(sock, &slotset);
227 if (sock > maxFd)
228 maxFd = sock;
232 * If we get this far with no valid connections, processing cannot
233 * continue.
235 if (cancelconn == NULL)
236 return false;
238 SetCancelConn(cancelconn);
239 i = select_loop(maxFd, &slotset);
240 ResetCancelConn();
242 /* failure? */
243 if (i < 0)
244 return false;
246 for (i = 0; i < sa->numslots; i++)
248 int sock;
250 sock = PQsocket(sa->slots[i].connection);
252 if (sock >= 0 && FD_ISSET(sock, &slotset))
254 /* select() says input is available, so consume it */
255 PQconsumeInput(sa->slots[i].connection);
258 /* Collect result(s) as long as any are available */
259 while (!PQisBusy(sa->slots[i].connection))
261 PGresult *result = PQgetResult(sa->slots[i].connection);
263 if (result != NULL)
265 /* Handle and discard the command result */
266 if (!processQueryResult(&sa->slots[i], result))
267 return false;
269 else
271 /* This connection has become idle */
272 sa->slots[i].inUse = false;
273 ParallelSlotClearHandler(&sa->slots[i]);
274 break;
278 return true;
282 * Open a new database connection using the stored connection parameters and
283 * optionally a given dbname if not null, execute the stored initial command if
284 * any, and associate the new connection with the given slot.
286 static void
287 connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
289 const char *old_override;
290 ParallelSlot *slot = &sa->slots[slotno];
292 old_override = sa->cparams->override_dbname;
293 if (dbname)
294 sa->cparams->override_dbname = dbname;
295 slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
296 sa->cparams->override_dbname = old_override;
299 * POSIX defines FD_SETSIZE as the highest file descriptor acceptable to
300 * FD_SET() and allied macros. Windows defines it as a ceiling on the
301 * count of file descriptors in the set, not a ceiling on the value of
302 * each file descriptor; see
303 * https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select
304 * and
305 * https://learn.microsoft.com/en-us/windows/win32/api/winsock/ns-winsock-fd_set.
306 * We can't ignore that, because Windows starts file descriptors at a
307 * higher value, delays reuse, and skips values. With less than ten
308 * concurrent file descriptors, opened and closed rapidly, one can reach
309 * file descriptor 1024.
311 * Doing a hard exit here is a bit grotty, but it doesn't seem worth
312 * complicating the API to make it less grotty.
314 #ifdef WIN32
315 if (slotno >= FD_SETSIZE)
317 pg_log_error("too many jobs for this platform: %d", slotno);
318 exit(1);
320 #else
322 int fd = PQsocket(slot->connection);
324 if (fd >= FD_SETSIZE)
326 pg_log_error("socket file descriptor out of range for select(): %d",
327 fd);
328 pg_log_error_hint("Try fewer jobs.");
329 exit(1);
332 #endif
334 /* Setup the connection using the supplied command, if any. */
335 if (sa->initcmd)
336 executeCommand(slot->connection, sa->initcmd, sa->echo);
340 * ParallelSlotsGetIdle
341 * Return a connection slot that is ready to execute a command.
343 * The slot returned is chosen as follows:
345 * If any idle slot already has an open connection, and if either dbname is
346 * null or the existing connection is to the given database, that slot will be
347 * returned allowing the connection to be reused.
349 * Otherwise, if any idle slot is not yet connected to any database, the slot
350 * will be returned with it's connection opened using the stored cparams and
351 * optionally the given dbname if not null.
353 * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
354 * after having it's connection disconnected and reconnected using the stored
355 * cparams and optionally the given dbname if not null.
357 * Otherwise, if any slots have connections that are busy, we loop on select()
358 * until one socket becomes available. When this happens, we read the whole
359 * set and mark as free all sockets that become available. We then select a
360 * slot using the same rules as above.
362 * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
364 * For any connection created, if the stored initcmd is not null, it will be
365 * executed as a command on the newly formed connection before the slot is
366 * returned.
368 * If an error occurs, NULL is returned.
370 ParallelSlot *
371 ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
373 int offset;
375 Assert(sa);
376 Assert(sa->numslots > 0);
378 while (1)
380 /* First choice: a slot already connected to the desired database. */
381 offset = find_matching_idle_slot(sa, dbname);
382 if (offset >= 0)
384 sa->slots[offset].inUse = true;
385 return &sa->slots[offset];
388 /* Second choice: a slot not connected to any database. */
389 offset = find_unconnected_slot(sa);
390 if (offset >= 0)
392 connect_slot(sa, offset, dbname);
393 sa->slots[offset].inUse = true;
394 return &sa->slots[offset];
397 /* Third choice: a slot connected to the wrong database. */
398 offset = find_any_idle_slot(sa);
399 if (offset >= 0)
401 disconnectDatabase(sa->slots[offset].connection);
402 sa->slots[offset].connection = NULL;
403 connect_slot(sa, offset, dbname);
404 sa->slots[offset].inUse = true;
405 return &sa->slots[offset];
409 * Fourth choice: block until one or more slots become available. If
410 * any slots hit a fatal error, we'll find out about that here and
411 * return NULL.
413 if (!wait_on_slots(sa))
414 return NULL;
419 * ParallelSlotsSetup
420 * Prepare a set of parallel slots but do not connect to any database.
422 * This creates and initializes a set of slots, marking all parallel slots as
423 * free and ready to use. Establishing connections is delayed until requesting
424 * a free slot. The cparams, progname, echo, and initcmd are stored for later
425 * use and must remain valid for the lifetime of the returned array.
427 ParallelSlotArray *
428 ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
429 bool echo, const char *initcmd)
431 ParallelSlotArray *sa;
433 Assert(numslots > 0);
434 Assert(cparams != NULL);
435 Assert(progname != NULL);
437 sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
438 numslots * sizeof(ParallelSlot));
440 sa->numslots = numslots;
441 sa->cparams = cparams;
442 sa->progname = progname;
443 sa->echo = echo;
444 sa->initcmd = initcmd;
446 return sa;
450 * ParallelSlotsAdoptConn
451 * Assign an open connection to the slots array for reuse.
453 * This turns over ownership of an open connection to a slots array. The
454 * caller should not further use or close the connection. All the connection's
455 * parameters (user, host, port, etc.) except possibly dbname should match
456 * those of the slots array's cparams, as given in ParallelSlotsSetup. If
457 * these parameters differ, subsequent behavior is undefined.
459 void
460 ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
462 int offset;
464 offset = find_unconnected_slot(sa);
465 if (offset >= 0)
466 sa->slots[offset].connection = conn;
467 else
468 disconnectDatabase(conn);
472 * ParallelSlotsTerminate
473 * Clean up a set of parallel slots
475 * Iterate through all connections in a given set of ParallelSlots and
476 * terminate all connections.
478 void
479 ParallelSlotsTerminate(ParallelSlotArray *sa)
481 int i;
483 for (i = 0; i < sa->numslots; i++)
485 PGconn *conn = sa->slots[i].connection;
487 if (conn == NULL)
488 continue;
490 disconnectDatabase(conn);
495 * ParallelSlotsWaitCompletion
497 * Wait for all connections to finish, returning false if at least one
498 * error has been found on the way.
500 bool
501 ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
503 int i;
505 for (i = 0; i < sa->numslots; i++)
507 if (sa->slots[i].connection == NULL)
508 continue;
509 if (!consumeQueryResult(&sa->slots[i]))
510 return false;
511 /* Mark connection as idle */
512 sa->slots[i].inUse = false;
513 ParallelSlotClearHandler(&sa->slots[i]);
516 return true;
520 * TableCommandResultHandler
522 * ParallelSlotResultHandler for results of commands (not queries) against
523 * tables.
525 * Requires that the result status is either PGRES_COMMAND_OK or an error about
526 * a missing table. This is useful for utilities that compile a list of tables
527 * to process and then run commands (vacuum, reindex, or whatever) against
528 * those tables, as there is a race condition between the time the list is
529 * compiled and the time the command attempts to open the table.
531 * For missing tables, logs an error but allows processing to continue.
533 * For all other errors, logs an error and terminates further processing.
535 * res: PGresult from the query executed on the slot's connection
536 * conn: connection belonging to the slot
537 * context: unused
539 bool
540 TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
542 Assert(res != NULL);
543 Assert(conn != NULL);
546 * If it's an error, report it. Errors about a missing table are harmless
547 * so we continue processing; but die for other errors.
549 if (PQresultStatus(res) != PGRES_COMMAND_OK)
551 char *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
553 pg_log_error("processing of database \"%s\" failed: %s",
554 PQdb(conn), PQerrorMessage(conn));
556 if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
558 PQclear(res);
559 return false;
563 return true;