1 /*-------------------------------------------------------------------------
3 * lwlock.c
4 * Lightweight lock manager
6 * Lightweight locks are intended primarily to provide mutual exclusion of
7 * access to shared-memory data structures. Therefore, they offer both
8 * exclusive and shared lock modes (to support read/write and read-only
9 * access to a shared object). There are few other frammishes. User-level
10 * locking should be done with the full lock manager --- which depends on
11 * LWLocks to protect its shared state.
13 * In addition to exclusive and shared modes, lightweight locks can be used to
14 * wait until a variable changes value. The variable is initially not set
15 * when the lock is acquired with LWLockAcquire, i.e. it remains set to the
16 * value it was set to when the lock was released last, and can be updated
17 * without releasing the lock by calling LWLockUpdateVar. LWLockWaitForVar
18 * waits for the variable to be updated, or until the lock is free. When
19 * releasing the lock with LWLockReleaseClearVar() the value can be set to an
20 * appropriate value for a free lock. The meaning of the variable is up to
21 * the caller, the lightweight lock code just assigns and compares it.
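 *
 * As an illustrative sketch (hypothetical lock and variable names): a lock
 * holder and a waiter might use this variable protocol roughly as follows,
 * where 'lock', 'progress' (a pg_atomic_uint64 in shared memory) and the
 * position values are made up; the real consumer of this machinery is the
 * WAL insertion code in xlog.c.
 *
 *		holder:
 *			LWLockAcquire(lock, LW_EXCLUSIVE);
 *			LWLockUpdateVar(lock, &progress, new_position);
 *			...
 *			LWLockReleaseClearVar(lock, &progress, 0);
 *
 *		waiter:
 *			uint64		cur;
 *
 *			if (!LWLockWaitForVar(lock, &progress, old_position, &cur))
 *				elog(DEBUG1, "still locked, variable now " UINT64_FORMAT, cur);
 *
 * LWLockWaitForVar() returns true once the lock is free, and false (filling
 * 'cur') if the holder updated the variable while still holding the lock.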
23 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
24 * Portions Copyright (c) 1994, Regents of the University of California
26 * IDENTIFICATION
27 * src/backend/storage/lmgr/lwlock.c
29 * NOTES:
31 * This used to be a pretty straightforward reader-writer lock
32 * implementation, in which the internal state was protected by a
33 * spinlock. Unfortunately the overhead of taking the spinlock proved to be
34 * too high for workloads/locks that were taken in shared mode very
35 * frequently. Often we were spinning in the (obviously exclusive) spinlock,
36 * while trying to acquire a shared lock that was actually free.
38 * Thus a new implementation was devised that provides wait-free shared lock
39 * acquisition for locks that aren't exclusively locked.
41 * The basic idea is to have a single atomic variable 'lockcount' instead of
42 * the formerly separate shared and exclusive counters and to use atomic
43 * operations to acquire the lock. That's fairly easy to do for plain
44 * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
45 * in the OS.
47 * For lock acquisition we use an atomic compare-and-exchange on the lockcount
48 * variable. For exclusive lock we swap in a sentinel value
49 * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
51 * To release the lock we use an atomic decrement. If the
52 * new value is zero (we get that atomically), we know we can/have to release
53 * waiters.
55 * Obviously it is important that the sentinel value for exclusive locks
56 * doesn't conflict with the maximum number of possible share lockers -
57 * luckily MAX_BACKENDS makes that easily possible.
60 * The attentive reader might have noticed that naively doing the above has a
61 * glaring race condition: We try to lock using the atomic operations and
62 * notice that we have to wait. Unfortunately by the time we have finished
63 * queuing, the former locker very well might have already finished its
64 * work. That's problematic because we're now stuck waiting inside the OS.
66 * To mitigate those races we use a phased attempt at locking:
67 * Phase 1: Try to do it atomically, if we succeed, nice
68 * Phase 2: Add ourselves to the waitqueue of the lock
69 * Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
70 * the queue
71 * Phase 4: Sleep till wake-up, goto Phase 1
73 * This protects us against the problem above, as nobody can release too
74 * quickly before we're queued: after Phase 2 we're already on the queue.
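 *
 * In code form, the acquisition path is roughly the following sketch (not
 * the actual implementation; see LWLockAcquire() below for the real thing):
 *
 *		for (;;)
 *		{
 *			if (!LWLockAttemptLock(lock, mode))
 *				break;							-- Phase 1: got it
 *			LWLockQueueSelf(lock, mode);		-- Phase 2
 *			if (!LWLockAttemptLock(lock, mode))
 *			{
 *				LWLockDequeueSelf(lock);		-- Phase 3: got it after all
 *				break;
 *			}
 *			... sleep on our PGPROC semaphore until woken ...	-- Phase 4
 *		}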
75 * -------------------------------------------------------------------------
77 #include "postgres.h"
79 #include "miscadmin.h"
80 #include "pg_trace.h"
81 #include "pgstat.h"
82 #include "port/pg_bitutils.h"
83 #include "postmaster/postmaster.h"
84 #include "replication/slot.h"
85 #include "storage/ipc.h"
86 #include "storage/predicate.h"
87 #include "storage/proc.h"
88 #include "storage/proclist.h"
89 #include "storage/spin.h"
90 #include "utils/memutils.h"
92 #ifdef LWLOCK_STATS
93 #include "utils/hsearch.h"
94 #endif
97 /* We use the ShmemLock spinlock to protect LWLockCounter */
98 extern slock_t *ShmemLock;
100 #define LW_FLAG_HAS_WAITERS ((uint32) 1 << 30)
101 #define LW_FLAG_RELEASE_OK ((uint32) 1 << 29)
102 #define LW_FLAG_LOCKED ((uint32) 1 << 28)
104 #define LW_VAL_EXCLUSIVE ((uint32) 1 << 24)
105 #define LW_VAL_SHARED 1
107 #define LW_LOCK_MASK ((uint32) ((1 << 25)-1))
108 /* Must be greater than MAX_BACKENDS --- the StaticAssertDecl below verifies that. */
109 #define LW_SHARED_MASK ((uint32) ((1 << 24)-1))
111 StaticAssertDecl(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
112 "MAX_BACKENDS too big for lwlock.c");
115 * There are three sorts of LWLock "tranches":
117 * 1. The individually-named locks defined in lwlocknames.h each have their
118 * own tranche. The names of these tranches appear in IndividualLWLockNames[]
119 * in lwlocknames.c.
121 * 2. There are some predefined tranches for built-in groups of locks.
122 * These are listed in enum BuiltinTrancheIds in lwlock.h, and their names
123 * appear in BuiltinTrancheNames[] below.
125 * 3. Extensions can create new tranches, via either RequestNamedLWLockTranche
126 * or LWLockRegisterTranche. The names of these that are known in the current
127 * process appear in LWLockTrancheNames[].
129 * All these names are user-visible as wait event names, so choose with care
130 * ... and do not forget to update the documentation's list of wait events.
132 extern const char *const IndividualLWLockNames[]; /* in lwlocknames.c */
134 static const char *const BuiltinTrancheNames[] = {
135 [LWTRANCHE_XACT_BUFFER] = "XactBuffer",
136 [LWTRANCHE_COMMITTS_BUFFER] = "CommitTsBuffer",
137 [LWTRANCHE_SUBTRANS_BUFFER] = "SubtransBuffer",
138 [LWTRANCHE_MULTIXACTOFFSET_BUFFER] = "MultiXactOffsetBuffer",
139 [LWTRANCHE_MULTIXACTMEMBER_BUFFER] = "MultiXactMemberBuffer",
140 [LWTRANCHE_NOTIFY_BUFFER] = "NotifyBuffer",
141 [LWTRANCHE_SERIAL_BUFFER] = "SerialBuffer",
142 [LWTRANCHE_WAL_INSERT] = "WALInsert",
143 [LWTRANCHE_BUFFER_CONTENT] = "BufferContent",
144 [LWTRANCHE_REPLICATION_ORIGIN_STATE] = "ReplicationOriginState",
145 [LWTRANCHE_REPLICATION_SLOT_IO] = "ReplicationSlotIO",
146 [LWTRANCHE_LOCK_FASTPATH] = "LockFastPath",
147 [LWTRANCHE_BUFFER_MAPPING] = "BufferMapping",
148 [LWTRANCHE_LOCK_MANAGER] = "LockManager",
149 [LWTRANCHE_PREDICATE_LOCK_MANAGER] = "PredicateLockManager",
150 [LWTRANCHE_PARALLEL_HASH_JOIN] = "ParallelHashJoin",
151 [LWTRANCHE_PARALLEL_QUERY_DSA] = "ParallelQueryDSA",
152 [LWTRANCHE_PER_SESSION_DSA] = "PerSessionDSA",
153 [LWTRANCHE_PER_SESSION_RECORD_TYPE] = "PerSessionRecordType",
154 [LWTRANCHE_PER_SESSION_RECORD_TYPMOD] = "PerSessionRecordTypmod",
155 [LWTRANCHE_SHARED_TUPLESTORE] = "SharedTupleStore",
156 [LWTRANCHE_SHARED_TIDBITMAP] = "SharedTidBitmap",
157 [LWTRANCHE_PARALLEL_APPEND] = "ParallelAppend",
158 [LWTRANCHE_PER_XACT_PREDICATE_LIST] = "PerXactPredicateList",
159 [LWTRANCHE_PGSTATS_DSA] = "PgStatsDSA",
160 [LWTRANCHE_PGSTATS_HASH] = "PgStatsHash",
161 [LWTRANCHE_PGSTATS_DATA] = "PgStatsData",
162 [LWTRANCHE_LAUNCHER_DSA] = "LogicalRepLauncherDSA",
163 [LWTRANCHE_LAUNCHER_HASH] = "LogicalRepLauncherHash",
164 [LWTRANCHE_DSM_REGISTRY_DSA] = "DSMRegistryDSA",
165 [LWTRANCHE_DSM_REGISTRY_HASH] = "DSMRegistryHash",
168 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
169 LWTRANCHE_FIRST_USER_DEFINED,
170 "missing entries in BuiltinTrancheNames[]");
173 * This is indexed by tranche ID minus LWTRANCHE_FIRST_USER_DEFINED, and
174 * stores the names of all dynamically-created tranches known to the current
175 * process. Any unused entries in the array will contain NULL.
177 static const char **LWLockTrancheNames = NULL;
178 static int LWLockTrancheNamesAllocated = 0;
181 * This points to the main array of LWLocks in shared memory. Backends inherit
182 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
183 * where we have special measures to pass it down).
185 LWLockPadded *MainLWLockArray = NULL;
188 * We use this structure to keep track of locked LWLocks for release
189 * during error recovery. Normally, only a few will be held at once, but
190 * occasionally the number can be much higher; for example, the pg_buffercache
191 * extension locks all buffer partitions simultaneously.
193 #define MAX_SIMUL_LWLOCKS 200
195 /* struct representing the LWLocks we're holding */
196 typedef struct LWLockHandle
198 LWLock *lock;
199 LWLockMode mode;
200 } LWLockHandle;
202 static int num_held_lwlocks = 0;
203 static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
205 /* struct representing the LWLock tranche request for named tranche */
206 typedef struct NamedLWLockTrancheRequest
208 char tranche_name[NAMEDATALEN];
209 int num_lwlocks;
210 } NamedLWLockTrancheRequest;
212 static NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
213 static int NamedLWLockTrancheRequestsAllocated = 0;
216 * NamedLWLockTrancheRequests is both the valid length of the request array,
217 * and the length of the shared-memory NamedLWLockTrancheArray later on.
218 * This variable and NamedLWLockTrancheArray are non-static so that
219 * postmaster.c can copy them to child processes in EXEC_BACKEND builds.
221 int NamedLWLockTrancheRequests = 0;
223 /* points to data in shared memory: */
224 NamedLWLockTranche *NamedLWLockTrancheArray = NULL;
226 static void InitializeLWLocks(void);
227 static inline void LWLockReportWaitStart(LWLock *lock);
228 static inline void LWLockReportWaitEnd(void);
229 static const char *GetLWTrancheName(uint16 trancheId);
231 #define T_NAME(lock) \
232 GetLWTrancheName((lock)->tranche)
234 #ifdef LWLOCK_STATS
235 typedef struct lwlock_stats_key
237 int tranche;
238 void *instance;
239 } lwlock_stats_key;
241 typedef struct lwlock_stats
243 lwlock_stats_key key;
244 int sh_acquire_count;
245 int ex_acquire_count;
246 int block_count;
247 int dequeue_self_count;
248 int spin_delay_count;
249 } lwlock_stats;
251 static HTAB *lwlock_stats_htab;
252 static lwlock_stats lwlock_stats_dummy;
253 #endif
255 #ifdef LOCK_DEBUG
256 bool Trace_lwlocks = false;
258 inline static void
259 PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
261 /* hide statement & context here, otherwise the log is just too verbose */
262 if (Trace_lwlocks)
264 uint32 state = pg_atomic_read_u32(&lock->state);
266 ereport(LOG,
267 (errhidestmt(true),
268 errhidecontext(true),
269 errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
270 MyProcPid,
271 where, T_NAME(lock), lock,
272 (state & LW_VAL_EXCLUSIVE) != 0,
273 state & LW_SHARED_MASK,
274 (state & LW_FLAG_HAS_WAITERS) != 0,
275 pg_atomic_read_u32(&lock->nwaiters),
276 (state & LW_FLAG_RELEASE_OK) != 0)));
280 inline static void
281 LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
283 /* hide statement & context here, otherwise the log is just too verbose */
284 if (Trace_lwlocks)
286 ereport(LOG,
287 (errhidestmt(true),
288 errhidecontext(true),
289 errmsg_internal("%s(%s %p): %s", where,
290 T_NAME(lock), lock, msg)));
294 #else /* not LOCK_DEBUG */
295 #define PRINT_LWDEBUG(a,b,c) ((void)0)
296 #define LOG_LWDEBUG(a,b,c) ((void)0)
297 #endif /* LOCK_DEBUG */
299 #ifdef LWLOCK_STATS
301 static void init_lwlock_stats(void);
302 static void print_lwlock_stats(int code, Datum arg);
303 static lwlock_stats * get_lwlock_stats_entry(LWLock *lock);
305 static void
306 init_lwlock_stats(void)
308 HASHCTL ctl;
309 static MemoryContext lwlock_stats_cxt = NULL;
310 static bool exit_registered = false;
312 if (lwlock_stats_cxt != NULL)
313 MemoryContextDelete(lwlock_stats_cxt);
316 * The LWLock stats will be updated within a critical section, which
317 * requires allocating new hash entries. Allocations within a critical
318 * section are normally not allowed because running out of memory would
319 * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
320 * turned on in production, so that's an acceptable risk. The hash entries
321 * are small, so the risk of running out of memory is minimal in practice.
323 lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
324 "LWLock stats",
325 ALLOCSET_DEFAULT_SIZES);
326 MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);
328 ctl.keysize = sizeof(lwlock_stats_key);
329 ctl.entrysize = sizeof(lwlock_stats);
330 ctl.hcxt = lwlock_stats_cxt;
331 lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
332 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
333 if (!exit_registered)
335 on_shmem_exit(print_lwlock_stats, 0);
336 exit_registered = true;
340 static void
341 print_lwlock_stats(int code, Datum arg)
343 HASH_SEQ_STATUS scan;
344 lwlock_stats *lwstats;
346 hash_seq_init(&scan, lwlock_stats_htab);
348 /* Grab an LWLock to keep different backends from mixing reports */
349 LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
351 while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
353 fprintf(stderr,
354 "PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
355 MyProcPid, GetLWTrancheName(lwstats->key.tranche),
356 lwstats->key.instance, lwstats->sh_acquire_count,
357 lwstats->ex_acquire_count, lwstats->block_count,
358 lwstats->spin_delay_count, lwstats->dequeue_self_count);
361 LWLockRelease(&MainLWLockArray[0].lock);
364 static lwlock_stats *
365 get_lwlock_stats_entry(LWLock *lock)
367 lwlock_stats_key key;
368 lwlock_stats *lwstats;
369 bool found;
372 * During shared memory initialization, the hash table doesn't exist yet.
373 * Stats of that phase aren't very interesting, so just collect operations
374 * on all locks in a single dummy entry.
376 if (lwlock_stats_htab == NULL)
377 return &lwlock_stats_dummy;
379 /* Fetch or create the entry. */
380 MemSet(&key, 0, sizeof(key));
381 key.tranche = lock->tranche;
382 key.instance = lock;
383 lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
384 if (!found)
386 lwstats->sh_acquire_count = 0;
387 lwstats->ex_acquire_count = 0;
388 lwstats->block_count = 0;
389 lwstats->dequeue_self_count = 0;
390 lwstats->spin_delay_count = 0;
392 return lwstats;
394 #endif /* LWLOCK_STATS */
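
/*
 * Build note (a hedged example): the statistics code above is compiled in
 * only when LWLOCK_STATS is defined at build time, for instance (one
 * possible way) by passing CPPFLAGS=-DLWLOCK_STATS to configure. Each
 * backend then dumps its counters to stderr via print_lwlock_stats() at
 * shmem exit.
 */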
398 * Compute number of LWLocks required by named tranches. These will be
399 * allocated in the main array.
401 static int
402 NumLWLocksForNamedTranches(void)
404 int numLocks = 0;
405 int i;
407 for (i = 0; i < NamedLWLockTrancheRequests; i++)
408 numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;
410 return numLocks;
414 * Compute shmem space needed for LWLocks and named tranches.
416 Size
417 LWLockShmemSize(void)
419 Size size;
420 int i;
421 int numLocks = NUM_FIXED_LWLOCKS;
423 /* Calculate total number of locks needed in the main array. */
424 numLocks += NumLWLocksForNamedTranches();
426 /* Space for the LWLock array. */
427 size = mul_size(numLocks, sizeof(LWLockPadded));
429 /* Space for dynamic allocation counter, plus room for alignment. */
430 size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);
432 /* space for named tranches. */
433 size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));
435 /* space for name of each tranche. */
436 for (i = 0; i < NamedLWLockTrancheRequests; i++)
437 size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);
439 return size;
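
/*
 * A worked example (hypothetical numbers): with a single named-tranche
 * request of 4 locks, the main array needs NUM_FIXED_LWLOCKS + 4 padded
 * locks; on top of that we reserve sizeof(int) plus alignment slop for the
 * tranche-ID counter, one NamedLWLockTranche struct, and
 * strlen(tranche_name) + 1 bytes for the name.
 */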
443 * Allocate shmem space for the main LWLock array and all tranches and
444 * initialize it. We also register extension LWLock tranches here.
446 void
447 CreateLWLocks(void)
449 if (!IsUnderPostmaster)
451 Size spaceLocks = LWLockShmemSize();
452 int *LWLockCounter;
453 char *ptr;
455 /* Allocate space */
456 ptr = (char *) ShmemAlloc(spaceLocks);
458 /* Leave room for dynamic allocation of tranches */
459 ptr += sizeof(int);
461 /* Ensure desired alignment of LWLock array */
462 ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
464 MainLWLockArray = (LWLockPadded *) ptr;
467 * Initialize the dynamic-allocation counter for tranches, which is
468 * stored just before the first LWLock.
470 LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
471 *LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;
473 /* Initialize all LWLocks */
474 InitializeLWLocks();
477 /* Register named extension LWLock tranches in the current process. */
478 for (int i = 0; i < NamedLWLockTrancheRequests; i++)
479 LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
480 NamedLWLockTrancheArray[i].trancheName);
484 * Initialize LWLocks that are fixed and those belonging to named tranches.
486 static void
487 InitializeLWLocks(void)
489 int numNamedLocks = NumLWLocksForNamedTranches();
490 int id;
491 int i;
492 int j;
493 LWLockPadded *lock;
495 /* Initialize all individual LWLocks in main array */
496 for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
497 LWLockInitialize(&lock->lock, id);
499 /* Initialize buffer mapping LWLocks in main array */
500 lock = MainLWLockArray + BUFFER_MAPPING_LWLOCK_OFFSET;
501 for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
502 LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);
504 /* Initialize lmgrs' LWLocks in main array */
505 lock = MainLWLockArray + LOCK_MANAGER_LWLOCK_OFFSET;
506 for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
507 LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);
509 /* Initialize predicate lmgrs' LWLocks in main array */
510 lock = MainLWLockArray + PREDICATELOCK_MANAGER_LWLOCK_OFFSET;
511 for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
512 LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);
515 * Copy the info about any named tranches into shared memory (so that
516 * other processes can see it), and initialize the requested LWLocks.
518 if (NamedLWLockTrancheRequests > 0)
520 char *trancheNames;
522 NamedLWLockTrancheArray = (NamedLWLockTranche *)
523 &MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];
525 trancheNames = (char *) NamedLWLockTrancheArray +
526 (NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
527 lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];
529 for (i = 0; i < NamedLWLockTrancheRequests; i++)
531 NamedLWLockTrancheRequest *request;
532 NamedLWLockTranche *tranche;
533 char *name;
535 request = &NamedLWLockTrancheRequestArray[i];
536 tranche = &NamedLWLockTrancheArray[i];
538 name = trancheNames;
539 trancheNames += strlen(request->tranche_name) + 1;
540 strcpy(name, request->tranche_name);
541 tranche->trancheId = LWLockNewTrancheId();
542 tranche->trancheName = name;
544 for (j = 0; j < request->num_lwlocks; j++, lock++)
545 LWLockInitialize(&lock->lock, tranche->trancheId);
551 * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
553 void
554 InitLWLockAccess(void)
556 #ifdef LWLOCK_STATS
557 init_lwlock_stats();
558 #endif
562 * GetNamedLWLockTranche - returns the base address of the LWLocks belonging
563 * to the specified tranche.
565 * The caller must retrieve the requested number of LWLocks starting from
566 * the base lock address returned by this function. This can be used for
567 * tranches that were requested via the RequestNamedLWLockTranche() API.
569 LWLockPadded *
570 GetNamedLWLockTranche(const char *tranche_name)
572 int lock_pos;
573 int i;
576 * Obtain the position of the base address of the LWLocks belonging to the
577 * requested tranche_name in MainLWLockArray. LWLocks for named tranches
578 * are placed in MainLWLockArray after the fixed locks.
580 lock_pos = NUM_FIXED_LWLOCKS;
581 for (i = 0; i < NamedLWLockTrancheRequests; i++)
583 if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
584 tranche_name) == 0)
585 return &MainLWLockArray[lock_pos];
587 lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
590 elog(ERROR, "requested tranche is not registered");
592 /* just to keep compiler quiet */
593 return NULL;
597 * Allocate a new tranche ID.
600 LWLockNewTrancheId(void)
602 int result;
603 int *LWLockCounter;
605 LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
606 SpinLockAcquire(ShmemLock);
607 result = (*LWLockCounter)++;
608 SpinLockRelease(ShmemLock);
610 return result;
614 * Register a dynamic tranche name in the lookup table of the current process.
616 * This routine will save a pointer to the tranche name passed as an argument,
617 * so the name should be allocated in a backend-lifetime context
618 * (shared memory, TopMemoryContext, static constant, or similar).
620 * The tranche name will be user-visible as a wait event name, so try to
621 * use a name that fits the style for those.
623 void
624 LWLockRegisterTranche(int tranche_id, const char *tranche_name)
626 /* This should only be called for user-defined tranches. */
627 if (tranche_id < LWTRANCHE_FIRST_USER_DEFINED)
628 return;
630 /* Convert to array index. */
631 tranche_id -= LWTRANCHE_FIRST_USER_DEFINED;
633 /* If necessary, create or enlarge array. */
634 if (tranche_id >= LWLockTrancheNamesAllocated)
636 int newalloc;
638 newalloc = pg_nextpower2_32(Max(8, tranche_id + 1));
640 if (LWLockTrancheNames == NULL)
641 LWLockTrancheNames = (const char **)
642 MemoryContextAllocZero(TopMemoryContext,
643 newalloc * sizeof(char *));
644 else
645 LWLockTrancheNames =
646 repalloc0_array(LWLockTrancheNames, const char *, LWLockTrancheNamesAllocated, newalloc);
647 LWLockTrancheNamesAllocated = newalloc;
650 LWLockTrancheNames[tranche_id] = tranche_name;
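
/*
 * Illustrative sketch (hypothetical names): an extension creating LWLocks at
 * runtime, e.g. in a DSM segment, typically allocates a tranche ID once
 * (keeping it in shared memory so every process sees the same value) and
 * then registers a name for it in each process that touches the locks:
 *
 *		tranche_id = LWLockNewTrancheId();			-- once, kept in shmem
 *		LWLockRegisterTranche(tranche_id, "my_extension_dsm");
 *		LWLockInitialize(&shared->lock, tranche_id);
 *
 * The name must remain valid for the rest of the process's life (see above),
 * so a string literal or a copy in shared memory is a natural choice.
 */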
654 * RequestNamedLWLockTranche
655 * Request that extra LWLocks be allocated during postmaster
656 * startup.
658 * This may only be called via the shmem_request_hook of a library that is
659 * loaded into the postmaster via shared_preload_libraries. Calls from
660 * elsewhere will fail.
662 * The tranche name will be user-visible as a wait event name, so try to
663 * use a name that fits the style for those.
665 void
666 RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
668 NamedLWLockTrancheRequest *request;
670 if (!process_shmem_requests_in_progress)
671 elog(FATAL, "cannot request additional LWLocks outside shmem_request_hook");
673 if (NamedLWLockTrancheRequestArray == NULL)
675 NamedLWLockTrancheRequestsAllocated = 16;
676 NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
677 MemoryContextAlloc(TopMemoryContext,
678 NamedLWLockTrancheRequestsAllocated
679 * sizeof(NamedLWLockTrancheRequest));
682 if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
684 int i = pg_nextpower2_32(NamedLWLockTrancheRequests + 1);
686 NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
687 repalloc(NamedLWLockTrancheRequestArray,
688 i * sizeof(NamedLWLockTrancheRequest));
689 NamedLWLockTrancheRequestsAllocated = i;
692 request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
693 Assert(strlen(tranche_name) + 1 <= NAMEDATALEN);
694 strlcpy(request->tranche_name, tranche_name, NAMEDATALEN);
695 request->num_lwlocks = num_lwlocks;
696 NamedLWLockTrancheRequests++;
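
/*
 * Illustrative sketch (hypothetical extension code): the usual way a
 * shared_preload_libraries extension consumes this API is
 *
 *		static void
 *		my_shmem_request(void)				-- installed as shmem_request_hook
 *		{
 *			if (prev_shmem_request_hook)
 *				prev_shmem_request_hook();
 *			RequestNamedLWLockTranche("my_extension", 4);
 *		}
 *
 *		static void
 *		my_shmem_startup(void)				-- installed as shmem_startup_hook
 *		{
 *			LWLockPadded *locks = GetNamedLWLockTranche("my_extension");
 *
 *			my_state->lock = &locks[0].lock;	-- first of the 4 locks
 *		}
 *
 * "my_extension" then appears as an LWLock wait event name, which is why the
 * comment above asks for a name that fits the wait-event style.
 */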
700 * LWLockInitialize - initialize a new lwlock; it's initially unlocked
702 void
703 LWLockInitialize(LWLock *lock, int tranche_id)
705 pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
706 #ifdef LOCK_DEBUG
707 pg_atomic_init_u32(&lock->nwaiters, 0);
708 #endif
709 lock->tranche = tranche_id;
710 proclist_init(&lock->waiters);
714 * Report start of wait event for light-weight locks.
716 * This function is used by all the lightweight lock calls that need
717 * to wait to acquire the lock. It distinguishes the wait
718 * event based on the lock's tranche.
720 static inline void
721 LWLockReportWaitStart(LWLock *lock)
723 pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
727 * Report end of wait event for light-weight locks.
729 static inline void
730 LWLockReportWaitEnd(void)
732 pgstat_report_wait_end();
736 * Return the name of an LWLock tranche.
738 static const char *
739 GetLWTrancheName(uint16 trancheId)
741 /* Individual LWLock? */
742 if (trancheId < NUM_INDIVIDUAL_LWLOCKS)
743 return IndividualLWLockNames[trancheId];
745 /* Built-in tranche? */
746 if (trancheId < LWTRANCHE_FIRST_USER_DEFINED)
747 return BuiltinTrancheNames[trancheId];
750 * It's an extension tranche, so look in LWLockTrancheNames[]. However,
751 * it's possible that the tranche has never been registered in the current
752 * process, in which case give up and return "extension".
754 trancheId -= LWTRANCHE_FIRST_USER_DEFINED;
756 if (trancheId >= LWLockTrancheNamesAllocated ||
757 LWLockTrancheNames[trancheId] == NULL)
758 return "extension";
760 return LWLockTrancheNames[trancheId];
764 * Return an identifier for an LWLock based on the wait class and event.
766 const char *
767 GetLWLockIdentifier(uint32 classId, uint16 eventId)
769 Assert(classId == PG_WAIT_LWLOCK);
770 /* The event IDs are just tranche numbers. */
771 return GetLWTrancheName(eventId);
775 * Internal function that tries to atomically acquire the lwlock in the
776 * passed-in mode.
778 * This function will not block waiting for a lock to become free - that's the
779 * caller's job.
781 * Returns true if the lock isn't free and we need to wait.
783 static bool
784 LWLockAttemptLock(LWLock *lock, LWLockMode mode)
786 uint32 old_state;
788 Assert(mode == LW_EXCLUSIVE || mode == LW_SHARED);
791 * Read once outside the loop, later iterations will get the newer value
792 * via compare & exchange.
794 old_state = pg_atomic_read_u32(&lock->state);
796 /* loop until we've determined whether we could acquire the lock or not */
797 while (true)
799 uint32 desired_state;
800 bool lock_free;
802 desired_state = old_state;
804 if (mode == LW_EXCLUSIVE)
806 lock_free = (old_state & LW_LOCK_MASK) == 0;
807 if (lock_free)
808 desired_state += LW_VAL_EXCLUSIVE;
810 else
812 lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
813 if (lock_free)
814 desired_state += LW_VAL_SHARED;
818 * Attempt to swap in the state we are expecting. If we didn't see the
819 * lock as free, that's just the old value. If we saw it as free,
820 * we'll attempt to mark it acquired. The reason that we always swap
821 * in the value is that this doubles as a memory barrier. We could try
822 * to be smarter and only swap in values if we saw the lock as free,
823 * but benchmarks haven't shown that to be beneficial so far.
825 * Retry if the value changed since we last looked at it.
827 if (pg_atomic_compare_exchange_u32(&lock->state,
828 &old_state, desired_state))
830 if (lock_free)
832 /* Great! Got the lock. */
833 #ifdef LOCK_DEBUG
834 if (mode == LW_EXCLUSIVE)
835 lock->owner = MyProc;
836 #endif
837 return false;
839 else
840 return true; /* somebody else has the lock */
843 pg_unreachable();
847 * Lock the LWLock's wait list against concurrent activity.
849 * NB: even though the wait list is locked, non-conflicting lock operations
850 * may still happen concurrently.
852 * Time spent holding the mutex should be short!
854 static void
855 LWLockWaitListLock(LWLock *lock)
857 uint32 old_state;
858 #ifdef LWLOCK_STATS
859 lwlock_stats *lwstats;
860 uint32 delays = 0;
862 lwstats = get_lwlock_stats_entry(lock);
863 #endif
865 while (true)
867 /* always try once to acquire lock directly */
868 old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
869 if (!(old_state & LW_FLAG_LOCKED))
870 break; /* got lock */
872 /* and then spin without atomic operations until lock is released */
874 SpinDelayStatus delayStatus;
876 init_local_spin_delay(&delayStatus);
878 while (old_state & LW_FLAG_LOCKED)
880 perform_spin_delay(&delayStatus);
881 old_state = pg_atomic_read_u32(&lock->state);
883 #ifdef LWLOCK_STATS
884 delays += delayStatus.delays;
885 #endif
886 finish_spin_delay(&delayStatus);
890 * Retry. The lock might, of course, already have been re-acquired by the
891 * time we attempt to get it again.
895 #ifdef LWLOCK_STATS
896 lwstats->spin_delay_count += delays;
897 #endif
901 * Unlock the LWLock's wait list.
903 * Note that it can be more efficient to manipulate flags and release the
904 * locks in a single atomic operation.
906 static void
907 LWLockWaitListUnlock(LWLock *lock)
909 uint32 old_state PG_USED_FOR_ASSERTS_ONLY;
911 old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
913 Assert(old_state & LW_FLAG_LOCKED);
917 * Wakeup all the lockers that currently have a chance to acquire the lock.
919 static void
920 LWLockWakeup(LWLock *lock)
922 bool new_release_ok;
923 bool wokeup_somebody = false;
924 proclist_head wakeup;
925 proclist_mutable_iter iter;
927 proclist_init(&wakeup);
929 new_release_ok = true;
931 /* lock wait list while collecting backends to wake up */
932 LWLockWaitListLock(lock);
934 proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
936 PGPROC *waiter = GetPGProcByNumber(iter.cur);
938 if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
939 continue;
941 proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
942 proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
944 if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
947 * Prevent additional wakeups until the retryer gets to run. Backends
948 * that are just waiting for the lock to become free don't retry
949 * automatically.
951 new_release_ok = false;
954 * Don't wake up (further) exclusive lockers.
956 wokeup_somebody = true;
960 * Signal that the process isn't on the wait list anymore. This allows
961 * LWLockDequeueSelf() to remove itself from the waitlist with a
962 * proclist_delete(), rather than having to check if it has been
963 * removed from the list.
965 Assert(waiter->lwWaiting == LW_WS_WAITING);
966 waiter->lwWaiting = LW_WS_PENDING_WAKEUP;
969 * Once we've woken up an exclusive locker, there's no point in waking
970 * up anybody else.
972 if (waiter->lwWaitMode == LW_EXCLUSIVE)
973 break;
976 Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
978 /* unset required flags, and release lock, in one fell swoop */
980 uint32 old_state;
981 uint32 desired_state;
983 old_state = pg_atomic_read_u32(&lock->state);
984 while (true)
986 desired_state = old_state;
988 /* compute desired flags */
990 if (new_release_ok)
991 desired_state |= LW_FLAG_RELEASE_OK;
992 else
993 desired_state &= ~LW_FLAG_RELEASE_OK;
995 if (proclist_is_empty(&wakeup))
996 desired_state &= ~LW_FLAG_HAS_WAITERS;
998 desired_state &= ~LW_FLAG_LOCKED; /* release lock */
1000 if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
1001 desired_state))
1002 break;
1006 /* Awaken any waiters I removed from the queue. */
1007 proclist_foreach_modify(iter, &wakeup, lwWaitLink)
1009 PGPROC *waiter = GetPGProcByNumber(iter.cur);
1011 LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
1012 proclist_delete(&wakeup, iter.cur, lwWaitLink);
1015 * Guarantee that lwWaiting being unset only becomes visible once the
1016 * unlink from the list has completed. Otherwise the target backend
1017 * could be woken up for some other reason and enqueue itself for a new
1018 * lock - if that happens before the list unlink happens, the list would
1019 * end up being corrupted.
1021 * The barrier pairs with the LWLockWaitListLock() when enqueuing for
1022 * another lock.
1024 pg_write_barrier();
1025 waiter->lwWaiting = LW_WS_NOT_WAITING;
1026 PGSemaphoreUnlock(waiter->sem);
1031 * Add ourselves to the end of the queue.
1033 * NB: Mode can be LW_WAIT_UNTIL_FREE here!
1035 static void
1036 LWLockQueueSelf(LWLock *lock, LWLockMode mode)
1039 * If we don't have a PGPROC structure, there's no way to wait. This
1040 * should never occur, since MyProc should only be null during shared
1041 * memory initialization.
1043 if (MyProc == NULL)
1044 elog(PANIC, "cannot wait without a PGPROC structure");
1046 if (MyProc->lwWaiting != LW_WS_NOT_WAITING)
1047 elog(PANIC, "queueing for lock while waiting on another one");
1049 LWLockWaitListLock(lock);
1051 /* setting the flag is protected by the spinlock */
1052 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
1054 MyProc->lwWaiting = LW_WS_WAITING;
1055 MyProc->lwWaitMode = mode;
1057 /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
1058 if (mode == LW_WAIT_UNTIL_FREE)
1059 proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1060 else
1061 proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1063 /* Can release the mutex now */
1064 LWLockWaitListUnlock(lock);
1066 #ifdef LOCK_DEBUG
1067 pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
1068 #endif
1072 * Remove ourselves from the waitlist.
1074 * This is used if we queued ourselves because we thought we needed to sleep
1075 * but, after further checking, we discovered that we don't actually need to
1076 * do so.
1078 static void
1079 LWLockDequeueSelf(LWLock *lock)
1081 bool on_waitlist;
1083 #ifdef LWLOCK_STATS
1084 lwlock_stats *lwstats;
1086 lwstats = get_lwlock_stats_entry(lock);
1088 lwstats->dequeue_self_count++;
1089 #endif
1091 LWLockWaitListLock(lock);
1094 * Remove ourselves from the waitlist, unless we've already been removed.
1095 * The removal happens with the wait list lock held, so there's no race in
1096 * this check.
1098 on_waitlist = MyProc->lwWaiting == LW_WS_WAITING;
1099 if (on_waitlist)
1100 proclist_delete(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1102 if (proclist_is_empty(&lock->waiters) &&
1103 (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
1105 pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
1108 /* XXX: combine with fetch_and above? */
1109 LWLockWaitListUnlock(lock);
1111 /* clear waiting state again, nice for debugging */
1112 if (on_waitlist)
1113 MyProc->lwWaiting = LW_WS_NOT_WAITING;
1114 else
1116 int extraWaits = 0;
1119 * Somebody else dequeued us and has or will wake us up. Deal with the
1120 * superfluous absorption of a wakeup.
1124 * Reset RELEASE_OK flag if somebody woke us before we removed
1125 * ourselves - they'll have set it to false.
1127 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1130 * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
1131 * get reset at some inconvenient point later. Most of the time this
1132 * will immediately return.
1134 for (;;)
1136 PGSemaphoreLock(MyProc->sem);
1137 if (MyProc->lwWaiting == LW_WS_NOT_WAITING)
1138 break;
1139 extraWaits++;
1143 * Fix the process wait semaphore's count for any absorbed wakeups.
1145 while (extraWaits-- > 0)
1146 PGSemaphoreUnlock(MyProc->sem);
1149 #ifdef LOCK_DEBUG
1151 /* not waiting anymore */
1152 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1154 Assert(nwaiters < MAX_BACKENDS);
1156 #endif
1160 * LWLockAcquire - acquire a lightweight lock in the specified mode
1162 * If the lock is not available, sleep until it is. Returns true if the lock
1163 * was available immediately, false if we had to sleep.
1165 * Side effect: cancel/die interrupts are held off until lock release.
1167 bool
1168 LWLockAcquire(LWLock *lock, LWLockMode mode)
1170 PGPROC *proc = MyProc;
1171 bool result = true;
1172 int extraWaits = 0;
1173 #ifdef LWLOCK_STATS
1174 lwlock_stats *lwstats;
1176 lwstats = get_lwlock_stats_entry(lock);
1177 #endif
1179 Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1181 PRINT_LWDEBUG("LWLockAcquire", lock, mode);
1183 #ifdef LWLOCK_STATS
1184 /* Count lock acquisition attempts */
1185 if (mode == LW_EXCLUSIVE)
1186 lwstats->ex_acquire_count++;
1187 else
1188 lwstats->sh_acquire_count++;
1189 #endif /* LWLOCK_STATS */
1192 * We can't wait if we haven't got a PGPROC. This should only occur
1193 * during bootstrap or shared memory initialization. Put an Assert here
1194 * to catch unsafe coding practices.
1196 Assert(!(proc == NULL && IsUnderPostmaster));
1198 /* Ensure we will have room to remember the lock */
1199 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1200 elog(ERROR, "too many LWLocks taken");
1203 * Lock out cancel/die interrupts until we exit the code section protected
1204 * by the LWLock. This ensures that interrupts will not interfere with
1205 * manipulations of data structures in shared memory.
1207 HOLD_INTERRUPTS();
1210 * Loop here to try to acquire lock after each time we are signaled by
1211 * LWLockRelease.
1213 * NOTE: it might seem better to have LWLockRelease actually grant us the
1214 * lock, rather than retrying and possibly having to go back to sleep. But
1215 * in practice that is no good because it means a process swap for every
1216 * lock acquisition when two or more processes are contending for the same
1217 * lock. Since LWLocks are normally used to protect not-very-long
1218 * sections of computation, a process needs to be able to acquire and
1219 * release the same lock many times during a single CPU time slice, even
1220 * in the presence of contention. The efficiency of being able to do that
1221 * outweighs the inefficiency of sometimes wasting a process dispatch
1222 * cycle because the lock is not free when a released waiter finally gets
1223 * to run. See pgsql-hackers archives for 29-Dec-01.
1225 for (;;)
1227 bool mustwait;
1230 * Try to grab the lock the first time, we're not in the waitqueue
1231 * yet/anymore.
1233 mustwait = LWLockAttemptLock(lock, mode);
1235 if (!mustwait)
1237 LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
1238 break; /* got the lock */
1242 * Ok, at this point we couldn't grab the lock on the first try. We
1243 * cannot simply queue ourselves to the end of the list and wait to be
1244 * woken up because by now the lock could long since have been released.
1245 * Instead, add ourselves to the queue and try to grab the lock again. If
1246 * we succeed we need to revert the queueing and be happy; otherwise we
1247 * recheck the lock. If we still couldn't grab it, we know that the
1248 * other locker will see our queue entries when releasing since they
1249 * existed before we checked for the lock.
1252 /* add to the queue */
1253 LWLockQueueSelf(lock, mode);
1255 /* we're now guaranteed to be woken up if necessary */
1256 mustwait = LWLockAttemptLock(lock, mode);
1258 /* ok, grabbed the lock the second time round, need to undo queueing */
1259 if (!mustwait)
1261 LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
1263 LWLockDequeueSelf(lock);
1264 break;
1268 * Wait until awakened.
1270 * It is possible that we get awakened for a reason other than being
1271 * signaled by LWLockRelease. If so, loop back and wait again. Once
1272 * we've gotten the LWLock, re-increment the sema by the number of
1273 * additional signals received.
1275 LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
1277 #ifdef LWLOCK_STATS
1278 lwstats->block_count++;
1279 #endif
1281 LWLockReportWaitStart(lock);
1282 if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1283 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1285 for (;;)
1287 PGSemaphoreLock(proc->sem);
1288 if (proc->lwWaiting == LW_WS_NOT_WAITING)
1289 break;
1290 extraWaits++;
1293 /* Retrying, allow LWLockRelease to release waiters again. */
1294 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1296 #ifdef LOCK_DEBUG
1298 /* not waiting anymore */
1299 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1301 Assert(nwaiters < MAX_BACKENDS);
1303 #endif
1305 if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1306 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1307 LWLockReportWaitEnd();
1309 LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
1311 /* Now loop back and try to acquire lock again. */
1312 result = false;
1315 if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_ENABLED())
1316 TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
1318 /* Add lock to list of locks held by this backend */
1319 held_lwlocks[num_held_lwlocks].lock = lock;
1320 held_lwlocks[num_held_lwlocks++].mode = mode;
1323 * Fix the process wait semaphore's count for any absorbed wakeups.
1325 while (extraWaits-- > 0)
1326 PGSemaphoreUnlock(proc->sem);
1328 return result;
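
/*
 * Illustrative usage (hypothetical lock and data, a sketch only):
 *
 *		LWLockAcquire(&shared->lock, LW_SHARED);
 *		val = shared->counter;				-- read shared state
 *		LWLockRelease(&shared->lock);
 *
 *		LWLockAcquire(&shared->lock, LW_EXCLUSIVE);
 *		shared->counter++;					-- modify shared state
 *		LWLockRelease(&shared->lock);
 *
 * Because cancel/die interrupts are held off between acquire and release,
 * the protected section should be short; if it does ereport(ERROR), error
 * recovery releases the lock via LWLockReleaseAll().
 */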
1332 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
1334 * If the lock is not available, return false with no side-effects.
1336 * If successful, cancel/die interrupts are held off until lock release.
1338 bool
1339 LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
1341 bool mustwait;
1343 Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1345 PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
1347 /* Ensure we will have room to remember the lock */
1348 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1349 elog(ERROR, "too many LWLocks taken");
1352 * Lock out cancel/die interrupts until we exit the code section protected
1353 * by the LWLock. This ensures that interrupts will not interfere with
1354 * manipulations of data structures in shared memory.
1356 HOLD_INTERRUPTS();
1358 /* Check for the lock */
1359 mustwait = LWLockAttemptLock(lock, mode);
1361 if (mustwait)
1363 /* Failed to get lock, so release interrupt holdoff */
1364 RESUME_INTERRUPTS();
1366 LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
1367 if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL_ENABLED())
1368 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
1370 else
1372 /* Add lock to list of locks held by this backend */
1373 held_lwlocks[num_held_lwlocks].lock = lock;
1374 held_lwlocks[num_held_lwlocks++].mode = mode;
1375 if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_ENABLED())
1376 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
1378 return !mustwait;
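
/*
 * Illustrative usage (a sketch): conditional acquisition suits callers that
 * have something better to do than sleep on a busy lock.
 *
 *		if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
 *		{
 *			... fast path: do the work now ...
 *			LWLockRelease(lock);
 *		}
 *		else
 *		{
 *			... lock is busy: skip, defer, or fall back to LWLockAcquire() ...
 *		}
 */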
1382 * LWLockAcquireOrWait - Acquire lock, or wait until it's free
1384 * The semantics of this function are a bit funky. If the lock is currently
1385 * free, it is acquired in the given mode, and the function returns true. If
1386 * the lock isn't immediately free, the function waits until it is released
1387 * and returns false, but does not acquire the lock.
1389 * This is currently used for WALWriteLock: when a backend flushes the WAL,
1390 * holding WALWriteLock, it can flush the commit records of many other
1391 * backends as a side-effect. Those other backends need to wait until the
1392 * flush finishes, but don't need to acquire the lock anymore. They can just
1393 * wake up, observe that their records have already been flushed, and return.
1395 bool
1396 LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
1398 PGPROC *proc = MyProc;
1399 bool mustwait;
1400 int extraWaits = 0;
1401 #ifdef LWLOCK_STATS
1402 lwlock_stats *lwstats;
1404 lwstats = get_lwlock_stats_entry(lock);
1405 #endif
1407 Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1409 PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
1411 /* Ensure we will have room to remember the lock */
1412 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1413 elog(ERROR, "too many LWLocks taken");
1416 * Lock out cancel/die interrupts until we exit the code section protected
1417 * by the LWLock. This ensures that interrupts will not interfere with
1418 * manipulations of data structures in shared memory.
1420 HOLD_INTERRUPTS();
1423 * NB: We're using nearly the same twice-in-a-row lock acquisition
1424 * protocol as LWLockAcquire(). Check its comments for details.
1426 mustwait = LWLockAttemptLock(lock, mode);
1428 if (mustwait)
1430 LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1432 mustwait = LWLockAttemptLock(lock, mode);
1434 if (mustwait)
1437 * Wait until awakened. Like in LWLockAcquire, be prepared for
1438 * bogus wakeups.
1440 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
1442 #ifdef LWLOCK_STATS
1443 lwstats->block_count++;
1444 #endif
1446 LWLockReportWaitStart(lock);
1447 if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1448 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1450 for (;;)
1452 PGSemaphoreLock(proc->sem);
1453 if (proc->lwWaiting == LW_WS_NOT_WAITING)
1454 break;
1455 extraWaits++;
1458 #ifdef LOCK_DEBUG
1460 /* not waiting anymore */
1461 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1463 Assert(nwaiters < MAX_BACKENDS);
1465 #endif
1466 if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1467 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1468 LWLockReportWaitEnd();
1470 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
1472 else
1474 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
1477 * Got lock in the second attempt, undo queueing. We need to treat
1478 * this as having successfully acquired the lock, otherwise we'd
1479 * not necessarily wake up people we've prevented from acquiring
1480 * the lock.
1482 LWLockDequeueSelf(lock);
1487 * Fix the process wait semaphore's count for any absorbed wakeups.
1489 while (extraWaits-- > 0)
1490 PGSemaphoreUnlock(proc->sem);
1492 if (mustwait)
1494 /* Failed to get lock, so release interrupt holdoff */
1495 RESUME_INTERRUPTS();
1496 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
1497 if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL_ENABLED())
1498 TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
1500 else
1502 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
1503 /* Add lock to list of locks held by this backend */
1504 held_lwlocks[num_held_lwlocks].lock = lock;
1505 held_lwlocks[num_held_lwlocks++].mode = mode;
1506 if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_ENABLED())
1507 TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
1510 return !mustwait;
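
/*
 * Illustrative sketch of the pattern described above (the real consumer is
 * the WAL flush path, which uses WALWriteLock):
 *
 *		if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
 *		{
 *			... we hold the lock: perform the flush ourselves ...
 *			LWLockRelease(WALWriteLock);
 *		}
 *		else
 *		{
 *			... someone else flushed while we waited: recheck whether our
 *			... records are already on disk, and return if so ...
 *		}
 */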
1514 * Does the lwlock in its current state need to wait for the variable value to
1515 * change?
1517 * If we don't need to wait, and it's because the value of the variable has
1518 * changed, store the current value in newval.
1520 * *result is set to true if the lock was free, and false otherwise.
1522 static bool
1523 LWLockConflictsWithVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 oldval,
1524 uint64 *newval, bool *result)
1526 bool mustwait;
1527 uint64 value;
1530 * Test first to see if the slot is free right now.
1532 * XXX: the unique caller of this routine, WaitXLogInsertionsToFinish()
1533 * via LWLockWaitForVar(), uses an implied barrier with a spinlock before
1534 * this, so we don't need a memory barrier here as far as the current
1535 * usage is concerned. But that might not be safe in general.
1537 mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
1539 if (!mustwait)
1541 *result = true;
1542 return false;
1545 *result = false;
1548 * Reading this value atomically is safe even on platforms where uint64
1549 * cannot be read without observing a torn value.
1551 value = pg_atomic_read_u64(valptr);
1553 if (value != oldval)
1555 mustwait = false;
1556 *newval = value;
1558 else
1560 mustwait = true;
1563 return mustwait;
1567 * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
1569 * If the lock is held and *valptr equals oldval, waits until the lock is
1570 * either freed, or the lock holder updates *valptr by calling
1571 * LWLockUpdateVar. If the lock is free on exit (immediately or after
1572 * waiting), returns true. If the lock is still held, but *valptr no longer
1573 * matches oldval, returns false and sets *newval to the current value in
1574 * *valptr.
1576 * Note: this function ignores shared lock holders; if the lock is held
1577 * in shared mode, returns 'true'.
1579 * Be aware that LWLockConflictsWithVar() does not include a memory barrier,
1580 * hence the caller of this function may want to rely on an explicit barrier or
1581 * an implied barrier via spinlock or LWLock to avoid memory ordering issues.
1583 bool
1584 LWLockWaitForVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 oldval,
1585 uint64 *newval)
1587 PGPROC *proc = MyProc;
1588 int extraWaits = 0;
1589 bool result = false;
1590 #ifdef LWLOCK_STATS
1591 lwlock_stats *lwstats;
1593 lwstats = get_lwlock_stats_entry(lock);
1594 #endif
1596 PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
1599 * Lock out cancel/die interrupts while we sleep on the lock. There is no
1600 * cleanup mechanism to remove us from the wait queue if we got
1601 * interrupted.
1603 HOLD_INTERRUPTS();
1606 * Loop here to check the lock's status after each time we are signaled.
1608 for (;;)
1610 bool mustwait;
1612 mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1613 &result);
1615 if (!mustwait)
1616 break; /* the lock was free or value didn't match */
1619 * Add myself to wait queue. Note that this is racy, somebody else
1620 * could wakeup before we're finished queuing. NB: We're using nearly
1621 * the same twice-in-a-row lock acquisition protocol as
1622 * LWLockAcquire(). Check its comments for details. The only
1623 * difference is that we also have to check the variable's values when
1624 * checking the state of the lock.
1626 LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1629 * Set RELEASE_OK flag, to make sure we get woken up as soon as the
1630 * lock is released.
1632 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1635 * We're now guaranteed to be woken up if necessary. Recheck the lock
1636 * and variable's state.
1638 mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1639 &result);
1641 /* Ok, no conflict after we queued ourselves. Undo queueing. */
1642 if (!mustwait)
1644 LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
1646 LWLockDequeueSelf(lock);
1647 break;
1651 * Wait until awakened.
1653 * It is possible that we get awakened for a reason other than being
1654 * signaled by LWLockRelease. If so, loop back and wait again. Once
1655 * we've gotten the LWLock, re-increment the sema by the number of
1656 * additional signals received.
1658 LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
1660 #ifdef LWLOCK_STATS
1661 lwstats->block_count++;
1662 #endif
1664 LWLockReportWaitStart(lock);
1665 if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1666 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);
1668 for (;;)
1670 PGSemaphoreLock(proc->sem);
1671 if (proc->lwWaiting == LW_WS_NOT_WAITING)
1672 break;
1673 extraWaits++;
1676 #ifdef LOCK_DEBUG
1678 /* not waiting anymore */
1679 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1681 Assert(nwaiters < MAX_BACKENDS);
1683 #endif
1685 if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1686 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
1687 LWLockReportWaitEnd();
1689 LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
1691 /* Now loop back and check the status of the lock again. */
1695 * Fix the process wait semaphore's count for any absorbed wakeups.
1697 while (extraWaits-- > 0)
1698 PGSemaphoreUnlock(proc->sem);
1701 * Now okay to allow cancel/die interrupts.
1703 RESUME_INTERRUPTS();
1705 return result;
1710 * LWLockUpdateVar - Update a variable and wake up waiters atomically
1712 * Sets *valptr to 'val', and wakes up all processes waiting for us with
1713 * LWLockWaitForVar(). It first sets the value atomically and then wakes up
1714 * waiting processes so that any process calling LWLockWaitForVar() on the same
1715 * lock is guaranteed to see the new value, and act accordingly.
1717 * The caller must be holding the lock in exclusive mode.
1719 void
1720 LWLockUpdateVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val)
1722 proclist_head wakeup;
1723 proclist_mutable_iter iter;
1725 PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
1728 * Note that pg_atomic_exchange_u64 is a full barrier, so we're guaranteed
1729 * that the variable is updated before waking up waiters.
1731 pg_atomic_exchange_u64(valptr, val);
1733 proclist_init(&wakeup);
1735 LWLockWaitListLock(lock);
1737 Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
1740 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
1741 * up. They are always in the front of the queue.
1743 proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1745 PGPROC *waiter = GetPGProcByNumber(iter.cur);
1747 if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
1748 break;
1750 proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1751 proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
1753 /* see LWLockWakeup() */
1754 Assert(waiter->lwWaiting == LW_WS_WAITING);
1755 waiter->lwWaiting = LW_WS_PENDING_WAKEUP;
1758 /* We are done updating shared state of the lock itself. */
1759 LWLockWaitListUnlock(lock);
1762 * Awaken any waiters I removed from the queue.
1764 proclist_foreach_modify(iter, &wakeup, lwWaitLink)
1766 PGPROC *waiter = GetPGProcByNumber(iter.cur);
1768 proclist_delete(&wakeup, iter.cur, lwWaitLink);
1769 /* check comment in LWLockWakeup() about this barrier */
1770 pg_write_barrier();
1771 waiter->lwWaiting = LW_WS_NOT_WAITING;
1772 PGSemaphoreUnlock(waiter->sem);
1778 * LWLockRelease - release a previously acquired lock
1780 void
1781 LWLockRelease(LWLock *lock)
1783 LWLockMode mode;
1784 uint32 oldstate;
1785 bool check_waiters;
1786 int i;
1789 * Remove lock from list of locks held. Usually, but not always, it will
1790 * be the latest-acquired lock; so search array backwards.
1792 for (i = num_held_lwlocks; --i >= 0;)
1793 if (lock == held_lwlocks[i].lock)
1794 break;
1796 if (i < 0)
1797 elog(ERROR, "lock %s is not held", T_NAME(lock));
1799 mode = held_lwlocks[i].mode;
1801 num_held_lwlocks--;
1802 for (; i < num_held_lwlocks; i++)
1803 held_lwlocks[i] = held_lwlocks[i + 1];
1805 PRINT_LWDEBUG("LWLockRelease", lock, mode);
1808 * Release my hold on the lock; after that it can immediately be acquired
1809 * by others, even if we still have to wake up other waiters.
1811 if (mode == LW_EXCLUSIVE)
1812 oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
1813 else
1814 oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
1816 /* nobody else can have that kind of lock */
1817 Assert(!(oldstate & LW_VAL_EXCLUSIVE));
1819 if (TRACE_POSTGRESQL_LWLOCK_RELEASE_ENABLED())
1820 TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));
1823 * We're still waiting for backends to get scheduled, don't wake them up
1824 * again.
1826 if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
1827 (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
1828 (oldstate & LW_LOCK_MASK) == 0)
1829 check_waiters = true;
1830 else
1831 check_waiters = false;
1834 * As waking up waiters requires the spinlock to be acquired, only do so
1835 * if necessary.
1837 if (check_waiters)
1839 /* XXX: remove before commit? */
1840 LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
1841 LWLockWakeup(lock);
1845 * Now okay to allow cancel/die interrupts.
1847 RESUME_INTERRUPTS();
1851 * LWLockReleaseClearVar - release a previously acquired lock, reset variable
1853 void
1854 LWLockReleaseClearVar(LWLock *lock, pg_atomic_uint64 *valptr, uint64 val)
1857 * Note that pg_atomic_exchange_u64 is a full barrier, so we're guaranteed
1858 * that the variable is updated before releasing the lock.
1860 pg_atomic_exchange_u64(valptr, val);
1862 LWLockRelease(lock);
1867 * LWLockReleaseAll - release all currently-held locks
1869 * Used to clean up after ereport(ERROR). An important difference between this
1870 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
1871 * unchanged by this operation. This is necessary since InterruptHoldoffCount
1872 * has been set to an appropriate level earlier in error recovery. We could
1873 * decrement it below zero if we allow it to drop for each released lock!
1875 void
1876 LWLockReleaseAll(void)
1878 while (num_held_lwlocks > 0)
1880 HOLD_INTERRUPTS(); /* match the upcoming RESUME_INTERRUPTS */
1882 LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
1888 * LWLockHeldByMe - test whether my process holds a lock in any mode
1890 * This is meant as debug support only.
1892 bool
1893 LWLockHeldByMe(LWLock *lock)
1895 int i;
1897 for (i = 0; i < num_held_lwlocks; i++)
1899 if (held_lwlocks[i].lock == lock)
1900 return true;
1902 return false;
1906 * LWLockAnyHeldByMe - test whether my process holds any of an array of locks
1908 * This is meant as debug support only.
1910 bool
1911 LWLockAnyHeldByMe(LWLock *lock, int nlocks, size_t stride)
1913 char *held_lock_addr;
1914 char *begin;
1915 char *end;
1916 int i;
1918 begin = (char *) lock;
1919 end = begin + nlocks * stride;
1920 for (i = 0; i < num_held_lwlocks; i++)
1922 held_lock_addr = (char *) held_lwlocks[i].lock;
1923 if (held_lock_addr >= begin &&
1924 held_lock_addr < end &&
1925 (held_lock_addr - begin) % stride == 0)
1926 return true;
1928 return false;
1932 * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
1934 * This is meant as debug support only.
1936 bool
1937 LWLockHeldByMeInMode(LWLock *lock, LWLockMode mode)
1939 int i;
1941 for (i = 0; i < num_held_lwlocks; i++)
1943 if (held_lwlocks[i].lock == lock && held_lwlocks[i].mode == mode)
1944 return true;
1946 return false;
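
/*
 * Illustrative usage (a sketch): these checks are normally wrapped in
 * assertions at the top of functions that require a particular lock, e.g.
 *
 *		Assert(LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE));
 *
 * so misuse is caught in assert-enabled builds at no cost to production
 * builds.
 */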