1 /* Copyright (C) 2018 Free Software Foundation, Inc.
2 Contributed by Nicolas Koenig
4 This file is part of the GNU Fortran runtime library (libgfortran).
6 Libgfortran is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3, or (at your option)
11 Libgfortran is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 Under Section 7 of GPL version 3, you are granted additional
17 permissions described in the GCC Runtime Library Exception, version
18 3.1, as published by the Free Software Foundation.
20 You should have received a copy of the GNU General Public License and
21 a copy of the GCC Runtime Library Exception along with this program;
22 see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
23 <http://www.gnu.org/licenses/>. */
25 #include "libgfortran.h"
27 #define _GTHREAD_USE_COND_INIT_FUNC
28 #include "../../libgcc/gthr.h"
36 #include <sys/types.h>
41 DEBUG_LINE (__thread
const char *aio_prefix
= MPREFIX
);
43 DEBUG_LINE (__gthread_mutex_t debug_queue_lock
= __GTHREAD_MUTEX_INIT
;)
44 DEBUG_LINE (aio_lock_debug
*aio_debug_head
= NULL
;)
46 /* Current unit for asynchronous I/O. Needed for error reporting. */
48 __thread gfc_unit
*thread_unit
= NULL
;
50 /* Queue entry for the asynchronous I/O entry. */
51 typedef struct transfer_queue
54 struct transfer_queue
*next
;
55 struct st_parameter_dt
*new_pdt
;
66 /* Helper function to exchange the old vs. a new PDT. */
69 update_pdt (st_parameter_dt
**old
, st_parameter_dt
*new) {
70 st_parameter_dt
*temp
;
71 NOTE ("Changing pdts, current_unit = %p", (void *) (new->u
.p
.current_unit
));
78 /* Destroy an adv_cond structure. */
81 destroy_adv_cond (struct adv_cond
*ac
)
83 T_ERROR (__gthread_mutex_destroy
, &ac
->lock
);
84 T_ERROR (__gthread_cond_destroy
, &ac
->signal
);
87 /* Function invoked as start routine for a new asynchronous I/O unit.
88 Contains the main loop for accepting requests and handling them. */
93 DEBUG_LINE (aio_prefix
= TPREFIX
);
94 transfer_queue
*ctq
= NULL
, *prev
= NULL
;
95 gfc_unit
*u
= (gfc_unit
*) arg
;
96 async_unit
*au
= u
->au
;
99 au
->thread
= __gthread_self ();
102 /* Main loop. At this point, au->lock is always held. */
103 WAIT_SIGNAL_MUTEX (&au
->work
, au
->tail
!= NULL
, &au
->lock
);
107 /* Loop over the queue entries until they are finished. */
113 if (!au
->error
.has_error
)
120 NOTE ("Finalizing write");
121 st_write_done_worker (au
->pdt
);
122 UNLOCK (&au
->io_lock
);
126 NOTE ("Finalizing read");
127 st_read_done_worker (au
->pdt
);
128 UNLOCK (&au
->io_lock
);
131 case AIO_DATA_TRANSFER_INIT
:
132 NOTE ("Data transfer init");
134 update_pdt (&au
->pdt
, ctq
->new_pdt
);
135 data_transfer_init_worker (au
->pdt
, ctq
->read_flag
);
138 case AIO_TRANSFER_SCALAR
:
139 NOTE ("Starting scalar transfer");
140 ctq
->arg
.scalar
.transfer (au
->pdt
, ctq
->arg
.scalar
.arg_bt
,
141 ctq
->arg
.scalar
.data
,
147 case AIO_TRANSFER_ARRAY
:
148 NOTE ("Starting array transfer");
149 NOTE ("ctq->arg.array.desc = %p",
150 (void *) (ctq
->arg
.array
.desc
));
151 transfer_array_inner (au
->pdt
, ctq
->arg
.array
.desc
,
153 ctq
->arg
.array
.charlen
);
154 free (ctq
->arg
.array
.desc
);
158 NOTE ("Received AIO_CLOSE");
162 internal_error (NULL
, "Invalid queue type");
166 if (unlikely (au
->error
.has_error
))
167 au
->error
.last_good_id
= au
->id
.low
- 1;
171 if (ctq
->type
== AIO_WRITE_DONE
|| ctq
->type
== AIO_READ_DONE
)
173 UNLOCK (&au
->io_lock
);
175 else if (ctq
->type
== AIO_CLOSE
)
177 NOTE ("Received AIO_CLOSE during error condition");
183 NOTE ("Next ctq, current id: %d", au
->id
.low
);
184 if (ctq
->has_id
&& au
->id
.waiting
== au
->id
.low
++)
185 SIGNAL (&au
->id
.done
);
193 SIGNAL (&au
->emptysignal
);
200 SIGNAL (&au
->emptysignal
);
205 /* Free an asynchronous unit. */
208 free_async_unit (async_unit
*au
)
211 internal_error (NULL
, "Trying to free nonempty asynchronous unit");
213 destroy_adv_cond (&au
->work
);
214 destroy_adv_cond (&au
->emptysignal
);
215 destroy_adv_cond (&au
->id
.done
);
216 T_ERROR (__gthread_mutex_destroy
, &au
->lock
);
220 /* Initialize an adv_cond structure. */
223 init_adv_cond (struct adv_cond
*ac
)
226 __GTHREAD_MUTEX_INIT_FUNCTION (&ac
->lock
);
227 __gthread_cond_init_function (&ac
->signal
);
230 /* Initialize an asyncronous unit, returning zero on success,
231 nonzero on failure. It also sets u->au. */
234 init_async_unit (gfc_unit
*u
)
237 if (!__gthread_active_p ())
243 au
= (async_unit
*) xmalloc (sizeof (async_unit
));
245 init_adv_cond (&au
->work
);
246 init_adv_cond (&au
->emptysignal
);
247 __GTHREAD_MUTEX_INIT_FUNCTION (&au
->lock
);
248 __GTHREAD_MUTEX_INIT_FUNCTION (&au
->io_lock
);
250 T_ERROR (__gthread_create
, &au
->thread
, &async_io
, (void *) u
);
258 au
->error
.fatal_error
= 0;
259 au
->error
.has_error
= 0;
260 au
->error
.last_good_id
= 0;
261 init_adv_cond (&au
->id
.done
);
265 /* Enqueue a transfer statement. */
268 enqueue_transfer (async_unit
*au
, transfer_args
*arg
, enum aio_do type
)
270 transfer_queue
*tq
= calloc (sizeof (transfer_queue
), 1);
280 REVOKE_SIGNAL (&(au
->emptysignal
));
286 /* Enqueue an st_write_done or st_read_done which contains an ID. */
289 enqueue_done_id (async_unit
*au
, enum aio_do type
)
292 transfer_queue
*tq
= calloc (sizeof (transfer_queue
), 1);
302 REVOKE_SIGNAL (&(au
->emptysignal
));
305 NOTE ("Enqueue id: %d", ret
);
311 /* Enqueue an st_write_done or st_read_done without an ID. */
314 enqueue_done (async_unit
*au
, enum aio_do type
)
316 transfer_queue
*tq
= calloc (sizeof (transfer_queue
), 1);
325 REVOKE_SIGNAL (&(au
->emptysignal
));
331 /* Enqueue a CLOSE statement. */
334 enqueue_close (async_unit
*au
)
336 transfer_queue
*tq
= calloc (sizeof (transfer_queue
), 1);
338 tq
->type
= AIO_CLOSE
;
345 REVOKE_SIGNAL (&(au
->emptysignal
));
351 /* The asynchronous unit keeps the currently active PDT around.
352 This function changes that to the current one. */
355 enqueue_data_transfer_init (async_unit
*au
, st_parameter_dt
*dt
, int read_flag
)
357 st_parameter_dt
*new = xmalloc (sizeof (st_parameter_dt
));
358 transfer_queue
*tq
= xmalloc (sizeof (transfer_queue
));
360 memcpy ((void *) new, (void *) dt
, sizeof (st_parameter_dt
));
362 NOTE ("dt->internal_unit_desc = %p", dt
->internal_unit_desc
);
363 NOTE ("common.flags & mask = %d", dt
->common
.flags
& IOPARM_LIBRETURN_MASK
);
365 tq
->type
= AIO_DATA_TRANSFER_INIT
;
366 tq
->read_flag
= read_flag
;
376 REVOKE_SIGNAL (&(au
->emptysignal
));
382 /* Collect the errors that may have happened asynchronously. Return true if
383 an error has been encountered. */
386 collect_async_errors (st_parameter_common
*cmp
, async_unit
*au
)
388 bool has_error
= au
->error
.has_error
;
392 if (generate_error_common (cmp
, au
->error
.family
, au
->error
.message
))
394 au
->error
.has_error
= 0;
395 au
->error
.cmp
= NULL
;
399 /* The program will exit later. */
400 au
->error
.fatal_error
= true;
406 /* Perform a wait operation on an asynchronous unit with an ID specified,
407 which means collecting the errors that may have happened asynchronously.
408 Return true if an error has been encountered. */
411 async_wait_id (st_parameter_common
*cmp
, async_unit
*au
, int i
)
421 if (au
->error
.has_error
)
423 if (i
<= au
->error
.last_good_id
)
426 return collect_async_errors (cmp
, au
);
430 NOTE ("Waiting for id %d", i
);
431 if (au
->id
.waiting
< i
)
434 SIGNAL (&(au
->work
));
436 WAIT_SIGNAL_MUTEX (&(au
->id
.done
),
437 (au
->id
.low
>= au
->id
.waiting
|| au
->empty
), &au
->lock
);
439 ret
= collect_async_errors (cmp
, au
);
444 /* Perform a wait operation an an asynchronous unit without an ID. */
447 async_wait (st_parameter_common
*cmp
, async_unit
*au
)
457 SIGNAL (&(au
->work
));
462 ret
= collect_async_errors (cmp
, au
);
467 WAIT_SIGNAL_MUTEX (&(au
->emptysignal
), (au
->empty
), &au
->lock
);
468 ret
= collect_async_errors (cmp
, au
);
472 /* Close an asynchronous unit. */
475 async_close (async_unit
*au
)
480 NOTE ("Closing async unit");
482 T_ERROR (__gthread_join
, au
->thread
, NULL
);
483 free_async_unit (au
);
488 /* Only set u->au to NULL so no async I/O will happen. */
491 init_async_unit (gfc_unit
*u
)
497 /* Do-nothing function, which will not be called. */
500 enqueue_transfer (async_unit
*au
, transfer_args
*arg
, enum aio_do type
)
505 /* Do-nothing function, which will not be called. */
508 enqueue_done_id (async_unit
*au
, enum aio_do type
)
513 /* Do-nothing function, which will not be called. */
516 enqueue_done (async_unit
*au
, enum aio_do type
)
521 /* Do-nothing function, which will not be called. */
524 enqueue_close (async_unit
*au
)
529 /* Do-nothing function, which will not be called. */
532 enqueue_data_transfer_init (async_unit
*au
, st_parameter_dt
*dt
, int read_flag
)
537 /* Do-nothing function, which will not be called. */
540 collect_async_errors (st_parameter_common
*cmp
, async_unit
*au
)
545 /* Do-nothing function, which will not be called. */
548 async_wait_id (st_parameter_common
*cmp
, async_unit
*au
, int i
)
553 /* Do-nothing function, which will not be called. */
556 async_wait (st_parameter_common
*cmp
, async_unit
*au
)
561 /* Do-nothing function, which will not be called. */
564 async_close (async_unit
*au
)