1 /*-------------------------------------------------------------------------
4 * Lightweight lock manager
6 * Lightweight locks are intended primarily to provide mutual exclusion of
7 * access to shared-memory data structures. Therefore, they offer both
8 * exclusive and shared lock modes (to support read/write and read-only
9 * access to a shared object).  There are few other frills.  User-level
10 * locking should be done with the full lock manager --- which depends on
11 * LWLocks to protect its shared state.
13 * In addition to exclusive and shared modes, lightweight locks can be used to
14 * wait until a variable changes value.  LWLockAcquire does not reset the
15 * variable; it keeps whatever value it had when the lock was last released.
16 * The holder can update it without releasing the lock by calling
17 * LWLockUpdateVar.  LWLockWaitForVar
18 * waits for the variable to be updated, or until the lock is free. When
19 * releasing the lock with LWLockReleaseClearVar() the value can be set to an
20 * appropriate value for a free lock. The meaning of the variable is up to
21 * the caller, the lightweight lock code just assigns and compares it.
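 *
 * A minimal usage sketch (the lock, variable and positions here are
 * illustrative, not names defined in this file): the exclusive holder
 * publishes progress through the variable and other backends watch it:
 *
 *		LWLockAcquire(lock, LW_EXCLUSIVE);
 *		LWLockUpdateVar(lock, &progress, newpos);	-- waiters see newpos
 *		...
 *		LWLockReleaseClearVar(lock, &progress, 0);	-- release and reset
 *
 * while another backend calls LWLockWaitForVar(lock, &progress, seenpos,
 * &seenpos) to sleep until the lock is released or the value changes from
 * what it last saw.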
23 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
24 * Portions Copyright (c) 1994, Regents of the University of California
27 * src/backend/storage/lmgr/lwlock.c
31 * This used to be a pretty straightforward reader-writer lock
32 * implementation, in which the internal state was protected by a
33 * spinlock. Unfortunately the overhead of taking the spinlock proved to be
34 * too high for workloads/locks that were taken in shared mode very
35 * frequently. Often we were spinning in the (obviously exclusive) spinlock,
36 * while trying to acquire a shared lock that was actually free.
38 * Thus a new implementation was devised that provides wait-free shared lock
39 * acquisition for locks that aren't exclusively locked.
41 * The basic idea is to have a single atomic variable 'lockcount' instead of
42 * the formerly separate shared and exclusive counters and to use atomic
43 * operations to acquire the lock. That's fairly easy to do for plain
44 * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
47 * For lock acquisition we use an atomic compare-and-exchange on the lockcount
48 * variable. For exclusive lock we swap in a sentinel value
49 * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
51 * To release the lock we use an atomic decrement.  If the new value is zero
52 * (we get that atomically), we know we can/have to wake up all waiters.
55 * Obviously it is important that the sentinel value for exclusive locks
56 * doesn't conflict with the maximum number of possible share lockers -
57 * luckily MAX_BACKENDS makes that easily possible.
60 * The attentive reader might have noticed that naively doing the above has a
61 * glaring race condition: We try to lock using the atomic operations and
62 * notice that we have to wait. Unfortunately by the time we have finished
63 * queuing, the former locker very well might have already finished its
64 * work. That's problematic because we're now stuck waiting inside the OS.
66 * To mitigate those races we attempt the lock in several phases:
67 * Phase 1: Try to do it atomically, if we succeed, nice
68 * Phase 2: Add ourselves to the waitqueue of the lock
69 * Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
70 *          the waitqueue
71 * Phase 4: Sleep till wake-up, goto Phase 1
73 * This protects us against the problem above: a release can no longer slip
74 * past unnoticed, because after Phase 2 we are already on the wait queue.
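 *
 * A condensed sketch of that protocol in terms of this file's own helpers
 * (illustrative pseudo-code only, not an actual function below):
 *
 *		for (;;)
 *		{
 *			if (!LWLockAttemptLock(lock, mode))
 *				break;						-- Phase 1: got it atomically
 *			LWLockQueueSelf(lock, mode);	-- Phase 2: enqueue ourselves
 *			if (!LWLockAttemptLock(lock, mode))
 *			{
 *				LWLockDequeueSelf(lock);	-- Phase 3: got it, undo queueing
 *				break;
 *			}
 *			-- Phase 4: sleep on our semaphore until woken up, then loop
 *		}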
75 * -------------------------------------------------------------------------
79 #include "miscadmin.h"
82 #include "port/pg_bitutils.h"
83 #include "postmaster/postmaster.h"
84 #include "storage/proc.h"
85 #include "storage/proclist.h"
86 #include "storage/spin.h"
87 #include "utils/memutils.h"
90 #include "utils/hsearch.h"
94 #define LW_FLAG_HAS_WAITERS ((uint32) 1 << 30)
95 #define LW_FLAG_RELEASE_OK ((uint32) 1 << 29)
96 #define LW_FLAG_LOCKED ((uint32) 1 << 28)
98 #define LW_VAL_EXCLUSIVE ((uint32) 1 << 24)
99 #define LW_VAL_SHARED 1
101 #define LW_LOCK_MASK ((uint32) ((1 << 25)-1))
102 /* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
103 #define LW_SHARED_MASK ((uint32) ((1 << 24)-1))
105 StaticAssertDecl(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
106 				 "MAX_BACKENDS too big for lwlock.c");
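/*
 * For orientation, the layout of the 32-bit state word implied by the
 * definitions above (sketch):
 *
 *	bit 30		LW_FLAG_HAS_WAITERS
 *	bit 29		LW_FLAG_RELEASE_OK
 *	bit 28		LW_FLAG_LOCKED		(protects the wait list)
 *	bit 24		LW_VAL_EXCLUSIVE	(sentinel stored by an exclusive locker)
 *	bits 0-23	shared-holder count	(covered by LW_SHARED_MASK)
 */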
109 * There are three sorts of LWLock "tranches":
111 * 1. The individually-named locks defined in lwlocknames.h each have their
112 * own tranche. We absorb the names of these tranches from there into
113 * BuiltinTrancheNames here.
115 * 2. There are some predefined tranches for built-in groups of locks.
116 * These are listed in enum BuiltinTrancheIds in lwlock.h, and their names
117 * appear in BuiltinTrancheNames[] below.
119 * 3. Extensions can create new tranches, via either RequestNamedLWLockTranche
120 * or LWLockRegisterTranche. The names of these that are known in the current
121 * process appear in LWLockTrancheNames[].
123 * All these names are user-visible as wait event names, so choose with care
124 * ... and do not forget to update the documentation's list of wait events.
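 *
 * For example (a hypothetical extension; the tranche name and lock count are
 * illustrative), case 3 typically looks like this from the extension's side:
 *
 *		-- in its shmem_request_hook:
 *		RequestNamedLWLockTranche("my_extension", 4);
 *
 *		-- once shared memory is up, in each backend that needs the locks:
 *		LWLockPadded *locks = GetNamedLWLockTranche("my_extension");
 *		LWLockAcquire(&locks[0].lock, LW_EXCLUSIVE);
 *		...
 *		LWLockRelease(&locks[0].lock);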
126 static const char *const BuiltinTrancheNames[] = {
127 #define PG_LWLOCK(id, lockname) [id] = CppAsString(lockname) "Lock",
128 #include "storage/lwlocklist.h"
130 	[LWTRANCHE_XACT_BUFFER] = "XactBuffer",
131 	[LWTRANCHE_COMMITTS_BUFFER] = "CommitTsBuffer",
132 	[LWTRANCHE_SUBTRANS_BUFFER] = "SubtransBuffer",
133 	[LWTRANCHE_MULTIXACTOFFSET_BUFFER] = "MultiXactOffsetBuffer",
134 	[LWTRANCHE_MULTIXACTMEMBER_BUFFER] = "MultiXactMemberBuffer",
135 	[LWTRANCHE_NOTIFY_BUFFER] = "NotifyBuffer",
136 	[LWTRANCHE_SERIAL_BUFFER] = "SerialBuffer",
137 	[LWTRANCHE_WAL_INSERT] = "WALInsert",
138 	[LWTRANCHE_BUFFER_CONTENT] = "BufferContent",
139 	[LWTRANCHE_REPLICATION_ORIGIN_STATE] = "ReplicationOriginState",
140 	[LWTRANCHE_REPLICATION_SLOT_IO] = "ReplicationSlotIO",
141 	[LWTRANCHE_LOCK_FASTPATH] = "LockFastPath",
142 	[LWTRANCHE_BUFFER_MAPPING] = "BufferMapping",
143 	[LWTRANCHE_LOCK_MANAGER] = "LockManager",
144 	[LWTRANCHE_PREDICATE_LOCK_MANAGER] = "PredicateLockManager",
145 	[LWTRANCHE_PARALLEL_HASH_JOIN] = "ParallelHashJoin",
146 	[LWTRANCHE_PARALLEL_QUERY_DSA] = "ParallelQueryDSA",
147 	[LWTRANCHE_PER_SESSION_DSA] = "PerSessionDSA",
148 	[LWTRANCHE_PER_SESSION_RECORD_TYPE] = "PerSessionRecordType",
149 	[LWTRANCHE_PER_SESSION_RECORD_TYPMOD] = "PerSessionRecordTypmod",
150 	[LWTRANCHE_SHARED_TUPLESTORE] = "SharedTupleStore",
151 	[LWTRANCHE_SHARED_TIDBITMAP] = "SharedTidBitmap",
152 	[LWTRANCHE_PARALLEL_APPEND] = "ParallelAppend",
153 	[LWTRANCHE_PER_XACT_PREDICATE_LIST] = "PerXactPredicateList",
154 	[LWTRANCHE_PGSTATS_DSA] = "PgStatsDSA",
155 	[LWTRANCHE_PGSTATS_HASH] = "PgStatsHash",
156 	[LWTRANCHE_PGSTATS_DATA] = "PgStatsData",
157 	[LWTRANCHE_LAUNCHER_DSA] = "LogicalRepLauncherDSA",
158 	[LWTRANCHE_LAUNCHER_HASH] = "LogicalRepLauncherHash",
159 	[LWTRANCHE_DSM_REGISTRY_DSA] = "DSMRegistryDSA",
160 	[LWTRANCHE_DSM_REGISTRY_HASH] = "DSMRegistryHash",
161 	[LWTRANCHE_COMMITTS_SLRU] = "CommitTSSLRU",
162 	[LWTRANCHE_MULTIXACTOFFSET_SLRU] = "MultixactOffsetSLRU",
163 	[LWTRANCHE_MULTIXACTMEMBER_SLRU] = "MultixactMemberSLRU",
164 	[LWTRANCHE_NOTIFY_SLRU] = "NotifySLRU",
165 	[LWTRANCHE_SERIAL_SLRU] = "SerialSLRU",
166 	[LWTRANCHE_SUBTRANS_SLRU] = "SubtransSLRU",
167 	[LWTRANCHE_XACT_SLRU] = "XactSLRU",
168 	[LWTRANCHE_PARALLEL_VACUUM_DSA] = "ParallelVacuumDSA",
169 };
171 StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
172 				 LWTRANCHE_FIRST_USER_DEFINED,
173 				 "missing entries in BuiltinTrancheNames[]");
176 * This is indexed by tranche ID minus LWTRANCHE_FIRST_USER_DEFINED, and
177 * stores the names of all dynamically-created tranches known to the current
178 * process. Any unused entries in the array will contain NULL.
180 static const char **LWLockTrancheNames = NULL;
181 static int	LWLockTrancheNamesAllocated = 0;
184 * This points to the main array of LWLocks in shared memory. Backends inherit
185 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
186 * where we have special measures to pass it down).
188 LWLockPadded *MainLWLockArray = NULL;
191 * We use this structure to keep track of locked LWLocks for release
192 * during error recovery. Normally, only a few will be held at once, but
193 * occasionally the number can be much higher; for example, the pg_buffercache
194 * extension locks all buffer partitions simultaneously.
196 #define MAX_SIMUL_LWLOCKS 200
198 /* struct representing the LWLocks we're holding */
199 typedef struct LWLockHandle
205 static int	num_held_lwlocks = 0;
206 static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
208 /* struct representing the LWLock tranche request for named tranche */
209 typedef struct NamedLWLockTrancheRequest
210 {
211 	char		tranche_name[NAMEDATALEN];
212 	int			num_lwlocks;
213 } NamedLWLockTrancheRequest;
215 static NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
216 static int	NamedLWLockTrancheRequestsAllocated = 0;
219 * NamedLWLockTrancheRequests is both the valid length of the request array,
220 * and the length of the shared-memory NamedLWLockTrancheArray later on.
221 * This variable and NamedLWLockTrancheArray are non-static so that
222 * postmaster.c can copy them to child processes in EXEC_BACKEND builds.
224 int			NamedLWLockTrancheRequests = 0;
226 /* points to data in shared memory: */
227 NamedLWLockTranche *NamedLWLockTrancheArray = NULL;
229 static void InitializeLWLocks(void);
230 static inline void LWLockReportWaitStart(LWLock *lock);
231 static inline void LWLockReportWaitEnd(void);
232 static const char *GetLWTrancheName(uint16 trancheId);
234 #define T_NAME(lock) \
235 GetLWTrancheName((lock)->tranche)
238 typedef struct lwlock_stats_key
239 {
240 	int			tranche;
241 	void	   *instance;
242 } lwlock_stats_key;
244 typedef struct lwlock_stats
245 {
246 	lwlock_stats_key key;
247 	int			sh_acquire_count;
248 	int			ex_acquire_count;
249 	int			block_count;
250 	int			dequeue_self_count;
251 	int			spin_delay_count;
252 } lwlock_stats;
254 static HTAB *lwlock_stats_htab;
255 static lwlock_stats lwlock_stats_dummy;
259 bool Trace_lwlocks
= false;
262 PRINT_LWDEBUG(const char *where
, LWLock
*lock
, LWLockMode mode
)
264 /* hide statement & context here, otherwise the log is just too verbose */
267 uint32 state
= pg_atomic_read_u32(&lock
->state
);
271 errhidecontext(true),
272 errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
274 where
, T_NAME(lock
), lock
,
275 (state
& LW_VAL_EXCLUSIVE
) != 0,
276 state
& LW_SHARED_MASK
,
277 (state
& LW_FLAG_HAS_WAITERS
) != 0,
278 pg_atomic_read_u32(&lock
->nwaiters
),
279 (state
& LW_FLAG_RELEASE_OK
) != 0)));
284 LOG_LWDEBUG(const char *where
, LWLock
*lock
, const char *msg
)
286 /* hide statement & context here, otherwise the log is just too verbose */
291 errhidecontext(true),
292 errmsg_internal("%s(%s %p): %s", where
,
293 T_NAME(lock
), lock
, msg
)));
297 #else /* not LOCK_DEBUG */
298 #define PRINT_LWDEBUG(a,b,c) ((void)0)
299 #define LOG_LWDEBUG(a,b,c) ((void)0)
300 #endif /* LOCK_DEBUG */
304 static void init_lwlock_stats(void);
305 static void print_lwlock_stats(int code
, Datum arg
);
306 static lwlock_stats
* get_lwlock_stats_entry(LWLock
*lock
);
309 init_lwlock_stats(void)
312 static MemoryContext lwlock_stats_cxt
= NULL
;
313 static bool exit_registered
= false;
315 if (lwlock_stats_cxt
!= NULL
)
316 MemoryContextDelete(lwlock_stats_cxt
);
319 * The LWLock stats will be updated within a critical section, which
320 * requires allocating new hash entries. Allocations within a critical
321 * section are normally not allowed because running out of memory would
322 * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
323 * turned on in production, so that's an acceptable risk. The hash entries
324 * are small, so the risk of running out of memory is minimal in practice.
326 lwlock_stats_cxt
= AllocSetContextCreate(TopMemoryContext
,
328 ALLOCSET_DEFAULT_SIZES
);
329 MemoryContextAllowInCriticalSection(lwlock_stats_cxt
, true);
331 ctl
.keysize
= sizeof(lwlock_stats_key
);
332 ctl
.entrysize
= sizeof(lwlock_stats
);
333 ctl
.hcxt
= lwlock_stats_cxt
;
334 lwlock_stats_htab
= hash_create("lwlock stats", 16384, &ctl
,
335 HASH_ELEM
| HASH_BLOBS
| HASH_CONTEXT
);
336 if (!exit_registered
)
338 on_shmem_exit(print_lwlock_stats
, 0);
339 exit_registered
= true;
344 print_lwlock_stats(int code
, Datum arg
)
346 HASH_SEQ_STATUS scan
;
347 lwlock_stats
*lwstats
;
349 hash_seq_init(&scan
, lwlock_stats_htab
);
351 /* Grab an LWLock to keep different backends from mixing reports */
352 LWLockAcquire(&MainLWLockArray
[0].lock
, LW_EXCLUSIVE
);
354 while ((lwstats
= (lwlock_stats
*) hash_seq_search(&scan
)) != NULL
)
357 "PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
358 MyProcPid
, GetLWTrancheName(lwstats
->key
.tranche
),
359 lwstats
->key
.instance
, lwstats
->sh_acquire_count
,
360 lwstats
->ex_acquire_count
, lwstats
->block_count
,
361 lwstats
->spin_delay_count
, lwstats
->dequeue_self_count
);
364 LWLockRelease(&MainLWLockArray
[0].lock
);
367 static lwlock_stats
*
368 get_lwlock_stats_entry(LWLock
*lock
)
370 lwlock_stats_key key
;
371 lwlock_stats
*lwstats
;
375 * During shared memory initialization, the hash table doesn't exist yet.
376 * Stats of that phase aren't very interesting, so just collect operations
377 * on all locks in a single dummy entry.
379 if (lwlock_stats_htab
== NULL
)
380 return &lwlock_stats_dummy
;
382 /* Fetch or create the entry. */
383 MemSet(&key
, 0, sizeof(key
));
384 key
.tranche
= lock
->tranche
;
386 lwstats
= hash_search(lwlock_stats_htab
, &key
, HASH_ENTER
, &found
);
389 lwstats
->sh_acquire_count
= 0;
390 lwstats
->ex_acquire_count
= 0;
391 lwstats
->block_count
= 0;
392 lwstats
->dequeue_self_count
= 0;
393 lwstats
->spin_delay_count
= 0;
397 #endif /* LWLOCK_STATS */
401 * Compute number of LWLocks required by named tranches. These will be
402 * allocated in the main array.
405 NumLWLocksForNamedTranches(void)
410 for (i
= 0; i
< NamedLWLockTrancheRequests
; i
++)
411 numLocks
+= NamedLWLockTrancheRequestArray
[i
].num_lwlocks
;
417 * Compute shmem space needed for LWLocks and named tranches.
420 LWLockShmemSize(void)
424 int numLocks
= NUM_FIXED_LWLOCKS
;
426 /* Calculate total number of locks needed in the main array. */
427 numLocks
+= NumLWLocksForNamedTranches();
429 /* Space for the LWLock array. */
430 size
= mul_size(numLocks
, sizeof(LWLockPadded
));
432 /* Space for dynamic allocation counter, plus room for alignment. */
433 size
= add_size(size
, sizeof(int) + LWLOCK_PADDED_SIZE
);
435 /* space for named tranches. */
436 size
= add_size(size
, mul_size(NamedLWLockTrancheRequests
, sizeof(NamedLWLockTranche
)));
438 /* space for name of each tranche. */
439 for (i
= 0; i
< NamedLWLockTrancheRequests
; i
++)
440 size
= add_size(size
, strlen(NamedLWLockTrancheRequestArray
[i
].tranche_name
) + 1);
446 * Allocate shmem space for the main LWLock array and all tranches and
447 * initialize it. We also register extension LWLock tranches here.
452 if (!IsUnderPostmaster
)
454 Size spaceLocks
= LWLockShmemSize();
459 ptr
= (char *) ShmemAlloc(spaceLocks
);
461 /* Leave room for dynamic allocation of tranches */
464 /* Ensure desired alignment of LWLock array */
465 ptr
+= LWLOCK_PADDED_SIZE
- ((uintptr_t) ptr
) % LWLOCK_PADDED_SIZE
;
467 MainLWLockArray
= (LWLockPadded
*) ptr
;
470 * Initialize the dynamic-allocation counter for tranches, which is
471 * stored just before the first LWLock.
473 LWLockCounter
= (int *) ((char *) MainLWLockArray
- sizeof(int));
474 *LWLockCounter
= LWTRANCHE_FIRST_USER_DEFINED
;
476 /* Initialize all LWLocks */
480 /* Register named extension LWLock tranches in the current process. */
481 for (int i
= 0; i
< NamedLWLockTrancheRequests
; i
++)
482 LWLockRegisterTranche(NamedLWLockTrancheArray
[i
].trancheId
,
483 NamedLWLockTrancheArray
[i
].trancheName
);
487 * Initialize LWLocks that are fixed and those belonging to named tranches.
490 InitializeLWLocks(void)
492 int numNamedLocks
= NumLWLocksForNamedTranches();
498 /* Initialize all individual LWLocks in main array */
499 for (id
= 0, lock
= MainLWLockArray
; id
< NUM_INDIVIDUAL_LWLOCKS
; id
++, lock
++)
500 LWLockInitialize(&lock
->lock
, id
);
502 /* Initialize buffer mapping LWLocks in main array */
503 lock
= MainLWLockArray
+ BUFFER_MAPPING_LWLOCK_OFFSET
;
504 for (id
= 0; id
< NUM_BUFFER_PARTITIONS
; id
++, lock
++)
505 LWLockInitialize(&lock
->lock
, LWTRANCHE_BUFFER_MAPPING
);
507 /* Initialize lmgrs' LWLocks in main array */
508 lock
= MainLWLockArray
+ LOCK_MANAGER_LWLOCK_OFFSET
;
509 for (id
= 0; id
< NUM_LOCK_PARTITIONS
; id
++, lock
++)
510 LWLockInitialize(&lock
->lock
, LWTRANCHE_LOCK_MANAGER
);
512 /* Initialize predicate lmgrs' LWLocks in main array */
513 lock
= MainLWLockArray
+ PREDICATELOCK_MANAGER_LWLOCK_OFFSET
;
514 for (id
= 0; id
< NUM_PREDICATELOCK_PARTITIONS
; id
++, lock
++)
515 LWLockInitialize(&lock
->lock
, LWTRANCHE_PREDICATE_LOCK_MANAGER
);
518 * Copy the info about any named tranches into shared memory (so that
519 * other processes can see it), and initialize the requested LWLocks.
521 if (NamedLWLockTrancheRequests
> 0)
525 NamedLWLockTrancheArray
= (NamedLWLockTranche
*)
526 &MainLWLockArray
[NUM_FIXED_LWLOCKS
+ numNamedLocks
];
528 trancheNames
= (char *) NamedLWLockTrancheArray
+
529 (NamedLWLockTrancheRequests
* sizeof(NamedLWLockTranche
));
530 lock
= &MainLWLockArray
[NUM_FIXED_LWLOCKS
];
532 for (i
= 0; i
< NamedLWLockTrancheRequests
; i
++)
534 NamedLWLockTrancheRequest
*request
;
535 NamedLWLockTranche
*tranche
;
538 request
= &NamedLWLockTrancheRequestArray
[i
];
539 tranche
= &NamedLWLockTrancheArray
[i
];
542 trancheNames
+= strlen(request
->tranche_name
) + 1;
543 strcpy(name
, request
->tranche_name
);
544 tranche
->trancheId
= LWLockNewTrancheId();
545 tranche
->trancheName
= name
;
547 for (j
= 0; j
< request
->num_lwlocks
; j
++, lock
++)
548 LWLockInitialize(&lock
->lock
, tranche
->trancheId
);
554 * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
557 InitLWLockAccess(void)
565 * GetNamedLWLockTranche - returns the base address of the LWLocks belonging
566 *		to the requested named tranche.
568 * Caller needs to retrieve the requested number of LWLocks starting from
569 * the base lock address returned by this API. This can be used for
570 * tranches that are requested by using RequestNamedLWLockTranche() API.
573 GetNamedLWLockTranche(const char *tranche_name
)
579 * Obtain the position of base address of LWLock belonging to requested
580 * tranche_name in MainLWLockArray. LWLocks for named tranches are placed
581 * in MainLWLockArray after fixed locks.
583 lock_pos
= NUM_FIXED_LWLOCKS
;
584 for (i
= 0; i
< NamedLWLockTrancheRequests
; i
++)
586 if (strcmp(NamedLWLockTrancheRequestArray
[i
].tranche_name
,
588 return &MainLWLockArray
[lock_pos
];
590 lock_pos
+= NamedLWLockTrancheRequestArray
[i
].num_lwlocks
;
593 elog(ERROR
, "requested tranche is not registered");
595 /* just to keep compiler quiet */
600 * Allocate a new tranche ID.
603 LWLockNewTrancheId(void)
608 LWLockCounter
= (int *) ((char *) MainLWLockArray
- sizeof(int));
609 /* We use the ShmemLock spinlock to protect LWLockCounter */
610 SpinLockAcquire(ShmemLock
);
611 result
= (*LWLockCounter
)++;
612 SpinLockRelease(ShmemLock
);
618 * Register a dynamic tranche name in the lookup table of the current process.
620 * This routine will save a pointer to the tranche name passed as an argument,
621 * so the name should be allocated in a backend-lifetime context
622 * (shared memory, TopMemoryContext, static constant, or similar).
624 * The tranche name will be user-visible as a wait event name, so try to
625 * use a name that fits the style for those.
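 *
 * A hypothetical registration sketch (the id variable, the name and
 * shared_state are the caller's own; the functions are the ones defined in
 * this file):
 *
 *		static int	my_tranche_id = 0;
 *		...
 *		my_tranche_id = LWLockNewTrancheId();
 *		LWLockRegisterTranche(my_tranche_id, "MyExtensionState");
 *		LWLockInitialize(&shared_state->lock, my_tranche_id);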
628 LWLockRegisterTranche(int tranche_id
, const char *tranche_name
)
630 /* This should only be called for user-defined tranches. */
631 if (tranche_id
< LWTRANCHE_FIRST_USER_DEFINED
)
634 /* Convert to array index. */
635 tranche_id
-= LWTRANCHE_FIRST_USER_DEFINED
;
637 /* If necessary, create or enlarge array. */
638 if (tranche_id
>= LWLockTrancheNamesAllocated
)
642 newalloc
= pg_nextpower2_32(Max(8, tranche_id
+ 1));
644 if (LWLockTrancheNames
== NULL
)
645 LWLockTrancheNames
= (const char **)
646 MemoryContextAllocZero(TopMemoryContext
,
647 newalloc
* sizeof(char *));
650 repalloc0_array(LWLockTrancheNames
, const char *, LWLockTrancheNamesAllocated
, newalloc
);
651 LWLockTrancheNamesAllocated
= newalloc
;
654 LWLockTrancheNames
[tranche_id
] = tranche_name
;
658 * RequestNamedLWLockTranche
659 * Request that extra LWLocks be allocated during postmaster
662 * This may only be called via the shmem_request_hook of a library that is
663 * loaded into the postmaster via shared_preload_libraries. Calls from
664 * elsewhere will fail.
666 * The tranche name will be user-visible as a wait event name, so try to
667 * use a name that fits the style for those.
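 *
 * Sketch of the expected call site (illustrative; _PG_init, the hook
 * chaining variable and the tranche name belong to the extension):
 *
 *		static void
 *		my_shmem_request(void)
 *		{
 *			if (prev_shmem_request_hook)
 *				prev_shmem_request_hook();
 *			RequestNamedLWLockTranche("my_extension", 1);
 *		}
 *
 *		void
 *		_PG_init(void)
 *		{
 *			prev_shmem_request_hook = shmem_request_hook;
 *			shmem_request_hook = my_shmem_request;
 *		}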
670 RequestNamedLWLockTranche(const char *tranche_name
, int num_lwlocks
)
672 NamedLWLockTrancheRequest
*request
;
674 if (!process_shmem_requests_in_progress
)
675 elog(FATAL
, "cannot request additional LWLocks outside shmem_request_hook");
677 if (NamedLWLockTrancheRequestArray
== NULL
)
679 NamedLWLockTrancheRequestsAllocated
= 16;
680 NamedLWLockTrancheRequestArray
= (NamedLWLockTrancheRequest
*)
681 MemoryContextAlloc(TopMemoryContext
,
682 NamedLWLockTrancheRequestsAllocated
683 * sizeof(NamedLWLockTrancheRequest
));
686 if (NamedLWLockTrancheRequests
>= NamedLWLockTrancheRequestsAllocated
)
688 int i
= pg_nextpower2_32(NamedLWLockTrancheRequests
+ 1);
690 NamedLWLockTrancheRequestArray
= (NamedLWLockTrancheRequest
*)
691 repalloc(NamedLWLockTrancheRequestArray
,
692 i
* sizeof(NamedLWLockTrancheRequest
));
693 NamedLWLockTrancheRequestsAllocated
= i
;
696 request
= &NamedLWLockTrancheRequestArray
[NamedLWLockTrancheRequests
];
697 Assert(strlen(tranche_name
) + 1 <= NAMEDATALEN
);
698 strlcpy(request
->tranche_name
, tranche_name
, NAMEDATALEN
);
699 request
->num_lwlocks
= num_lwlocks
;
700 NamedLWLockTrancheRequests
++;
704 * LWLockInitialize - initialize a new lwlock; it's initially unlocked
707 LWLockInitialize(LWLock
*lock
, int tranche_id
)
709 pg_atomic_init_u32(&lock
->state
, LW_FLAG_RELEASE_OK
);
711 pg_atomic_init_u32(&lock
->nwaiters
, 0);
713 lock
->tranche
= tranche_id
;
714 proclist_init(&lock
->waiters
);
718 * Report start of wait event for light-weight locks.
720 * This function will be used by all the light-weight lock calls which
721 * needs to wait to acquire the lock. This function distinguishes wait
722 * event based on tranche and lock id.
725 LWLockReportWaitStart(LWLock
*lock
)
727 pgstat_report_wait_start(PG_WAIT_LWLOCK
| lock
->tranche
);
731 * Report end of wait event for light-weight locks.
734 LWLockReportWaitEnd(void)
736 pgstat_report_wait_end();
740 * Return the name of an LWLock tranche.
743 GetLWTrancheName(uint16 trancheId
)
745 /* Built-in tranche or individual LWLock? */
746 if (trancheId
< LWTRANCHE_FIRST_USER_DEFINED
)
747 return BuiltinTrancheNames
[trancheId
];
750 * It's an extension tranche, so look in LWLockTrancheNames[]. However,
751 * it's possible that the tranche has never been registered in the current
752 * process, in which case give up and return "extension".
754 trancheId
-= LWTRANCHE_FIRST_USER_DEFINED
;
756 if (trancheId
>= LWLockTrancheNamesAllocated
||
757 LWLockTrancheNames
[trancheId
] == NULL
)
760 return LWLockTrancheNames
[trancheId
];
764 * Return an identifier for an LWLock based on the wait class and event.
767 GetLWLockIdentifier(uint32 classId
, uint16 eventId
)
769 Assert(classId
== PG_WAIT_LWLOCK
);
770 /* The event IDs are just tranche numbers. */
771 return GetLWTrancheName(eventId
);
775 * Internal function that tries to atomically acquire the lwlock in the
776 * passed mode.
778 * This function will not block waiting for a lock to become free - that's
779 * the caller's job.
781 * Returns true if the lock isn't free and we need to wait.
784 LWLockAttemptLock(LWLock
*lock
, LWLockMode mode
)
788 Assert(mode
== LW_EXCLUSIVE
|| mode
== LW_SHARED
);
791 * Read once outside the loop, later iterations will get the newer value
792 * via compare & exchange.
794 old_state
= pg_atomic_read_u32(&lock
->state
);
796 /* loop until we've determined whether we could acquire the lock or not */
799 uint32 desired_state
;
802 desired_state
= old_state
;
804 if (mode
== LW_EXCLUSIVE
)
806 lock_free
= (old_state
& LW_LOCK_MASK
) == 0;
808 desired_state
+= LW_VAL_EXCLUSIVE
;
812 lock_free
= (old_state
& LW_VAL_EXCLUSIVE
) == 0;
814 desired_state
+= LW_VAL_SHARED
;
818 * Attempt to swap in the state we are expecting.  If we didn't see the
819 * lock as free, that's just the old value.  If we saw it as free,
820 * we'll attempt to mark it acquired. The reason that we always swap
821 * in the value is that this doubles as a memory barrier. We could try
822 * to be smarter and only swap in values if we saw the lock as free,
823 * but benchmarks haven't shown it to be beneficial so far.
825 * Retry if the value changed since we last looked at it.
827 if (pg_atomic_compare_exchange_u32(&lock
->state
,
828 &old_state
, desired_state
))
832 /* Great! Got the lock. */
834 if (mode
== LW_EXCLUSIVE
)
835 lock
->owner
= MyProc
;
840 return true; /* somebody else has the lock */
847 * Lock the LWLock's wait list against concurrent activity.
849 * NB: even though the wait list is locked, non-conflicting lock operations
850 * may still happen concurrently.
852 * Time spent holding mutex should be short!
855 LWLockWaitListLock(LWLock
*lock
)
859 lwlock_stats
*lwstats
;
862 lwstats
= get_lwlock_stats_entry(lock
);
867 /* always try once to acquire lock directly */
868 old_state
= pg_atomic_fetch_or_u32(&lock
->state
, LW_FLAG_LOCKED
);
869 if (!(old_state
& LW_FLAG_LOCKED
))
870 break; /* got lock */
872 /* and then spin without atomic operations until lock is released */
874 SpinDelayStatus delayStatus
;
876 init_local_spin_delay(&delayStatus
);
878 while (old_state
& LW_FLAG_LOCKED
)
880 perform_spin_delay(&delayStatus
);
881 old_state
= pg_atomic_read_u32(&lock
->state
);
884 delays
+= delayStatus
.delays
;
886 finish_spin_delay(&delayStatus
);
890 * Retry. The lock might obviously already be re-acquired by the time
891 * we're attempting to get it again.
896 lwstats
->spin_delay_count
+= delays
;
901 * Unlock the LWLock's wait list.
903 * Note that it can be more efficient to manipulate flags and release the
904 * locks in a single atomic operation.
907 LWLockWaitListUnlock(LWLock
*lock
)
909 uint32 old_state PG_USED_FOR_ASSERTS_ONLY
;
911 old_state
= pg_atomic_fetch_and_u32(&lock
->state
, ~LW_FLAG_LOCKED
);
913 Assert(old_state
& LW_FLAG_LOCKED
);
917 * Wakeup all the lockers that currently have a chance to acquire the lock.
920 LWLockWakeup(LWLock
*lock
)
923 bool wokeup_somebody
= false;
924 proclist_head wakeup
;
925 proclist_mutable_iter iter
;
927 proclist_init(&wakeup
);
929 new_release_ok
= true;
931 /* lock wait list while collecting backends to wake up */
932 LWLockWaitListLock(lock
);
934 proclist_foreach_modify(iter
, &lock
->waiters
, lwWaitLink
)
936 PGPROC
*waiter
= GetPGProcByNumber(iter
.cur
);
938 if (wokeup_somebody
&& waiter
->lwWaitMode
== LW_EXCLUSIVE
)
941 proclist_delete(&lock
->waiters
, iter
.cur
, lwWaitLink
);
942 proclist_push_tail(&wakeup
, iter
.cur
, lwWaitLink
);
944 if (waiter
->lwWaitMode
!= LW_WAIT_UNTIL_FREE
)
947 * Prevent additional wakeups until the retryer gets to run. Backends
948 * that are just waiting for the lock to become free don't retry
949 * automatically.
951 new_release_ok = false;
954 * Don't wakeup (further) exclusive locks.
956 wokeup_somebody
= true;
960 * Signal that the process isn't on the wait list anymore. This allows
961 * LWLockDequeueSelf() to remove itself from the waitlist with a
962 * proclist_delete(), rather than having to check if it has been
963 * removed from the list.
965 Assert(waiter
->lwWaiting
== LW_WS_WAITING
);
966 waiter
->lwWaiting
= LW_WS_PENDING_WAKEUP
;
969 * Once we've woken up an exclusive lock, there's no point in waking
970 * up anybody else.
972 if (waiter
->lwWaitMode
== LW_EXCLUSIVE
)
976 Assert(proclist_is_empty(&wakeup
) || pg_atomic_read_u32(&lock
->state
) & LW_FLAG_HAS_WAITERS
);
978 /* unset required flags, and release lock, in one fell swoop */
981 uint32 desired_state
;
983 old_state
= pg_atomic_read_u32(&lock
->state
);
986 desired_state
= old_state
;
988 /* compute desired flags */
991 desired_state
|= LW_FLAG_RELEASE_OK
;
993 desired_state
&= ~LW_FLAG_RELEASE_OK
;
995 if (proclist_is_empty(&wakeup
))
996 desired_state
&= ~LW_FLAG_HAS_WAITERS
;
998 desired_state
&= ~LW_FLAG_LOCKED
; /* release lock */
1000 if (pg_atomic_compare_exchange_u32(&lock
->state
, &old_state
,
1006 /* Awaken any waiters I removed from the queue. */
1007 proclist_foreach_modify(iter
, &wakeup
, lwWaitLink
)
1009 PGPROC
*waiter
= GetPGProcByNumber(iter
.cur
);
1011 LOG_LWDEBUG("LWLockRelease", lock
, "release waiter");
1012 proclist_delete(&wakeup
, iter
.cur
, lwWaitLink
);
1015 * Guarantee that lwWaiting being unset only becomes visible once the
1016 * unlink from the list has completed. Otherwise the target backend
1017 * could be woken up for some other reason and enqueue for a new lock; if
1018 * that happens before the list unlink happens, the list would end up
1019 * being corrupted.
1021 * The barrier pairs with the LWLockWaitListLock() when enqueuing for
1025 waiter
->lwWaiting
= LW_WS_NOT_WAITING
;
1026 PGSemaphoreUnlock(waiter
->sem
);
1031 * Add ourselves to the end of the queue.
1033 * NB: Mode can be LW_WAIT_UNTIL_FREE here!
1036 LWLockQueueSelf(LWLock
*lock
, LWLockMode mode
)
1039 * If we don't have a PGPROC structure, there's no way to wait. This
1040 * should never occur, since MyProc should only be null during shared
1041 * memory initialization.
1044 elog(PANIC
, "cannot wait without a PGPROC structure");
1046 if (MyProc
->lwWaiting
!= LW_WS_NOT_WAITING
)
1047 elog(PANIC
, "queueing for lock while waiting on another one");
1049 LWLockWaitListLock(lock
);
1051 /* setting the flag is protected by the spinlock */
1052 pg_atomic_fetch_or_u32(&lock
->state
, LW_FLAG_HAS_WAITERS
);
1054 MyProc
->lwWaiting
= LW_WS_WAITING
;
1055 MyProc
->lwWaitMode
= mode
;
1057 /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
1058 if (mode
== LW_WAIT_UNTIL_FREE
)
1059 proclist_push_head(&lock
->waiters
, MyProcNumber
, lwWaitLink
);
1061 proclist_push_tail(&lock
->waiters
, MyProcNumber
, lwWaitLink
);
1063 /* Can release the mutex now */
1064 LWLockWaitListUnlock(lock
);
1067 pg_atomic_fetch_add_u32(&lock
->nwaiters
, 1);
1072 * Remove ourselves from the waitlist.
1074 * This is used if we queued ourselves because we thought we needed to sleep
1075 * but, after further checking, we discovered that we don't actually need
1076 * to do so.
1079 LWLockDequeueSelf(LWLock
*lock
)
1084 lwlock_stats
*lwstats
;
1086 lwstats
= get_lwlock_stats_entry(lock
);
1088 lwstats
->dequeue_self_count
++;
1091 LWLockWaitListLock(lock
);
1094 * Remove ourselves from the waitlist, unless we've already been removed.
1095 * The removal happens with the wait list lock held, so there's no race in
1096 * this area.
1098 on_waitlist
= MyProc
->lwWaiting
== LW_WS_WAITING
;
1100 proclist_delete(&lock
->waiters
, MyProcNumber
, lwWaitLink
);
1102 if (proclist_is_empty(&lock
->waiters
) &&
1103 (pg_atomic_read_u32(&lock
->state
) & LW_FLAG_HAS_WAITERS
) != 0)
1105 pg_atomic_fetch_and_u32(&lock
->state
, ~LW_FLAG_HAS_WAITERS
);
1108 /* XXX: combine with fetch_and above? */
1109 LWLockWaitListUnlock(lock
);
1111 /* clear waiting state again, nice for debugging */
1113 MyProc
->lwWaiting
= LW_WS_NOT_WAITING
;
1119 * Somebody else dequeued us and has or will wake us up. Deal with the
1120 * superfluous absorption of a wakeup.
1124 * Reset RELEASE_OK flag if somebody woke us before we removed
1125 * ourselves - they'll have set it to false.
1127 pg_atomic_fetch_or_u32(&lock
->state
, LW_FLAG_RELEASE_OK
);
1130 * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
1131 * get reset at some inconvenient point later. Most of the time this
1132 * will immediately return.
1136 PGSemaphoreLock(MyProc
->sem
);
1137 if (MyProc
->lwWaiting
== LW_WS_NOT_WAITING
)
1143 * Fix the process wait semaphore's count for any absorbed wakeups.
1145 while (extraWaits
-- > 0)
1146 PGSemaphoreUnlock(MyProc
->sem
);
1151 /* not waiting anymore */
1152 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY
= pg_atomic_fetch_sub_u32(&lock
->nwaiters
, 1);
1154 Assert(nwaiters
< MAX_BACKENDS
);
1160 * LWLockAcquire - acquire a lightweight lock in the specified mode
1162 * If the lock is not available, sleep until it is. Returns true if the lock
1163 * was available immediately, false if we had to sleep.
1165 * Side effect: cancel/die interrupts are held off until lock release.
1168 LWLockAcquire(LWLock
*lock
, LWLockMode mode
)
1170 PGPROC
*proc
= MyProc
;
1174 lwlock_stats
*lwstats
;
1176 lwstats
= get_lwlock_stats_entry(lock
);
1179 Assert(mode
== LW_SHARED
|| mode
== LW_EXCLUSIVE
);
1181 PRINT_LWDEBUG("LWLockAcquire", lock
, mode
);
1184 /* Count lock acquisition attempts */
1185 if (mode
== LW_EXCLUSIVE
)
1186 lwstats
->ex_acquire_count
++;
1188 lwstats
->sh_acquire_count
++;
1189 #endif /* LWLOCK_STATS */
1192 * We can't wait if we haven't got a PGPROC. This should only occur
1193 * during bootstrap or shared memory initialization. Put an Assert here
1194 * to catch unsafe coding practices.
1196 Assert(!(proc
== NULL
&& IsUnderPostmaster
));
1198 /* Ensure we will have room to remember the lock */
1199 if (num_held_lwlocks
>= MAX_SIMUL_LWLOCKS
)
1200 elog(ERROR
, "too many LWLocks taken");
1203 * Lock out cancel/die interrupts until we exit the code section protected
1204 * by the LWLock. This ensures that interrupts will not interfere with
1205 * manipulations of data structures in shared memory.
1210 * Loop here to try to acquire lock after each time we are signaled by
1211 * LWLockRelease.
1213 * NOTE: it might seem better to have LWLockRelease actually grant us the
1214 * lock, rather than retrying and possibly having to go back to sleep. But
1215 * in practice that is no good because it means a process swap for every
1216 * lock acquisition when two or more processes are contending for the same
1217 * lock. Since LWLocks are normally used to protect not-very-long
1218 * sections of computation, a process needs to be able to acquire and
1219 * release the same lock many times during a single CPU time slice, even
1220 * in the presence of contention. The efficiency of being able to do that
1221 * outweighs the inefficiency of sometimes wasting a process dispatch
1222 * cycle because the lock is not free when a released waiter finally gets
1223 * to run. See pgsql-hackers archives for 29-Dec-01.
1230 * Try to grab the lock the first time; we're not in the waitqueue
1231 * yet anyway, so go for it.
1233 mustwait
= LWLockAttemptLock(lock
, mode
);
1237 LOG_LWDEBUG("LWLockAcquire", lock
, "immediately acquired lock");
1238 break; /* got the lock */
1242 * Ok, at this point we couldn't grab the lock on the first try. We
1243 * cannot simply queue ourselves to the end of the list and wait to be
1244 * woken up because by now the lock could long have been released.
1245 * Instead add us to the queue and try to grab the lock again. If we
1246 * succeed we need to revert the queuing and be happy, otherwise we
1247 * recheck the lock. If we still couldn't grab it, we know that the
1248 * other locker will see our queue entries when releasing since they
1249 * existed before we checked for the lock.
1252 /* add to the queue */
1253 LWLockQueueSelf(lock
, mode
);
1255 /* we're now guaranteed to be woken up if necessary */
1256 mustwait
= LWLockAttemptLock(lock
, mode
);
1258 /* ok, grabbed the lock the second time round, need to undo queueing */
1261 LOG_LWDEBUG("LWLockAcquire", lock
, "acquired, undoing queue");
1263 LWLockDequeueSelf(lock
);
1268 * Wait until awakened.
1270 * It is possible that we get awakened for a reason other than being
1271 * signaled by LWLockRelease. If so, loop back and wait again. Once
1272 * we've gotten the LWLock, re-increment the sema by the number of
1273 * additional signals received.
1275 LOG_LWDEBUG("LWLockAcquire", lock
, "waiting");
1278 lwstats
->block_count
++;
1281 LWLockReportWaitStart(lock
);
1282 if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1283 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock
), mode
);
1287 PGSemaphoreLock(proc
->sem
);
1288 if (proc
->lwWaiting
== LW_WS_NOT_WAITING
)
1293 /* Retrying, allow LWLockRelease to release waiters again. */
1294 pg_atomic_fetch_or_u32(&lock
->state
, LW_FLAG_RELEASE_OK
);
1298 /* not waiting anymore */
1299 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY
= pg_atomic_fetch_sub_u32(&lock
->nwaiters
, 1);
1301 Assert(nwaiters
< MAX_BACKENDS
);
1305 if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1306 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock
), mode
);
1307 LWLockReportWaitEnd();
1309 LOG_LWDEBUG("LWLockAcquire", lock
, "awakened");
1311 /* Now loop back and try to acquire lock again. */
1315 if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_ENABLED())
1316 TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock
), mode
);
1318 /* Add lock to list of locks held by this backend */
1319 held_lwlocks
[num_held_lwlocks
].lock
= lock
;
1320 held_lwlocks
[num_held_lwlocks
++].mode
= mode
;
1323 * Fix the process wait semaphore's count for any absorbed wakeups.
1325 while (extraWaits
-- > 0)
1326 PGSemaphoreUnlock(proc
->sem
);
1332 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
1334 * If the lock is not available, return false with no side-effects.
1336 * If successful, cancel/die interrupts are held off until lock release.
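 *
 * Typical pattern (sketch; the fallback is whatever the caller wants to do
 * instead of blocking):
 *
 *		if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
 *		{
 *			... touch the protected structure ...
 *			LWLockRelease(lock);
 *		}
 *		else
 *			... do without the lock, or try again later ...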
1339 LWLockConditionalAcquire(LWLock
*lock
, LWLockMode mode
)
1343 Assert(mode
== LW_SHARED
|| mode
== LW_EXCLUSIVE
);
1345 PRINT_LWDEBUG("LWLockConditionalAcquire", lock
, mode
);
1347 /* Ensure we will have room to remember the lock */
1348 if (num_held_lwlocks
>= MAX_SIMUL_LWLOCKS
)
1349 elog(ERROR
, "too many LWLocks taken");
1352 * Lock out cancel/die interrupts until we exit the code section protected
1353 * by the LWLock. This ensures that interrupts will not interfere with
1354 * manipulations of data structures in shared memory.
1358 /* Check for the lock */
1359 mustwait
= LWLockAttemptLock(lock
, mode
);
1363 /* Failed to get lock, so release interrupt holdoff */
1364 RESUME_INTERRUPTS();
1366 LOG_LWDEBUG("LWLockConditionalAcquire", lock
, "failed");
1367 if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL_ENABLED())
1368 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock
), mode
);
1372 /* Add lock to list of locks held by this backend */
1373 held_lwlocks
[num_held_lwlocks
].lock
= lock
;
1374 held_lwlocks
[num_held_lwlocks
++].mode
= mode
;
1375 if (TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_ENABLED())
1376 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock
), mode
);
1382 * LWLockAcquireOrWait - Acquire lock, or wait until it's free
1384 * The semantics of this function are a bit funky. If the lock is currently
1385 * free, it is acquired in the given mode, and the function returns true. If
1386 * the lock isn't immediately free, the function waits until it is released
1387 * and returns false, but does not acquire the lock.
1389 * This is currently used for WALWriteLock: when a backend flushes the WAL,
1390 * holding WALWriteLock, it can flush the commit records of many other
1391 * backends as a side-effect. Those other backends need to wait until the
1392 * flush finishes, but don't need to acquire the lock anymore. They can just
1393 * wake up, observe that their records have already been flushed, and return.
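 *
 * Sketch of that calling pattern (illustrative; the flush bookkeeping is the
 * caller's own):
 *
 *		if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
 *		{
 *			... got the lock: flush WAL up to the requested point ...
 *			LWLockRelease(WALWriteLock);
 *		}
 *		else
 *		{
 *			... somebody else held it and has since released it; just
 *			    re-check whether our records are now flushed ...
 *		}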
1396 LWLockAcquireOrWait(LWLock
*lock
, LWLockMode mode
)
1398 PGPROC
*proc
= MyProc
;
1402 lwlock_stats
*lwstats
;
1404 lwstats
= get_lwlock_stats_entry(lock
);
1407 Assert(mode
== LW_SHARED
|| mode
== LW_EXCLUSIVE
);
1409 PRINT_LWDEBUG("LWLockAcquireOrWait", lock
, mode
);
1411 /* Ensure we will have room to remember the lock */
1412 if (num_held_lwlocks
>= MAX_SIMUL_LWLOCKS
)
1413 elog(ERROR
, "too many LWLocks taken");
1416 * Lock out cancel/die interrupts until we exit the code section protected
1417 * by the LWLock. This ensures that interrupts will not interfere with
1418 * manipulations of data structures in shared memory.
1423 * NB: We're using nearly the same twice-in-a-row lock acquisition
1424 * protocol as LWLockAcquire(). Check its comments for details.
1426 mustwait
= LWLockAttemptLock(lock
, mode
);
1430 LWLockQueueSelf(lock
, LW_WAIT_UNTIL_FREE
);
1432 mustwait
= LWLockAttemptLock(lock
, mode
);
1437 * Wait until awakened.  Like in LWLockAcquire, be prepared for
1438 * bogus wakeups.
1440 LOG_LWDEBUG("LWLockAcquireOrWait", lock
, "waiting");
1443 lwstats
->block_count
++;
1446 LWLockReportWaitStart(lock
);
1447 if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1448 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock
), mode
);
1452 PGSemaphoreLock(proc
->sem
);
1453 if (proc
->lwWaiting
== LW_WS_NOT_WAITING
)
1460 /* not waiting anymore */
1461 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY
= pg_atomic_fetch_sub_u32(&lock
->nwaiters
, 1);
1463 Assert(nwaiters
< MAX_BACKENDS
);
1466 if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1467 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock
), mode
);
1468 LWLockReportWaitEnd();
1470 LOG_LWDEBUG("LWLockAcquireOrWait", lock
, "awakened");
1474 LOG_LWDEBUG("LWLockAcquireOrWait", lock
, "acquired, undoing queue");
1477 * Got lock in the second attempt, undo queueing. We need to treat
1478 * this as having successfully acquired the lock, otherwise we'd
1479 * not necessarily wake up people we've prevented from acquiring
1480 * the lock.
1482 LWLockDequeueSelf(lock
);
1487 * Fix the process wait semaphore's count for any absorbed wakeups.
1489 while (extraWaits
-- > 0)
1490 PGSemaphoreUnlock(proc
->sem
);
1494 /* Failed to get lock, so release interrupt holdoff */
1495 RESUME_INTERRUPTS();
1496 LOG_LWDEBUG("LWLockAcquireOrWait", lock
, "failed");
1497 if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL_ENABLED())
1498 TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock
), mode
);
1502 LOG_LWDEBUG("LWLockAcquireOrWait", lock
, "succeeded");
1503 /* Add lock to list of locks held by this backend */
1504 held_lwlocks
[num_held_lwlocks
].lock
= lock
;
1505 held_lwlocks
[num_held_lwlocks
++].mode
= mode
;
1506 if (TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_ENABLED())
1507 TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock
), mode
);
1514 * Does the lwlock in its current state need to wait for the variable
1515 * value to change?
1517 * If we don't need to wait, and it's because the value of the variable has
1518 * changed, store the current value in newval.
1520 * *result is set to true if the lock was free, and false otherwise.
1523 LWLockConflictsWithVar(LWLock
*lock
, pg_atomic_uint64
*valptr
, uint64 oldval
,
1524 uint64
*newval
, bool *result
)
1530 * Test first to see if the slot is free right now.
1532 * XXX: the unique caller of this routine, WaitXLogInsertionsToFinish()
1533 * via LWLockWaitForVar(), uses an implied barrier with a spinlock before
1534 * this, so we don't need a memory barrier here as far as the current
1535 * usage is concerned. But that might not be safe in general.
1537 mustwait
= (pg_atomic_read_u32(&lock
->state
) & LW_VAL_EXCLUSIVE
) != 0;
1548 * Reading this value atomically is safe even on platforms where uint64
1549 * cannot be read without observing a torn value.
1551 value
= pg_atomic_read_u64(valptr
);
1553 if (value
!= oldval
)
1567 * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
1569 * If the lock is held and *valptr equals oldval, waits until the lock is
1570 * either freed, or the lock holder updates *valptr by calling
1571 * LWLockUpdateVar. If the lock is free on exit (immediately or after
1572 * waiting), returns true. If the lock is still held, but *valptr no longer
1573 * matches oldval, returns false and sets *newval to the current value in
1574 * *valptr.
1576 * Note: this function ignores shared lock holders; if the lock is held
1577 * in shared mode, returns 'true'.
1579 * Be aware that LWLockConflictsWithVar() does not include a memory barrier,
1580 * hence the caller of this function may want to rely on an explicit barrier or
1581 * an implied barrier via spinlock or LWLock to avoid memory ordering issues.
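 *
 * A sketch of the usual wait loop (illustrative; the variable and the target
 * position belong to the caller):
 *
 *		while (seenpos < target)
 *		{
 *			if (LWLockWaitForVar(lock, &insertingAt, seenpos, &seenpos))
 *				break;		-- lock is free, the holder is done
 *			-- otherwise the holder advanced the variable; loop with new value
 *		}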
1584 LWLockWaitForVar(LWLock
*lock
, pg_atomic_uint64
*valptr
, uint64 oldval
,
1587 PGPROC
*proc
= MyProc
;
1589 bool result
= false;
1591 lwlock_stats
*lwstats
;
1593 lwstats
= get_lwlock_stats_entry(lock
);
1596 PRINT_LWDEBUG("LWLockWaitForVar", lock
, LW_WAIT_UNTIL_FREE
);
1599 * Lock out cancel/die interrupts while we sleep on the lock. There is no
1600 * cleanup mechanism to remove us from the wait queue if we got
1601 * interrupted.
1606 * Loop here to check the lock's status after each time we are signaled.
1612 mustwait
= LWLockConflictsWithVar(lock
, valptr
, oldval
, newval
,
1616 break; /* the lock was free or value didn't match */
1619 * Add myself to wait queue. Note that this is racy, somebody else
1620 * could wakeup before we're finished queuing. NB: We're using nearly
1621 * the same twice-in-a-row lock acquisition protocol as
1622 * LWLockAcquire(). Check its comments for details. The only
1623 * difference is that we also have to check the variable's values when
1624 * checking the state of the lock.
1626 LWLockQueueSelf(lock
, LW_WAIT_UNTIL_FREE
);
1629 * Set RELEASE_OK flag, to make sure we get woken up as soon as the
1630 * lock is released.
1632 pg_atomic_fetch_or_u32(&lock
->state
, LW_FLAG_RELEASE_OK
);
1635 * We're now guaranteed to be woken up if necessary. Recheck the lock
1636 * and variables state.
1638 mustwait
= LWLockConflictsWithVar(lock
, valptr
, oldval
, newval
,
1641 /* Ok, no conflict after we queued ourselves. Undo queueing. */
1644 LOG_LWDEBUG("LWLockWaitForVar", lock
, "free, undoing queue");
1646 LWLockDequeueSelf(lock
);
1651 * Wait until awakened.
1653 * It is possible that we get awakened for a reason other than being
1654 * signaled by LWLockRelease. If so, loop back and wait again. Once
1655 * we've gotten the LWLock, re-increment the sema by the number of
1656 * additional signals received.
1658 LOG_LWDEBUG("LWLockWaitForVar", lock
, "waiting");
1661 lwstats
->block_count
++;
1664 LWLockReportWaitStart(lock
);
1665 if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
1666 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock
), LW_EXCLUSIVE
);
1670 PGSemaphoreLock(proc
->sem
);
1671 if (proc
->lwWaiting
== LW_WS_NOT_WAITING
)
1678 /* not waiting anymore */
1679 uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY
= pg_atomic_fetch_sub_u32(&lock
->nwaiters
, 1);
1681 Assert(nwaiters
< MAX_BACKENDS
);
1685 if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
1686 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock
), LW_EXCLUSIVE
);
1687 LWLockReportWaitEnd();
1689 LOG_LWDEBUG("LWLockWaitForVar", lock
, "awakened");
1691 /* Now loop back and check the status of the lock again. */
1695 * Fix the process wait semaphore's count for any absorbed wakeups.
1697 while (extraWaits
-- > 0)
1698 PGSemaphoreUnlock(proc
->sem
);
1701 * Now okay to allow cancel/die interrupts.
1703 RESUME_INTERRUPTS();
1710 * LWLockUpdateVar - Update a variable and wake up waiters atomically
1712 * Sets *valptr to 'val', and wakes up all processes waiting for us with
1713 * LWLockWaitForVar(). It first sets the value atomically and then wakes up
1714 * waiting processes so that any process calling LWLockWaitForVar() on the same
1715 * lock is guaranteed to see the new value, and act accordingly.
1717 * The caller must be holding the lock in exclusive mode.
1720 LWLockUpdateVar(LWLock
*lock
, pg_atomic_uint64
*valptr
, uint64 val
)
1722 proclist_head wakeup
;
1723 proclist_mutable_iter iter
;
1725 PRINT_LWDEBUG("LWLockUpdateVar", lock
, LW_EXCLUSIVE
);
1728 * Note that pg_atomic_exchange_u64 is a full barrier, so we're guaranteed
1729 * that the variable is updated before waking up waiters.
1731 pg_atomic_exchange_u64(valptr
, val
);
1733 proclist_init(&wakeup
);
1735 LWLockWaitListLock(lock
);
1737 Assert(pg_atomic_read_u32(&lock
->state
) & LW_VAL_EXCLUSIVE
);
1740 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
1741 * up. They are always in the front of the queue.
1743 proclist_foreach_modify(iter
, &lock
->waiters
, lwWaitLink
)
1745 PGPROC
*waiter
= GetPGProcByNumber(iter
.cur
);
1747 if (waiter
->lwWaitMode
!= LW_WAIT_UNTIL_FREE
)
1750 proclist_delete(&lock
->waiters
, iter
.cur
, lwWaitLink
);
1751 proclist_push_tail(&wakeup
, iter
.cur
, lwWaitLink
);
1753 /* see LWLockWakeup() */
1754 Assert(waiter
->lwWaiting
== LW_WS_WAITING
);
1755 waiter
->lwWaiting
= LW_WS_PENDING_WAKEUP
;
1758 /* We are done updating shared state of the lock itself. */
1759 LWLockWaitListUnlock(lock
);
1762 * Awaken any waiters I removed from the queue.
1764 proclist_foreach_modify(iter
, &wakeup
, lwWaitLink
)
1766 PGPROC
*waiter
= GetPGProcByNumber(iter
.cur
);
1768 proclist_delete(&wakeup
, iter
.cur
, lwWaitLink
);
1769 /* check comment in LWLockWakeup() about this barrier */
1771 waiter
->lwWaiting
= LW_WS_NOT_WAITING
;
1772 PGSemaphoreUnlock(waiter
->sem
);
1778 * LWLockRelease - release a previously acquired lock
1781 LWLockRelease(LWLock
*lock
)
1789 * Remove lock from list of locks held. Usually, but not always, it will
1790 * be the latest-acquired lock; so search array backwards.
1792 for (i
= num_held_lwlocks
; --i
>= 0;)
1793 if (lock
== held_lwlocks
[i
].lock
)
1797 elog(ERROR
, "lock %s is not held", T_NAME(lock
));
1799 mode
= held_lwlocks
[i
].mode
;
1802 for (; i
< num_held_lwlocks
; i
++)
1803 held_lwlocks
[i
] = held_lwlocks
[i
+ 1];
1805 PRINT_LWDEBUG("LWLockRelease", lock
, mode
);
1808 * Release my hold on lock, after that it can immediately be acquired by
1809 * others, even if we still have to wakeup other waiters.
1811 if (mode
== LW_EXCLUSIVE
)
1812 oldstate
= pg_atomic_sub_fetch_u32(&lock
->state
, LW_VAL_EXCLUSIVE
);
1814 oldstate
= pg_atomic_sub_fetch_u32(&lock
->state
, LW_VAL_SHARED
);
1816 /* nobody else can have that kind of lock */
1817 Assert(!(oldstate
& LW_VAL_EXCLUSIVE
));
1819 if (TRACE_POSTGRESQL_LWLOCK_RELEASE_ENABLED())
1820 TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock
));
1823 * We're still waiting for backends to get scheduled, don't wake them up
1824 * again.
1826 if ((oldstate
& (LW_FLAG_HAS_WAITERS
| LW_FLAG_RELEASE_OK
)) ==
1827 (LW_FLAG_HAS_WAITERS
| LW_FLAG_RELEASE_OK
) &&
1828 (oldstate
& LW_LOCK_MASK
) == 0)
1829 check_waiters
= true;
1831 check_waiters
= false;
1834 * As waking up waiters requires the spinlock to be acquired, only do so
1835 * if necessary.
1839 /* XXX: remove before commit? */
1840 LOG_LWDEBUG("LWLockRelease", lock
, "releasing waiters");
1845 * Now okay to allow cancel/die interrupts.
1847 RESUME_INTERRUPTS();
1851 * LWLockReleaseClearVar - release a previously acquired lock, reset variable
1854 LWLockReleaseClearVar(LWLock
*lock
, pg_atomic_uint64
*valptr
, uint64 val
)
1857 * Note that pg_atomic_exchange_u64 is a full barrier, so we're guaranteed
1858 * that the variable is updated before releasing the lock.
1860 pg_atomic_exchange_u64(valptr
, val
);
1862 LWLockRelease(lock
);
1867 * LWLockReleaseAll - release all currently-held locks
1869 * Used to clean up after ereport(ERROR). An important difference between this
1870 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
1871 * unchanged by this operation. This is necessary since InterruptHoldoffCount
1872 * has been set to an appropriate level earlier in error recovery. We could
1873 * decrement it below zero if we allow it to drop for each released lock!
1876 LWLockReleaseAll(void)
1878 while (num_held_lwlocks
> 0)
1880 HOLD_INTERRUPTS(); /* match the upcoming RESUME_INTERRUPTS */
1882 LWLockRelease(held_lwlocks
[num_held_lwlocks
- 1].lock
);
1888 * LWLockHeldByMe - test whether my process holds a lock in any mode
1890 * This is meant as debug support only.
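 *
 * For instance (sketch), a routine that requires its caller to already hold
 * a particular lock might start with:
 *
 *		Assert(LWLockHeldByMe(lock));
 * or, when the mode matters:
 *		Assert(LWLockHeldByMeInMode(lock, LW_EXCLUSIVE));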
1893 LWLockHeldByMe(LWLock
*lock
)
1897 for (i
= 0; i
< num_held_lwlocks
; i
++)
1899 if (held_lwlocks
[i
].lock
== lock
)
1906 * LWLockAnyHeldByMe - test whether my process holds any of an array of locks
1908 * This is meant as debug support only.
1911 LWLockAnyHeldByMe(LWLock
*lock
, int nlocks
, size_t stride
)
1913 char *held_lock_addr
;
1918 begin
= (char *) lock
;
1919 end
= begin
+ nlocks
* stride
;
1920 for (i
= 0; i
< num_held_lwlocks
; i
++)
1922 held_lock_addr
= (char *) held_lwlocks
[i
].lock
;
1923 if (held_lock_addr
>= begin
&&
1924 held_lock_addr
< end
&&
1925 (held_lock_addr
- begin
) % stride
== 0)
1932 * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
1934 * This is meant as debug support only.
1937 LWLockHeldByMeInMode(LWLock
*lock
, LWLockMode mode
)
1941 for (i
= 0; i
< num_held_lwlocks
; i
++)
1943 if (held_lwlocks
[i
].lock
== lock
&& held_lwlocks
[i
].mode
== mode
)