/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 *
 * File descriptor-based memory allocation.  We have a fixed slot of
 * 128 bytes for every file descriptor.  Once a file descriptor is
 * allocated by the OS, we use mog_fd_init()/mog_fd_get() to reserve
 * userspace memory for that FD.  We release that memory by calling
 * close(2) (via the mog_close() wrapper) in mog_fd_put().
 *
 * mog_fd_get() is a simple offset lookup based on the file
 * descriptor, so the "allocation" is cheap.
 *
 * This memory is never returned to the kernel, but is bounded by
 * the file descriptor limit (RLIMIT_NOFILE, "ulimit -n") of the
 * process.  Allowing 20000 file descriptors only uses about 2.5 MB
 * of userspace memory.
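 * For example: 20000 slots * 128 bytes/slot = 2,560,000 bytes
 * (~2.56 MB, or about 2.4 MiB).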
 *
 * Any sane OS will try to keep file descriptor numbers low and reuse
 * low-numbered descriptors as they become available, reducing
 * fragmentation from unused slots.  We allocate memory aligned
 * to 128 bytes (matching the slot size).
 *
 * The 128-byte alignment and slot size are used since they:
 * a) are enough to hold per-client data in common cases without malloc()
 * b) align easily with the cache line sizes of modern (200x-201x) CPUs,
 *    avoiding unnecessary cache flushing
 *
 * This 128-byte alignment will need to be expanded to 256 bytes if
 * 128-bit general-purpose CPUs ever become available.
 */

#include "cmogstored.h"

#define FD_PAD_SIZE ((size_t)128)

/* compile-time assertion: a struct mog_fd must fit in one padded slot */
verify(sizeof(struct mog_fd) <= FD_PAD_SIZE);

static int max_fd; /* every fd below this has a slot allocated */
static size_t fd_heaps; /* number of heaps currently in fd_map */
static const size_t FD_PER_HEAP = 256; /* slots allocated per growth step */
static unsigned char **fd_map; /* top-level array of per-heap base pointers */
static pthread_mutex_t fd_lock = PTHREAD_MUTEX_INITIALIZER;
size_t mog_nr_active_at_quit; /* set by mog_fdmap_requeue at shutdown */
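
/*
 * Illustration: with FD_PER_HEAP == 256 and FD_PAD_SIZE == 128,
 * aref(300) resolves to fd_map[1] + (44 * 128), since 300 / 256 == 1
 * and 300 % 256 == 44.
 */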
static inline struct mog_fd *aref(size_t fd)
{
	unsigned char *base = fd_map[fd / FD_PER_HEAP];

	return (struct mog_fd *)(base + (fd % FD_PER_HEAP) * FD_PAD_SIZE);
}

/* only for pedantic correctness, only one thread is running here */
static void destroy_spinlocks(void)
{
	int fd;
	struct mog_fd *mfd;

	for (fd = 0; fd < max_fd; fd++) {
		mfd = aref(fd);
		CHECK(int, 0, pthread_spin_destroy(&mfd->expiring));
	}
}

static void fd_map_atexit(void)
{
	destroy_spinlocks();

	while (fd_heaps-- > 0)
		free(fd_map[fd_heaps]);
	free(fd_map);
}

static void fd_map_init(void)
{
	long open_max = sysconf(_SC_OPEN_MAX);
	size_t slots = open_max / FD_PER_HEAP + 1;
	size_t size = slots * sizeof(void *);

	assert(fd_map == NULL && "fd_map reinitialized?");
	fd_map = mog_cachealign(size);
	atexit(fd_map_atexit);
}
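
/*
 * Slow path for mog_fd_get(): grow the map until "fd" has a slot.
 * Each pass of the loop below allocates one heap of 256 slots
 * (256 * 128 bytes == 32 KiB) and advances max_fd by 256, so e.g.
 * the very first lookup of fd 5 allocates slots for fds 0..255.
 */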
MOG_NOINLINE static struct mog_fd * grow_ref(size_t fd)
{
	int fd_max;

	assert(fd < INT_MAX && "fd too large");
	CHECK(int, 0, pthread_mutex_lock(&fd_lock));

	if (!fd_map) fd_map_init();
	while (fd >= (size_t)(fd_max = mog_sync_fetch(&max_fd))) {
		unsigned char *base = mog_cachealign(FD_PAD_SIZE * FD_PER_HEAP);
		struct mog_fd *tmp;
		size_t i;
		int rc;

		for (i = 0; i < FD_PER_HEAP; i++) {
			tmp = (struct mog_fd *)(base + (i * FD_PAD_SIZE));
			tmp->fd_type = MOG_FD_TYPE_UNUSED;

			rc = pthread_spin_init(&tmp->expiring, 0);
			if (rc != 0)
				die_errno("pthread_spin_init() failed");
			tmp->fd = fd_max + i;
		}

		fd_map[fd_heaps++] = base;
		(void)mog_sync_add_and_fetch(&max_fd, FD_PER_HEAP);
	}

	CHECK(int, 0, pthread_mutex_unlock(&fd_lock));

	return aref(fd);
}

/*
 * Look up a mog_fd structure based on fd.  This means memory is reused
 * by us just as FDs are reused by the kernel.
 */
static struct mog_fd *mog_fd_get(int fd)
{
	assert(fd >= 0 && "FD is negative");
	if (MOG_LIKELY(fd < mog_sync_fetch(&max_fd)))
		return aref((size_t)fd);

	return grow_ref(fd);
}
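
/*
 * Each slot has its own "expiring" spinlock.  The critical sections
 * it guards are only a few stores long, so spinning is cheaper than
 * sleeping on a mutex; mog_fdmap_expire() uses the trylock variant
 * to skip any slot another thread is about to close.
 */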
static inline bool mfd_expiring_trylock(struct mog_fd *mfd)
{
	int rc = pthread_spin_trylock(&mfd->expiring);

	if (MOG_LIKELY(rc == 0))
		return true;
	assert(rc == EBUSY && "pthread_spin_trylock error");
	return false;
}

static inline void mfd_expiring_lock(struct mog_fd *mfd)
{
	CHECK(int, 0, pthread_spin_lock(&mfd->expiring));
}

static inline void mfd_expiring_unlock(struct mog_fd *mfd)
{
	CHECK(int, 0, pthread_spin_unlock(&mfd->expiring));
}

/*
 * Releases the memory used by mfd and releases the file descriptor
 * back to the OS.  mfd is unusable after this.
 */
void mog_fd_put(struct mog_fd *mfd)
{
	int fd = mfd->fd;

	assert(fd >= 0 && "FD is negative");
	assert(fd < mog_sync_fetch(&max_fd) && "FD too big");
	assert(aref(fd) == mfd && "tried to put incorrect mog_fd back in");

	mfd_expiring_lock(mfd);
	mfd->fd_type = MOG_FD_TYPE_UNUSED;
	mfd_expiring_unlock(mfd);
	mog_close(fd);
	/* mog_fd_get(fd) may be called here in another thread */
}

/* called during shutdown, no other threads are running when this is called */
void mog_fdmap_requeue(struct mog_queue *quit_queue)
{
	int fd;
	struct mog_fd *mfd;

	for (fd = max_fd - 1; fd >= 0; fd--) {
		mfd = aref(fd);
		switch (mfd->fd_type) {
		case MOG_FD_TYPE_MGMT:
			/* ignore fsck priority in shutdown: */
			mfd->as.mgmt.prio = MOG_PRIO_NONE;
			/* fall-through: */
		case MOG_FD_TYPE_HTTP:
		case MOG_FD_TYPE_HTTPGET:
			mog_activeq_add(quit_queue, mfd);
			mog_nr_active_at_quit++;
		default:
			break;
		}
	}
}

struct mog_fd * mog_fd_init(int fd, enum mog_fd_type fd_type)
{
	struct mog_fd *mfd = mog_fd_get(fd);

	assert(mfd->fd == fd && "mfd->fd incorrect");
	mfd_expiring_lock(mfd);
	mfd->fd_type = fd_type;
	mfd->ioq_blocked = 0;
	mfd_expiring_unlock(mfd);

	return mfd;
}
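
/*
 * Hypothetical usage sketch, for illustration only ("listen_fd" and
 * the serving logic are assumptions; the real accept loops live
 * elsewhere in cmogstored):
 *
 *	int fd = accept(listen_fd, NULL, NULL);
 *	if (fd >= 0) {
 *		struct mog_fd *mfd = mog_fd_init(fd, MOG_FD_TYPE_HTTP);
 *		... serve the client via mfd ...
 *		mog_fd_put(mfd); // closes fd, marks the slot unused
 *	}
 */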

#ifndef __linux__
/* ugh, FreeBSD implements TCP_INFO but doesn't expose the fields we need */
size_t mog_fdmap_expire(uint32_t sec)
{
	return 0;
}
#else /* Linux TCP_INFO tracks last_data_{sent,recv} */
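
/*
 * Note: the tcpi_last_data_{sent,recv} and tcpi_last_ack_recv fields
 * used below are reported by Linux in milliseconds, hence the
 * sec-to-msec conversion in mog_fdmap_expire().
 */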

static bool tcp_timedout(struct tcp_info *info, uint32_t msec)
{
	bool send_timedout = !!(info->tcpi_last_data_sent > msec);

	/*
	 * tcpi_last_data_recv is not valid unless
	 * tcpi_ato (ACK timeout) is set
	 */
	if (info->tcpi_ato == 0)
		return send_timedout && (info->tcpi_last_ack_recv > msec);

	return send_timedout && (info->tcpi_last_data_recv > msec);
}
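
/*
 * Attempts to expire one established connection that has been idle
 * longer than "msec".  shutdown(2) is used rather than close(2) so
 * the fd (and its slot) stays valid; the thread owning the socket
 * sees an error on its next access and calls mog_fd_put() itself.
 */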
static size_t expire_http(struct mog_fd *mfd, uint32_t msec)
{
	struct tcp_info info;
	socklen_t len = (socklen_t)sizeof(struct tcp_info);

	if (getsockopt(mfd->fd, IPPROTO_TCP, TCP_INFO, &info, &len) == 0) {
		if (info.tcpi_state == TCP_ESTABLISHED &&
		    tcp_timedout(&info, msec)) {
			if (shutdown(mfd->fd, SHUT_RDWR) == 0)
				return 1;
			if (errno != ENOTCONN)
				syslog(LOG_WARNING,
					"BUG? expire_http,shutdown: %m");
		}
	} else {
		assert(errno != EINVAL && "BUG: getsockopt: EINVAL");
		assert(errno != EFAULT && "BUG: getsockopt: EFAULT");
		syslog(LOG_WARNING, "BUG? expire_http,getsockopt: %m");
	}

	return 0;
}

size_t mog_fdmap_expire(uint32_t sec)
{
	int fd;
	struct mog_fd *mfd;
	size_t expired = 0;
	uint32_t msec = sec * 1000;
	static time_t last_expire;
	time_t now;
	int rc = pthread_mutex_trylock(&fd_lock);

	if (rc != 0) {
		assert(rc == EBUSY && "pthread_mutex_trylock failed" && rc);

		/* sleep on the lock; another thread is already doing the work */
		CHECK(int, 0, pthread_mutex_lock(&fd_lock));
		CHECK(int, 0, pthread_mutex_unlock(&fd_lock));
		goto out;
	}

	now = time(NULL);
	if (now == last_expire)
		goto out_unlock;

	/* skip stdin, stdout, stderr */
	for (fd = 3; fd < max_fd; fd++) {
		mfd = aref(fd);

		/* bail if another thread just locked it (for close) */
		if (mfd_expiring_trylock(mfd)) {
			switch (mfd->fd_type) {
			case MOG_FD_TYPE_HTTP:
			case MOG_FD_TYPE_HTTPGET:
				expired += expire_http(mfd, msec);
				/* fall-through */
			default:
				mfd_expiring_unlock(mfd);
				break;
			}
		}
	}

	now = time(NULL);
	if (expired > 0 || last_expire != now)
		syslog(LOG_NOTICE, "expired %llu idle connections (>%u sec)",
		       (unsigned long long)expired, (unsigned)sec);
	last_expire = now;

out_unlock:
	CHECK(int, 0, pthread_mutex_unlock(&fd_lock));
out:
	/*
	 * let other threads:
	 * 1) wake up from epoll_wait()
	 * 2) attempt to read/write
	 * 3) hit error
	 * 4) close sockets
	 */
	for (fd = (int)expired * 8; --fd >= 0; )
		mog_yield();

	return expired;
}
#endif /* Linux-only */