3 * Copyright (C) Volker Lendecke 2012
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 3 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
22 #include "../pthreadpool/pthreadpool.h"
/*
 * Argument record for a pwrite job. Its members are not visible in this
 * excerpt; code in this file accesses fildes, buf, nbyte and offset through it.
 */
24 struct asys_pwrite_args
{
/*
 * Argument record for a pread job. Its members are not visible in this
 * excerpt; code in this file accesses fildes, buf, nbyte and offset through it.
 */
31 struct asys_pread_args
{
/*
 * Argument record for an fsync job. Its members are not visible in this
 * excerpt; code in this file accesses a fildes member through it.
 */
38 struct asys_fsync_args
{
/*
 * One argument record per supported operation. These are reached in this file
 * as job->args.pwrite_args, job->args.pread_args and job->args.fsync_args.
 * NOTE(review): presumably members of union asys_job_args, whose opening line
 * is not visible in this excerpt -- confirm.
 */
43 struct asys_pwrite_args pwrite_args
;
44 struct asys_pread_args pread_args
;
45 struct asys_fsync_args fsync_args
;
/* Per-operation arguments of a job; reached in this file as job->args. */
50 union asys_job_args args
;
/* Thread pool handle; handed to the pthreadpool_* calls in this file as ctx->pool. */
58 struct pthreadpool
*pool
;
/*
 * Array of job pointers. Code in this file indexes it below ctx->num_jobs and
 * grows it by one pointer with realloc() when a job is appended.
 */
62 struct asys_job
**jobs
;
/*
 * NOTE(review): the members of this struct are not visible in this excerpt and
 * nothing else shown here refers to it.
 */
65 struct asys_creds_context
{
/*
 * asys_context_init: create a new asys context.
 *
 * Visible steps: zero-allocate one struct asys_context with calloc(), call
 * pthreadpool_init(max_parallel, &ctx->pool), then store the value returned by
 * pthreadpool_signal_fd(ctx->pool) in ctx->pthreadpool_fd.
 *
 * NOTE(review): the error checks after calloc()/pthreadpool_init() and the
 * hand-off of ctx through pctx are not visible in this excerpt -- confirm
 * against the complete file.
 */
69 int asys_context_init(struct asys_context
**pctx
, unsigned max_parallel
)
71 struct asys_context
*ctx
;
/* calloc() so that every field of the new context starts out zeroed. */
74 ctx
= calloc(1, sizeof(struct asys_context
));
78 ret
= pthreadpool_init(max_parallel
, &ctx
->pool
);
/* Cache the pool's signal fd; asys_signalfd() returns this stored value. */
83 ctx
->pthreadpool_fd
= pthreadpool_signal_fd(ctx
->pool
);
/*
 * asys_signalfd: return the file descriptor stored in ctx->pthreadpool_fd
 * (set in asys_context_init() from pthreadpool_signal_fd()).
 */
89 int asys_signalfd(struct asys_context
*ctx
)
91 return ctx
->pthreadpool_fd
;
/*
 * asys_context_destroy: tear down an asys context.
 *
 * Visible steps: scan all ctx->num_jobs entries testing jobs[i]->busy, call
 * pthreadpool_destroy(ctx->pool) and keep its return value in ret, then walk
 * all job slots a second time.
 *
 * NOTE(review): what happens when a busy job is found, how ret is handled and
 * what the second loop does with each slot are not visible in this excerpt --
 * confirm against the complete file.
 */
94 int asys_context_destroy(struct asys_context
*ctx
)
/* First pass: look for jobs that are still marked busy. */
99 for (i
=0; i
<ctx
->num_jobs
; i
++) {
100 if (ctx
->jobs
[i
]->busy
) {
105 ret
= pthreadpool_destroy(ctx
->pool
);
/* Second pass over every job slot (loop body not visible here). */
109 for (i
=0; i
<ctx
->num_jobs
; i
++) {
/*
 * asys_new_job: obtain a job slot in ctx and report its index through jobid.
 *
 * Visible steps: iterate over the existing ctx->num_jobs slots; if none is
 * taken from there, reject the request with EBUSY when num_jobs+1 would wrap
 * to 0, grow ctx->jobs by one pointer via realloc() into tmp, calloc() a fresh
 * zeroed struct asys_job, store it at index ctx->num_jobs and write that index
 * to *jobid.
 *
 * NOTE(review): the body of the first loop (presumably reuse of a non-busy
 * slot), the allocation-failure checks, the num_jobs increment and the
 * assignment to *pjob are not visible in this excerpt -- confirm.
 */
117 static int asys_new_job(struct asys_context
*ctx
, int *jobid
,
118 struct asys_job
**pjob
)
120 struct asys_job
**tmp
;
121 struct asys_job
*job
;
124 for (i
=0; i
<ctx
->num_jobs
; i
++) {
134 if (ctx
->num_jobs
+1 == 0) {
135 return EBUSY
; /* overflow */
/* realloc() result goes to tmp first, not straight into ctx->jobs. */
138 tmp
= realloc(ctx
->jobs
, sizeof(struct asys_job
*)*(ctx
->num_jobs
+1));
144 job
= calloc(1, sizeof(struct asys_job
));
/* Append the new job in the slot just added at the end of the array. */
148 ctx
->jobs
[ctx
->num_jobs
] = job
;
150 *jobid
= ctx
->num_jobs
;
/* Forward declaration: asys_pwrite() passes this function to pthreadpool_add_job() before it is defined. */
156 static void asys_pwrite_do(void *private_data
);
/*
 * asys_pwrite: queue a pwrite request on the context's thread pool.
 *
 * Visible steps: get a job and its id from asys_new_job(), remember the
 * caller's private_data in the job, fill job->args.pwrite_args (the fildes and
 * offset assignments are visible here) and submit the job with
 * pthreadpool_add_job() using asys_pwrite_do as the work function.
 *
 * NOTE(review): the buf/nbyte assignments, the error checks on ret and the
 * final return are not visible in this excerpt -- confirm.
 */
158 int asys_pwrite(struct asys_context
*ctx
, int fildes
, const void *buf
,
159 size_t nbyte
, off_t offset
, void *private_data
)
161 struct asys_job
*job
;
162 struct asys_pwrite_args
*args
;
166 ret
= asys_new_job(ctx
, &jobid
, &job
);
170 job
->private_data
= private_data
;
172 args
= &job
->args
.pwrite_args
;
173 args
->fildes
= fildes
;
176 args
->offset
= offset
;
/* The job itself is passed as the work function's private_data argument. */
178 ret
= pthreadpool_add_job(ctx
->pool
, jobid
, asys_pwrite_do
, job
);
/*
 * asys_pwrite_do: work function for a pwrite job.
 *
 * private_data is the struct asys_job that asys_pwrite() submitted. Calls
 * pwrite() with the stored fildes, buf, nbyte and offset and records the
 * result in job->ret.
 *
 * NOTE(review): the handling of the ret == -1 case is not visible in this
 * excerpt -- confirm.
 */
187 static void asys_pwrite_do(void *private_data
)
189 struct asys_job
*job
= (struct asys_job
*)private_data
;
190 struct asys_pwrite_args
*args
= &job
->args
.pwrite_args
;
192 job
->ret
= pwrite(args
->fildes
, args
->buf
, args
->nbyte
, args
->offset
);
193 if (job
->ret
== -1) {
/* Forward declaration: asys_pread() passes this function to pthreadpool_add_job() before it is defined. */
198 static void asys_pread_do(void *private_data
);
/*
 * asys_pread: queue a pread request on the context's thread pool.
 *
 * Visible steps: get a job and its id from asys_new_job(), remember the
 * caller's private_data in the job, fill job->args.pread_args (the fildes and
 * offset assignments are visible here) and submit the job with
 * pthreadpool_add_job() using asys_pread_do as the work function.
 *
 * NOTE(review): the buf/nbyte assignments, the error checks on ret and the
 * final return are not visible in this excerpt -- confirm.
 */
200 int asys_pread(struct asys_context
*ctx
, int fildes
, void *buf
,
201 size_t nbyte
, off_t offset
, void *private_data
)
203 struct asys_job
*job
;
204 struct asys_pread_args
*args
;
208 ret
= asys_new_job(ctx
, &jobid
, &job
);
212 job
->private_data
= private_data
;
214 args
= &job
->args
.pread_args
;
215 args
->fildes
= fildes
;
218 args
->offset
= offset
;
/* The job itself is passed as the work function's private_data argument. */
220 ret
= pthreadpool_add_job(ctx
->pool
, jobid
, asys_pread_do
, job
);
/*
 * asys_pread_do: work function for a pread job.
 *
 * private_data is the struct asys_job that asys_pread() submitted. Calls
 * pread() with the stored fildes, buf, nbyte and offset and records the result
 * in job->ret.
 *
 * NOTE(review): the handling of the ret == -1 case is not visible in this
 * excerpt -- confirm.
 */
229 static void asys_pread_do(void *private_data
)
231 struct asys_job
*job
= (struct asys_job
*)private_data
;
232 struct asys_pread_args
*args
= &job
->args
.pread_args
;
234 job
->ret
= pread(args
->fildes
, args
->buf
, args
->nbyte
, args
->offset
);
235 if (job
->ret
== -1) {
/* Forward declaration: asys_fsync() passes this function to pthreadpool_add_job() before it is defined. */
240 static void asys_fsync_do(void *private_data
);
/*
 * asys_fsync: queue an fsync request on the context's thread pool.
 *
 * Visible steps: get a job and its id from asys_new_job(), remember the
 * caller's private_data in the job, store fildes in job->args.fsync_args and
 * submit the job with pthreadpool_add_job() using asys_fsync_do as the work
 * function.
 *
 * NOTE(review): the error checks on ret and the final return are not visible
 * in this excerpt -- confirm.
 */
242 int asys_fsync(struct asys_context
*ctx
, int fildes
, void *private_data
)
244 struct asys_job
*job
;
245 struct asys_fsync_args
*args
;
249 ret
= asys_new_job(ctx
, &jobid
, &job
);
253 job
->private_data
= private_data
;
255 args
= &job
->args
.fsync_args
;
256 args
->fildes
= fildes
;
/* The job itself is passed as the work function's private_data argument. */
258 ret
= pthreadpool_add_job(ctx
->pool
, jobid
, asys_fsync_do
, job
);
/*
 * asys_fsync_do: work function for an fsync job.
 *
 * private_data is the struct asys_job that asys_fsync() submitted. Calls
 * fsync() on the stored fildes and records the result in job->ret.
 *
 * NOTE(review): the handling of the ret == -1 case is not visible in this
 * excerpt -- confirm.
 */
267 static void asys_fsync_do(void *private_data
)
269 struct asys_job
*job
= (struct asys_job
*)private_data
;
270 struct asys_fsync_args
*args
= &job
->args
.fsync_args
;
272 job
->ret
= fsync(args
->fildes
);
273 if (job
->ret
== -1) {
/*
 * asys_cancel: act on the jobs that belong to a given caller.
 *
 * Visible steps: walk all ctx->num_jobs slots and select every job whose
 * private_data pointer equals the private_data argument.
 *
 * NOTE(review): what is done to a matching job is not visible in this excerpt
 * (asys_results() reports ECANCELED for some jobs, so presumably a flag is set
 * here) -- confirm against the complete file.
 */
278 void asys_cancel(struct asys_context
*ctx
, void *private_data
)
282 for (i
=0; i
<ctx
->num_jobs
; i
++) {
283 struct asys_job
*job
= ctx
->jobs
[i
];
/* Jobs are matched by pointer identity of the caller's private_data. */
285 if (job
->private_data
== private_data
) {
/*
 * asys_results: collect results of finished jobs into the results array.
 *
 * Visible steps: ask pthreadpool_finished_jobs() for up to num_results job
 * ids, then for each of the ret ids returned: range-check the id against
 * ctx->num_jobs, look up the job, and fill results[i] with either an
 * ECANCELED error or the job's own ret/err, plus the job's private_data.
 *
 * NOTE(review): the handling of ret from pthreadpool_finished_jobs(), the
 * action on an out-of-range job id, the condition selecting the ECANCELED
 * branch and the function's return value are not visible in this excerpt --
 * confirm against the complete file.
 */
291 int asys_results(struct asys_context
*ctx
, struct asys_result
*results
,
292 unsigned num_results
)
/*
 * NOTE(review): jobids is a variable-length array sized by the caller-supplied
 * num_results. A large value consumes that much stack, and num_results == 0
 * gives a zero-length VLA, which C leaves undefined -- confirm callers bound it.
 */
294 int jobids
[num_results
];
297 ret
= pthreadpool_finished_jobs(ctx
->pool
, jobids
, num_results
);
302 for (i
=0; i
<ret
; i
++) {
303 struct asys_result
*result
= &results
[i
];
304 struct asys_job
*job
;
/* Reject ids that do not index an existing slot in ctx->jobs. */
309 if ((jobid
< 0) || (jobid
>= ctx
->num_jobs
)) {
313 job
= ctx
->jobs
[jobid
];
317 result
->err
= ECANCELED
;
319 result
->ret
= job
->ret
;
320 result
->err
= job
->err
;
/* Hand the caller's private_data back with the result. */
322 result
->private_data
= job
->private_data
;