/*
 * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
 * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
 */
#include "cmogstored.h"
#include "compat_memstream.h"
/*
 * maps multiple "devXXX" directories to the device.
 * This is uncommon in real world deployments (multiple mogdevs sharing
 * the same system device), but happens frequently in testing
 */
struct mog_devlist {
        dev_t st_dev;
        Hash_table *by_mogdevid;
};
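
/*
 * gnulib Hash_table callbacks for svc->by_st_dev: devlists are keyed
 * on st_dev alone, so hashing and comparison reduce to the device ID
 */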
static size_t devlist_hash(const void *x, size_t tablesize)
{
        const struct mog_devlist *devlist = x;

        return devlist->st_dev % tablesize;
}
static bool devlist_cmp(const void *a, const void *b)
{
        const struct mog_devlist *devlist_a = a;
        const struct mog_devlist *devlist_b = b;

        return devlist_a->st_dev == devlist_b->st_dev;
}
static void devlist_free(void *x)
{
        struct mog_devlist *devlist = x;

        hash_free(devlist->by_mogdevid);
        free(devlist);
}
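
/* allocates the devlist container for one st_dev; OOM aborts via xmalloc */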
static struct mog_devlist * mog_devlist_new(dev_t st_dev)
{
        struct mog_devlist *devlist = xmalloc(sizeof(struct mog_devlist));

        devlist->st_dev = st_dev;
        devlist->by_mogdevid = hash_initialize(7, NULL,
                                mog_dev_hash, mog_dev_cmp,
                                NULL /*
                                      * elements are freed when
                                      * svc->by_mog_devid is freed
                                      */);
        mog_oom_if_null(devlist->by_mogdevid);

        return devlist;
}
/* ensures svc has a devlist, this must be called with devstats_lock held */
static struct mog_devlist * svc_devlist(struct mog_svc *svc, dev_t st_dev)
{
        struct mog_devlist *devlist;
        struct mog_devlist finder;

        assert(svc->by_st_dev && "by_st_dev uninitialized in svc");

        finder.st_dev = st_dev;
        devlist = hash_lookup(svc->by_st_dev, &finder);

        if (devlist == NULL) {
                devlist = mog_devlist_new(st_dev);
                switch (hash_insert_if_absent(svc->by_st_dev, devlist, NULL)) {
                case 0:
                        assert(0 && "race condition, devlist should insert");
                case 1: break; /* OK, inserted */
                default: mog_oom(); /* -1 */
                }
        }

        return devlist;
}
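
/* (re)initializes svc->by_st_dev, clearing any results of a previous scan */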
static void svc_init_dev_hash(struct mog_svc *svc)
{
        if (svc->by_st_dev) {
                hash_clear(svc->by_st_dev);
                return;
        }

        svc->by_st_dev = hash_initialize(7, NULL, devlist_hash,
                                devlist_cmp, devlist_free);
        mog_oom_if_null(svc->by_st_dev);
}
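
/*
 * scans svc->dir for "devNNN" entries, rebuilds the st_dev-to-mogdev
 * mapping, and applies cb to each device found; *nr counts the
 * devices accepted into the hash
 */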
static int svc_scandev(struct mog_svc *svc, unsigned *nr, mog_scandev_cb cb)
{
        struct dirent *ent;
        int rc = 0;

        *nr = 0;
        CHECK(int, 0, pthread_mutex_lock(&svc->devstats_lock));
        svc_init_dev_hash(svc);
        rewinddir(svc->dir);
        while ((ent = readdir(svc->dir))) {
                unsigned long mog_devid;
                char *end;
                size_t len = strlen(ent->d_name);
                struct mog_dev *dev;
                struct mog_devlist *devlist;
                Hash_table *devhash;

                if (len <= 3) continue;
                if (memcmp("dev", ent->d_name, 3) != 0) continue;

                mog_devid = strtoul(ent->d_name + 3, &end, 10);
                if (*end != 0) continue;
                if (mog_devid > MOG_DEVID_MAX) continue;

                dev = mog_dev_for(svc, (uint32_t)mog_devid, true);
                if (dev == NULL) continue;

                devlist = svc_devlist(svc, dev->st_dev);
                devhash = devlist->by_mogdevid;

                rc |= cb(dev, svc); /* mog_dev_mkusage */

                switch (hash_insert_if_absent(devhash, dev, NULL)) {
                case 0:
                        /* do not free dev, it is in svc->by_mog_devid */
                        syslog(LOG_ERR,
                               "%s/%s seen twice in readdir (BUG?)",
                               svc->docroot, ent->d_name);
                        break;
                case 1:
                        ++*nr;
                        break;
                default: mog_oom(); /* -1 */
                }
        }
        CHECK(int, 0, pthread_mutex_unlock(&svc->devstats_lock));

        return rc;
}
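
/*
 * emits one "devid<TAB>utilization" line per device; returning false
 * stops hash iteration once the stream is closed on write error
 */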
static bool write_dev_stats(void *entry, void *filep)
{
        struct mog_dev *dev = entry;
        FILE **fp = filep;
        char util[MOG_IOUTIL_LEN];

        mog_iou_read(dev->st_dev, util);

        if (fprintf(*fp, "%u\t%s\n", dev->devid, util) > 0)
                return true;

        /* stop iteration in case we get EIO/ENOSPC on systems w/o memstream */
        my_memstream_errclose(*fp);
        *fp = NULL;
        return false;
}
static bool write_devlist_stats(void *entry, void *filep)
{
        struct mog_devlist *devlist = entry;
        FILE **fp = filep;

        hash_do_for_each(devlist->by_mogdevid, write_dev_stats, filep);

        /* *filep becomes NULL on errors */
        return *fp != NULL;
}
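
/*
 * the serialized stats end with a ".\n" terminator line, mirroring
 * the framing the Perl mogstored sidechannel uses for "watch" output
 */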
/* updates per-svc device stats from the global mount list */
static ssize_t
devstats_stringify(struct mog_svc *svc, char **dst)
{
        FILE *fp;
        size_t bytes = 0;

        assert(svc->by_st_dev && "need to scan devices first");

        /* open_memstream() may fail on EIO/EMFILE/ENFILE on fake memstream */
        fp = open_memstream(dst, &bytes);
        if (fp == NULL)
                return -1;

        /*
         * write_devlist_stats->write_dev_stats may fclose and NULL fp
         * if it hits a write error
         */
        hash_do_for_each(svc->by_st_dev, write_devlist_stats, &fp);
        if (fp == NULL)
                return -1;

        if (fprintf(fp, ".\n") == 2) {
                CHECK(int, 0, my_memstream_close(fp, dst, &bytes));
                return (ssize_t)bytes;
        }

        my_memstream_errclose(fp);
        return -1;
}
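
/* registers an mgmt client to receive periodic devstats broadcasts */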
void mog_svc_devstats_subscribe(struct mog_mgmt *mgmt)
{
        struct mog_svc *svc = mgmt->svc;

        CHECK(int, 0, pthread_mutex_lock(&svc->devstats_lock));
        LIST_INSERT_HEAD(&svc->devstats_subscribers, mgmt, subscribed);
        CHECK(int, 0, pthread_mutex_unlock(&svc->devstats_lock));
}
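
/*
 * subscribers which cannot accept the full payload immediately are
 * unsubscribed: write errors drop the client, a blocked write hands
 * it back to the event queue to flush wbuf
 */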
/* called while iterating through all mog_svc objects */
bool mog_svc_devstats_broadcast(void *ent, void *ignored)
{
        struct mog_svc *svc = ent;
        struct mog_mgmt *mgmt, *tmp;
        struct mog_fd *mfd;
        struct iovec iov;
        char *buf = NULL;
        ssize_t len;

        CHECK(int, 0, pthread_mutex_lock(&svc->devstats_lock));

        len = devstats_stringify(svc, &buf);
        if (len < 0)
                goto out;

        LIST_FOREACH_SAFE(mgmt, &svc->devstats_subscribers, subscribed, tmp) {
                assert(mgmt->wbuf == NULL && "wbuf not null");
                iov.iov_base = buf;
                iov.iov_len = (size_t)len;
                mog_mgmt_writev(mgmt, &iov, 1);

                if (mgmt->wbuf == NULL) continue; /* success */

                LIST_REMOVE(mgmt, subscribed);
                mfd = mog_fd_of(mgmt);
                if (mgmt->wbuf == MOG_WR_ERROR) {
                        assert(mgmt->rbuf == NULL && "would leak rbuf");
                        mog_fd_put(mfd); /* assumed: releases and closes the erred client */
                } else { /* blocked on write */
                        mog_idleq_push(mgmt->svc->queue, mfd, MOG_QEV_WR);
                }
        }
out:
        free(buf);
        CHECK(int, 0, pthread_mutex_unlock(&svc->devstats_lock));

        return true;
}
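
/* detaches every subscriber of one svc so shutdown is not delayed */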
static bool devstats_shutdown_i(void *svcptr, void *ignored)
{
        struct mog_svc *svc = svcptr;
        struct mog_mgmt *mgmt, *tmp;
        struct mog_fd *mfd;

        CHECK(int, 0, pthread_mutex_lock(&svc->devstats_lock));
        LIST_FOREACH_SAFE(mgmt, &svc->devstats_subscribers, subscribed, tmp) {
                assert(mgmt->wbuf == NULL && "wbuf not null");
                assert(mgmt->rbuf == NULL && "would leak rbuf");
                LIST_REMOVE(mgmt, subscribed);
                mfd = mog_fd_of(mgmt);
                /* assumed: requeue so the event loop can run the quit path */
                mog_idleq_push(svc->queue, mfd, MOG_QEV_WR);
        }
        CHECK(int, 0, pthread_mutex_unlock(&svc->devstats_lock));

        return true;
}
void mog_svc_dev_shutdown(void)
{
        mog_svc_each(devstats_shutdown_i, NULL);
}
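
/* rescans one svc and rescales its thread pool if the device count changed */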
static bool svc_mkusage_each(void *svcptr, void *ignored)
{
        struct mog_svc *svc = svcptr;
        unsigned ndev;

        svc_scandev(svc, &ndev, mog_dev_mkusage);

        if (svc->queue && (svc->nmogdev != ndev))
                mog_svc_thrpool_rescale(svc, ndev);
        svc->nmogdev = ndev;

        return true;
}
void mog_mkusage_all(void)
{
        mog_svc_each(svc_mkusage_each, NULL);
}
/* we should never set ioq_max == 0 */
static void svc_rescale_warn_fix_capa(struct mog_svc *svc, unsigned ndev_new)
{
        if (svc->thr_per_dev != 0)
                return;

        syslog(LOG_WARNING,
               "serving %s with fewer aio_threads(%u) than devices(%u)",
               svc->docroot, svc->user_set_aio_threads, ndev_new);
        syslog(LOG_WARNING,
               "set \"server aio_threads = %u\" or higher via sidechannel",
               ndev_new);

        svc->thr_per_dev = 1;
}
static void mog_svc_dev_rescale_all(struct mog_svc *svc)
{
        /* iterate through each device of this svc */
        CHECK(int, 0, pthread_mutex_lock(&svc->by_mog_devid_lock));
        hash_do_for_each(svc->by_mog_devid, mog_dev_user_rescale_i, svc);
        CHECK(int, 0, pthread_mutex_unlock(&svc->by_mog_devid_lock));
}
void mog_svc_dev_requeue_prepare(struct mog_svc *svc)
{
        /* iterate through each device of this svc */
        CHECK(int, 0, pthread_mutex_lock(&svc->by_mog_devid_lock));
        hash_do_for_each(svc->by_mog_devid, mog_dev_requeue_prepare, svc);
        CHECK(int, 0, pthread_mutex_unlock(&svc->by_mog_devid_lock));
}
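
/*
 * thr_per_dev uses integer division: e.g. "aio_threads = 20" across 8
 * devices gives 2 threads per device; fewer threads than devices
 * truncates to 0, which svc_rescale_warn_fix_capa bumps back to 1
 */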
/* rescaling only happens in the main thread */
void mog_svc_dev_user_rescale(struct mog_svc *svc, size_t ndev_new)
{
        assert(svc->user_set_aio_threads &&
               "user did not set aio_threads via sidechannel");

        svc->thr_per_dev = svc->user_set_aio_threads / ndev_new;

        svc_rescale_warn_fix_capa(svc, ndev_new);
        mog_svc_dev_rescale_all(svc);
}
/*
 * this forces ioqs to detect contention and yield,
 * leading to quicker shutdown
 */
void mog_svc_dev_quit_prepare(struct mog_svc *svc)
{
        svc->thr_per_dev = 1;
        mog_svc_dev_rescale_all(svc);
}