1 #include "ceph_debug.h"
4 #include <linux/sched.h>
5 #include <linux/types.h>
6 #include <linux/vmalloc.h>
11 * We use msg pools to preallocate memory for messages we expect to
12 * receive over the wire, to avoid getting ourselves into OOM
13 * conditions at unexpected times. We take use a few different
16 * - for request/response type interactions, we preallocate the
17 * memory needed for the response when we generate the request.
19 * - for messages we can receive at any time from the MDS, we preallocate
20 * a pool of messages we can re-use.
22 * - for writeback, we preallocate some number of messages to use for
23 * requests and their replies, so that we always make forward
26 * The msgpool behaves like a mempool_t, but keeps preallocated
27 * ceph_msgs strung together on a list_head instead of using a pointer
28 * vector. This avoids vector reallocation when we adjust the number
29 * of preallocated items (which happens frequently).
34 * Allocate or release as necessary to meet our target pool size.
36 static int __fill_msgpool(struct ceph_msgpool
*pool
)
40 while (pool
->num
< pool
->min
) {
41 dout("fill_msgpool %p %d/%d allocating\n", pool
, pool
->num
,
43 spin_unlock(&pool
->lock
);
44 msg
= ceph_msg_new(0, pool
->front_len
, 0, 0, NULL
);
45 spin_lock(&pool
->lock
);
49 list_add(&msg
->list_head
, &pool
->msgs
);
52 while (pool
->num
> pool
->min
) {
53 msg
= list_first_entry(&pool
->msgs
, struct ceph_msg
, list_head
);
54 dout("fill_msgpool %p %d/%d releasing %p\n", pool
, pool
->num
,
56 list_del_init(&msg
->list_head
);
63 int ceph_msgpool_init(struct ceph_msgpool
*pool
,
64 int front_len
, int min
, bool blocking
)
68 dout("msgpool_init %p front_len %d min %d\n", pool
, front_len
, min
);
69 spin_lock_init(&pool
->lock
);
70 pool
->front_len
= front_len
;
71 INIT_LIST_HEAD(&pool
->msgs
);
74 pool
->blocking
= blocking
;
75 init_waitqueue_head(&pool
->wait
);
77 spin_lock(&pool
->lock
);
78 ret
= __fill_msgpool(pool
);
79 spin_unlock(&pool
->lock
);
83 void ceph_msgpool_destroy(struct ceph_msgpool
*pool
)
85 dout("msgpool_destroy %p\n", pool
);
86 spin_lock(&pool
->lock
);
89 spin_unlock(&pool
->lock
);
92 int ceph_msgpool_resv(struct ceph_msgpool
*pool
, int delta
)
96 spin_lock(&pool
->lock
);
97 dout("msgpool_resv %p delta %d\n", pool
, delta
);
99 ret
= __fill_msgpool(pool
);
100 spin_unlock(&pool
->lock
);
104 struct ceph_msg
*ceph_msgpool_get(struct ceph_msgpool
*pool
, int front_len
)
107 struct ceph_msg
*msg
;
109 if (front_len
&& front_len
> pool
->front_len
) {
110 pr_err("msgpool_get pool %p need front %d, pool size is %d\n",
111 pool
, front_len
, pool
->front_len
);
114 /* try to alloc a fresh message */
115 msg
= ceph_msg_new(0, front_len
, 0, 0, NULL
);
121 front_len
= pool
->front_len
;
123 if (pool
->blocking
) {
124 /* mempool_t behavior; first try to alloc */
125 msg
= ceph_msg_new(0, front_len
, 0, 0, NULL
);
131 spin_lock(&pool
->lock
);
132 if (likely(pool
->num
)) {
133 msg
= list_entry(pool
->msgs
.next
, struct ceph_msg
,
135 list_del_init(&msg
->list_head
);
137 dout("msgpool_get %p got %p, now %d/%d\n", pool
, msg
,
138 pool
->num
, pool
->min
);
139 spin_unlock(&pool
->lock
);
142 pr_err("msgpool_get %p now %d/%d, %s\n", pool
, pool
->num
,
143 pool
->min
, pool
->blocking
? "waiting" : "may fail");
144 spin_unlock(&pool
->lock
);
146 if (!pool
->blocking
) {
149 /* maybe we can allocate it now? */
150 msg
= ceph_msg_new(0, front_len
, 0, 0, NULL
);
154 pr_err("msgpool_get %p empty + alloc failed\n", pool
);
155 return ERR_PTR(-ENOMEM
);
159 prepare_to_wait(&pool
->wait
, &wait
, TASK_UNINTERRUPTIBLE
);
161 finish_wait(&pool
->wait
, &wait
);
165 void ceph_msgpool_put(struct ceph_msgpool
*pool
, struct ceph_msg
*msg
)
167 spin_lock(&pool
->lock
);
168 if (pool
->num
< pool
->min
) {
169 /* reset msg front_len; user may have changed it */
170 msg
->front
.iov_len
= pool
->front_len
;
171 msg
->hdr
.front_len
= cpu_to_le32(pool
->front_len
);
173 kref_set(&msg
->kref
, 1); /* retake a single ref */
174 list_add(&msg
->list_head
, &pool
->msgs
);
176 dout("msgpool_put %p reclaim %p, now %d/%d\n", pool
, msg
,
177 pool
->num
, pool
->min
);
178 spin_unlock(&pool
->lock
);
179 wake_up(&pool
->wait
);
181 dout("msgpool_put %p drop %p, at %d/%d\n", pool
, msg
,
182 pool
->num
, pool
->min
);
183 spin_unlock(&pool
->lock
);