gcc:
[official-gcc.git] / libgfortran / io / async.c
bloba07b831eb7cc123f9e9b3c7c64b23e3b42acc90a
1 /* Copyright (C) 2018 Free Software Foundation, Inc.
2 Contributed by Nicolas Koenig
4 This file is part of the GNU Fortran runtime library (libgfortran).
6 Libgfortran is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3, or (at your option)
9 any later version.
11 Libgfortran is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 Under Section 7 of GPL version 3, you are granted additional
17 permissions described in the GCC Runtime Library Exception, version
18 3.1, as published by the Free Software Foundation.
20 You should have received a copy of the GNU General Public License and
21 a copy of the GCC Runtime Library Exception along with this program;
22 see the files COPYING3 and COPYING.RUNTIME respectively. If not, see
23 <http://www.gnu.org/licenses/>. */
25 #include "libgfortran.h"
27 #define _GTHREAD_USE_COND_INIT_FUNC
28 #include "../../libgcc/gthr.h"
29 #include "io.h"
30 #include "fbuf.h"
31 #include "format.h"
32 #include "unix.h"
33 #include <string.h>
34 #include <assert.h>
36 #include <sys/types.h>
38 #include "async.h"
39 #if ASYNC_IO
41 DEBUG_LINE (__thread const char *aio_prefix = MPREFIX);
43 DEBUG_LINE (__gthread_mutex_t debug_queue_lock = __GTHREAD_MUTEX_INIT;)
44 DEBUG_LINE (aio_lock_debug *aio_debug_head = NULL;)
46 /* Current unit for asynchronous I/O. Needed for error reporting. */
48 __thread gfc_unit *thread_unit = NULL;
50 /* Queue entry for the asynchronous I/O entry. */
51 typedef struct transfer_queue
53 enum aio_do type;
54 struct transfer_queue *next;
55 struct st_parameter_dt *new_pdt;
56 transfer_args arg;
57 _Bool has_id;
58 int read_flag;
59 } transfer_queue;
61 struct error {
62 st_parameter_dt *dtp;
63 int id;
66 /* Helper function to exchange the old vs. a new PDT. */
68 static void
69 update_pdt (st_parameter_dt **old, st_parameter_dt *new) {
70 st_parameter_dt *temp;
71 NOTE ("Changing pdts, current_unit = %p", (void *) (new->u.p.current_unit));
72 temp = *old;
73 *old = new;
74 if (temp)
75 free (temp);
78 /* Destroy an adv_cond structure. */
80 static void
81 destroy_adv_cond (struct adv_cond *ac)
83 T_ERROR (__gthread_mutex_destroy, &ac->lock);
84 T_ERROR (__gthread_cond_destroy, &ac->signal);
87 /* Function invoked as start routine for a new asynchronous I/O unit.
88 Contains the main loop for accepting requests and handling them. */
90 static void *
91 async_io (void *arg)
93 DEBUG_LINE (aio_prefix = TPREFIX);
94 transfer_queue *ctq = NULL, *prev = NULL;
95 gfc_unit *u = (gfc_unit *) arg;
96 async_unit *au = u->au;
97 LOCK (&au->lock);
98 thread_unit = u;
99 au->thread = __gthread_self ();
100 while (true)
102 /* Main loop. At this point, au->lock is always held. */
103 WAIT_SIGNAL_MUTEX (&au->work, au->tail != NULL, &au->lock);
104 LOCK (&au->lock);
105 ctq = au->head;
106 prev = NULL;
107 /* Loop over the queue entries until they are finished. */
108 while (ctq)
110 if (prev)
111 free (prev);
112 prev = ctq;
113 if (!au->error.has_error)
115 UNLOCK (&au->lock);
117 switch (ctq->type)
119 case AIO_WRITE_DONE:
120 NOTE ("Finalizing write");
121 st_write_done_worker (au->pdt);
122 UNLOCK (&au->io_lock);
123 break;
125 case AIO_READ_DONE:
126 NOTE ("Finalizing read");
127 st_read_done_worker (au->pdt);
128 UNLOCK (&au->io_lock);
129 break;
131 case AIO_DATA_TRANSFER_INIT:
132 NOTE ("Data transfer init");
133 LOCK (&au->io_lock);
134 update_pdt (&au->pdt, ctq->new_pdt);
135 data_transfer_init_worker (au->pdt, ctq->read_flag);
136 break;
138 case AIO_TRANSFER_SCALAR:
139 NOTE ("Starting scalar transfer");
140 ctq->arg.scalar.transfer (au->pdt, ctq->arg.scalar.arg_bt,
141 ctq->arg.scalar.data,
142 ctq->arg.scalar.i,
143 ctq->arg.scalar.s1,
144 ctq->arg.scalar.s2);
145 break;
147 case AIO_TRANSFER_ARRAY:
148 NOTE ("Starting array transfer");
149 NOTE ("ctq->arg.array.desc = %p",
150 (void *) (ctq->arg.array.desc));
151 transfer_array_inner (au->pdt, ctq->arg.array.desc,
152 ctq->arg.array.kind,
153 ctq->arg.array.charlen);
154 free (ctq->arg.array.desc);
155 break;
157 case AIO_CLOSE:
158 NOTE ("Received AIO_CLOSE");
159 goto finish_thread;
161 default:
162 internal_error (NULL, "Invalid queue type");
163 break;
165 LOCK (&au->lock);
166 if (unlikely (au->error.has_error))
167 au->error.last_good_id = au->id.low - 1;
169 else
171 if (ctq->type == AIO_WRITE_DONE || ctq->type == AIO_READ_DONE)
173 UNLOCK (&au->io_lock);
175 else if (ctq->type == AIO_CLOSE)
177 NOTE ("Received AIO_CLOSE during error condition");
178 UNLOCK (&au->lock);
179 goto finish_thread;
183 NOTE ("Next ctq, current id: %d", au->id.low);
184 if (ctq->has_id && au->id.waiting == au->id.low++)
185 SIGNAL (&au->id.done);
187 ctq = ctq->next;
189 au->tail = NULL;
190 au->head = NULL;
191 au->empty = 1;
192 UNLOCK (&au->lock);
193 SIGNAL (&au->emptysignal);
194 LOCK (&au->lock);
196 finish_thread:
197 au->tail = NULL;
198 au->head = NULL;
199 au->empty = 1;
200 SIGNAL (&au->emptysignal);
201 free (ctq);
202 return NULL;
205 /* Free an asynchronous unit. */
207 static void
208 free_async_unit (async_unit *au)
210 if (au->tail)
211 internal_error (NULL, "Trying to free nonempty asynchronous unit");
213 destroy_adv_cond (&au->work);
214 destroy_adv_cond (&au->emptysignal);
215 destroy_adv_cond (&au->id.done);
216 T_ERROR (__gthread_mutex_destroy, &au->lock);
217 free (au);
220 /* Initialize an adv_cond structure. */
222 static void
223 init_adv_cond (struct adv_cond *ac)
225 ac->pending = 0;
226 __GTHREAD_MUTEX_INIT_FUNCTION (&ac->lock);
227 __gthread_cond_init_function (&ac->signal);
230 /* Initialize an asyncronous unit, returning zero on success,
231 nonzero on failure. It also sets u->au. */
233 void
234 init_async_unit (gfc_unit *u)
236 async_unit *au;
237 if (!__gthread_active_p ())
239 u->au = NULL;
240 return;
243 au = (async_unit *) xmalloc (sizeof (async_unit));
244 u->au = au;
245 init_adv_cond (&au->work);
246 init_adv_cond (&au->emptysignal);
247 __GTHREAD_MUTEX_INIT_FUNCTION (&au->lock);
248 __GTHREAD_MUTEX_INIT_FUNCTION (&au->io_lock);
249 LOCK (&au->lock);
250 T_ERROR (__gthread_create, &au->thread, &async_io, (void *) u);
251 au->pdt = NULL;
252 au->head = NULL;
253 au->tail = NULL;
254 au->empty = true;
255 au->id.waiting = -1;
256 au->id.low = 0;
257 au->id.high = 0;
258 au->error.fatal_error = 0;
259 au->error.has_error = 0;
260 au->error.last_good_id = 0;
261 init_adv_cond (&au->id.done);
262 UNLOCK (&au->lock);
265 /* Enqueue a transfer statement. */
267 void
268 enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
270 transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
271 tq->arg = *arg;
272 tq->type = type;
273 tq->has_id = 0;
274 LOCK (&au->lock);
275 if (!au->tail)
276 au->head = tq;
277 else
278 au->tail->next = tq;
279 au->tail = tq;
280 REVOKE_SIGNAL (&(au->emptysignal));
281 au->empty = false;
282 UNLOCK (&au->lock);
283 SIGNAL (&au->work);
286 /* Enqueue an st_write_done or st_read_done which contains an ID. */
289 enqueue_done_id (async_unit *au, enum aio_do type)
291 int ret;
292 transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
294 tq->type = type;
295 tq->has_id = 1;
296 LOCK (&au->lock);
297 if (!au->tail)
298 au->head = tq;
299 else
300 au->tail->next = tq;
301 au->tail = tq;
302 REVOKE_SIGNAL (&(au->emptysignal));
303 au->empty = false;
304 ret = au->id.high++;
305 NOTE ("Enqueue id: %d", ret);
306 UNLOCK (&au->lock);
307 SIGNAL (&au->work);
308 return ret;
311 /* Enqueue an st_write_done or st_read_done without an ID. */
313 void
314 enqueue_done (async_unit *au, enum aio_do type)
316 transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
317 tq->type = type;
318 tq->has_id = 0;
319 LOCK (&au->lock);
320 if (!au->tail)
321 au->head = tq;
322 else
323 au->tail->next = tq;
324 au->tail = tq;
325 REVOKE_SIGNAL (&(au->emptysignal));
326 au->empty = false;
327 UNLOCK (&au->lock);
328 SIGNAL (&au->work);
331 /* Enqueue a CLOSE statement. */
333 void
334 enqueue_close (async_unit *au)
336 transfer_queue *tq = calloc (sizeof (transfer_queue), 1);
338 tq->type = AIO_CLOSE;
339 LOCK (&au->lock);
340 if (!au->tail)
341 au->head = tq;
342 else
343 au->tail->next = tq;
344 au->tail = tq;
345 REVOKE_SIGNAL (&(au->emptysignal));
346 au->empty = false;
347 UNLOCK (&au->lock);
348 SIGNAL (&au->work);
351 /* The asynchronous unit keeps the currently active PDT around.
352 This function changes that to the current one. */
354 void
355 enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
357 st_parameter_dt *new = xmalloc (sizeof (st_parameter_dt));
358 transfer_queue *tq = xmalloc (sizeof (transfer_queue));
360 memcpy ((void *) new, (void *) dt, sizeof (st_parameter_dt));
362 NOTE ("dt->internal_unit_desc = %p", dt->internal_unit_desc);
363 NOTE ("common.flags & mask = %d", dt->common.flags & IOPARM_LIBRETURN_MASK);
364 tq->next = NULL;
365 tq->type = AIO_DATA_TRANSFER_INIT;
366 tq->read_flag = read_flag;
367 tq->has_id = 0;
368 tq->new_pdt = new;
369 LOCK (&au->lock);
371 if (!au->tail)
372 au->head = tq;
373 else
374 au->tail->next = tq;
375 au->tail = tq;
376 REVOKE_SIGNAL (&(au->emptysignal));
377 au->empty = 0;
378 UNLOCK (&au->lock);
379 SIGNAL (&au->work);
382 /* Collect the errors that may have happened asynchronously. Return true if
383 an error has been encountered. */
385 bool
386 collect_async_errors (st_parameter_common *cmp, async_unit *au)
388 bool has_error = au->error.has_error;
390 if (has_error)
392 if (generate_error_common (cmp, au->error.family, au->error.message))
394 au->error.has_error = 0;
395 au->error.cmp = NULL;
397 else
399 /* The program will exit later. */
400 au->error.fatal_error = true;
403 return has_error;
406 /* Perform a wait operation on an asynchronous unit with an ID specified,
407 which means collecting the errors that may have happened asynchronously.
408 Return true if an error has been encountered. */
410 bool
411 async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
413 bool ret;
415 if (au == NULL)
416 return false;
418 if (cmp == NULL)
419 cmp = au->error.cmp;
421 if (au->error.has_error)
423 if (i <= au->error.last_good_id)
424 return false;
426 return collect_async_errors (cmp, au);
429 LOCK (&au->lock);
430 NOTE ("Waiting for id %d", i);
431 if (au->id.waiting < i)
432 au->id.waiting = i;
433 UNLOCK (&au->lock);
434 SIGNAL (&(au->work));
435 LOCK (&au->lock);
436 WAIT_SIGNAL_MUTEX (&(au->id.done),
437 (au->id.low >= au->id.waiting || au->empty), &au->lock);
438 LOCK (&au->lock);
439 ret = collect_async_errors (cmp, au);
440 UNLOCK (&au->lock);
441 return ret;
444 /* Perform a wait operation an an asynchronous unit without an ID. */
446 bool
447 async_wait (st_parameter_common *cmp, async_unit *au)
449 bool ret;
451 if (au == NULL)
452 return false;
454 if (cmp == NULL)
455 cmp = au->error.cmp;
457 SIGNAL (&(au->work));
458 LOCK (&(au->lock));
460 if (au->empty)
462 ret = collect_async_errors (cmp, au);
463 UNLOCK (&au->lock);
464 return ret;
467 WAIT_SIGNAL_MUTEX (&(au->emptysignal), (au->empty), &au->lock);
468 ret = collect_async_errors (cmp, au);
469 return ret;
472 /* Close an asynchronous unit. */
474 void
475 async_close (async_unit *au)
477 if (au == NULL)
478 return;
480 NOTE ("Closing async unit");
481 enqueue_close (au);
482 T_ERROR (__gthread_join, au->thread, NULL);
483 free_async_unit (au);
486 #else
488 /* Only set u->au to NULL so no async I/O will happen. */
490 void
491 init_async_unit (gfc_unit *u)
493 u->au = NULL;
494 return;
497 /* Do-nothing function, which will not be called. */
499 void
500 enqueue_transfer (async_unit *au, transfer_args *arg, enum aio_do type)
502 return;
505 /* Do-nothing function, which will not be called. */
508 enqueue_done_id (async_unit *au, enum aio_do type)
510 return 0;
513 /* Do-nothing function, which will not be called. */
515 void
516 enqueue_done (async_unit *au, enum aio_do type)
518 return;
521 /* Do-nothing function, which will not be called. */
523 void
524 enqueue_close (async_unit *au)
526 return;
529 /* Do-nothing function, which will not be called. */
531 void
532 enqueue_data_transfer_init (async_unit *au, st_parameter_dt *dt, int read_flag)
534 return;
537 /* Do-nothing function, which will not be called. */
539 bool
540 collect_async_errors (st_parameter_common *cmp, async_unit *au)
542 return false;
545 /* Do-nothing function, which will not be called. */
547 bool
548 async_wait_id (st_parameter_common *cmp, async_unit *au, int i)
550 return false;
553 /* Do-nothing function, which will not be called. */
555 bool
556 async_wait (st_parameter_common *cmp, async_unit *au)
558 return false;
561 /* Do-nothing function, which will not be called. */
563 void
564 async_close (async_unit *au)
566 return;
569 #endif