2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009,2018, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
39 #ifdef HAVE_TMPI_CONFIG_H
40 #include "tmpi_config.h"
55 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
73 /* there are a few global variables that maintain information about the
74 running threads. Some are defined by the MPI standard: */
75 /* TMPI_COMM_WORLD is in tmpi_malloc.c due to technical reasons */
/* The empty group handle. It stays NULL until tMPI_Start_threads() assigns
   it the result of tMPI_Group_alloc(); tMPI_Finalize() hands it to
   tMPI_Group_free() and resets it to NULL. */
76 tMPI_Group TMPI_GROUP_EMPTY
= NULL
;
79 /* the threads themselves (tmpi_comm only contains lists of pointers to this structure) */
81 struct tmpi_thread
*threads
= NULL
;
85 tMPI_Thread_key_t id_key
; /* the key to get the thread id: tMPI_Thread_init() stores the calling
     thread's own struct tmpi_thread pointer under this key with
     tMPI_Thread_setspecific(); the key is created in tMPI_Start_threads()
     and deleted in tMPI_Finalize() */
89 /* whether MPI has finalized (we need this to distinguish pre-inited from
90 post-finalized states). Reset to FALSE by tMPI_Start_threads(), set to
   TRUE by tMPI_Finalize(), and read by tMPI_Initialized() and
   tMPI_Finalized(). */
91 static tmpi_bool tmpi_finalized
= FALSE
;
93 /* misc. global information about MPI. The structure is allocated with
   tMPI_Malloc() in tMPI_Start_threads() and filled in by tMPI_Global_init();
   the members touched in this file are barrier, timer_mutex, datatype_lock,
   comm_link_lock, Nalloc_usertypes and timer_init. */
94 struct tmpi_global
*tmpi_global
= NULL
;
101 /* start N threads with argc, argv (used by tMPI_Init and tMPI_Init_fn).
   The callers in this file pass only one entry point each: tMPI_Init()
   passes start_fn_main (with start_fn and start_arg NULL), tMPI_Init_fn()
   passes start_fn and start_arg (with start_fn_main NULL and argc/argv 0). */
103 int tMPI_Start_threads(tmpi_bool main_returns
, int N
,
104 tMPI_Affinity_strategy aff_strategy
,
105 int *argc
, char ***argv
,
106 void (*start_fn
)(const void*), const void *start_arg
,
107 int (*start_fn_main
)(int, char**));
109 /* starter function for threads; the definition in this file casts its
110 void pointer argument to a struct tmpi_thread pointer, initializes that
   thread with tMPI_Thread_init() and then calls its start_fn_main or its
   start_fn.
   NOTE(review): the previous wording of this comment referred to a
   struct tmpi_starter_, which the visible definition does not use. */
111 static void* tMPI_Thread_starter(void *arg
);
113 /* allocate and initialize the data associated with a thread structure */
114 static int tMPI_Thread_init(struct tmpi_thread
*th
);
115 /* deallocate the data associated with a thread structure */
116 static void tMPI_Thread_destroy(struct tmpi_thread
*th
);
/* Print a trace message (printf-style fmt plus variadic arguments).
   The output is prefixed with either "THREAD nn: ", where nn is the index of
   the calling thread in the threads array (th - threads), or "THREAD main: ".
   The whole call is serialized by a function-local static mutex, whose
   lock/unlock results are deliberately ignored.
   NOTE(review): this listing skips original lines (see the gaps in the
   embedded line numbers); the condition that selects the prefix and the code
   that actually formats fmt are not visible here. */
122 void tMPI_Trace_print(const char *fmt
, ...)
125 struct tmpi_thread
* th
= NULL
;
126 static tMPI_Thread_mutex_t mtx
= TMPI_THREAD_MUTEX_INITIALIZER
;
128 /* don't check for errors during trace */
129 tMPI_Thread_mutex_lock(&mtx
);
132 th
= tMPI_Get_current();
/* pointer difference = position of this thread in the threads array */
133 printf("THREAD %02d: ", (int)(th
-threads
));
137 printf("THREAD main: ");
144 tMPI_Thread_mutex_unlock(&mtx
);
/* Tell whether the calling thread is the master (main) thread.
   Two cases are visible: TMPI_COMM_WORLD is unset or its group has N == 0
   (no threads were started), and otherwise a comparison of the calling
   thread's tmpi_thread pointer with the first element of the threads array.
   NOTE(review): the statement executed in the first case is on a line not
   included in this listing; per the comment it should report "master". */
149 tmpi_bool
tMPI_Is_master(void)
151 /* if there are no other threads, we're the main thread */
152 if ( (!TMPI_COMM_WORLD
) || TMPI_COMM_WORLD
->grp
.N
== 0)
157 /* otherwise we know this through thread specific data: */
158 /* whether the thread pointer points to the head of the threads array */
159 return (tmpi_bool
)(tMPI_Get_current() == threads
);
/* Return the self communicator of the calling thread, i.e. the self_comm
   member of the structure returned by tMPI_Get_current(). That member is
   allocated per thread in tMPI_Thread_init(). */
162 tMPI_Comm
tMPI_Get_comm_self(void)
164 struct tmpi_thread
* th
= tMPI_Get_current();
165 return th
->self_comm
;
/* Determine the number of threads to start.
   argc, argv : pointers to the program's argument count and vector
   optname    : option name to look for (tMPI_Init() passes "-nt")
   nthreads   : output, receives the number parsed from the command line
   The loop compares optname with argv[1] .. argv[*argc-1]; on a match the
   following argument is parsed as a base-10 number with strtol(), and the
   end pointer is checked so that trailing non-numeric characters are
   detected. The hardware thread count from tMPI_Thread_get_hw_number() is
   also used, with 1 as a lower bound; when that fallback applies is not
   visible in this listing.
   NOTE(review): confirm that i+1 < *argc is checked (on a line missing
   here) before (*argv)[i+1] is read.
   NOTE(review): strtol() returns long; the value is stored into an int
   without a range check. */
169 int tMPI_Get_N(int *argc
, char ***argv
, const char *optname
, int *nthreads
)
172 int ret
= TMPI_SUCCESS
;
181 for (i
= 1; i
< *argc
; i
++)
183 if (strcmp(optname
, (*argv
)[i
]) == 0)
191 /* the number of processes is an argument */
193 *nthreads
= strtol((*argv
)[i
+1], &end
, 10);
/* a non-zero character at *end means the argument was not purely numeric */
194 if (!end
|| (*end
!= 0) )
202 int nth
= tMPI_Thread_get_hw_number();
206 nth
= 1; /* make sure it's at least 1 */
/* Allocate and initialize the per-thread data of th. Called by every thread
   for its own structure (from tMPI_Thread_starter() or directly from
   tMPI_Start_threads() for thread 0).
   Visible steps, in order: store th under id_key as thread-specific data;
   allocate the thread's self communicator and make th its only peer; set up
   the free envelope list, the receive envelope list and one send envelope
   list per thread; reset ev_outgoing_received; initialize the p2p event and
   the request list; optionally the collective copy buffer list and the
   profiling data; finally wait on the global barrier until all threads have
   reached this point.
   Returns TMPI_ERR_NO_MEM when the send envelope array cannot be allocated;
   the handling of the other non-TMPI_SUCCESS results is on lines missing
   from this listing, as is the definition of Nthreads. */
214 static int tMPI_Thread_init(struct tmpi_thread
*th
)
/* list sizes: envelopes and requests scale with the thread count */
217 int N_envelopes
= (Nthreads
+1)*N_EV_ALLOC
;
218 int N_send_envelopes
= N_EV_ALLOC
;
219 int N_reqs
= (Nthreads
+1)*N_EV_ALLOC
;
222 /* we set our thread id, as a thread-specific piece of global data. */
223 ret
= tMPI_Thread_setspecific(id_key
, th
);
229 /* allocate comm.self */
230 ret
= tMPI_Comm_alloc( &(th
->self_comm
), TMPI_COMM_WORLD
, 1);
231 if (ret
!= TMPI_SUCCESS
)
/* the self communicator has exactly one member: this thread */
235 th
->self_comm
->grp
.peers
[0] = th
;
237 /* allocate envelopes */
238 ret
= tMPI_Free_env_list_init( &(th
->envelopes
), N_envelopes
);
239 if (ret
!= TMPI_SUCCESS
)
244 ret
= tMPI_Recv_env_list_init( &(th
->evr
));
245 if (ret
!= TMPI_SUCCESS
)
/* one send envelope list per thread */
250 th
->evs
= (struct send_envelope_list
*)tMPI_Malloc(
251 sizeof(struct send_envelope_list
)*Nthreads
);
254 return TMPI_ERR_NO_MEM
;
256 for (i
= 0; i
< Nthreads
; i
++)
258 ret
= tMPI_Send_env_list_init( &(th
->evs
[i
]), N_send_envelopes
);
259 if (ret
!= TMPI_SUCCESS
)
265 tMPI_Atomic_set( &(th
->ev_outgoing_received
), 0);
267 tMPI_Event_init( &(th
->p2p_event
) );
269 /* allocate requests */
270 ret
= tMPI_Req_list_init(&(th
->rql
), N_reqs
);
271 if (ret
!= TMPI_SUCCESS
)
277 #ifdef USE_COLLECTIVE_COPY_BUFFER
278 /* allocate copy_buffer list */
279 ret
= tMPI_Copy_buffer_list_init(&(th
->cbl_multi
),
280 (Nthreads
+1)*(N_COLL_ENV
+1),
281 Nthreads
*COPY_BUFFER_SIZE
);
282 if (ret
!= TMPI_SUCCESS
)
289 ret
= tMPI_Profile_init(&(th
->profile
));
290 if (ret
!= TMPI_SUCCESS
)
295 /* now wait for all other threads to come on line, before we
296 start the MPI program */
297 ret
= tMPI_Thread_barrier_wait( &(tmpi_global
->barrier
) );
/* Release the per-thread data set up by tMPI_Thread_init(): the receive
   envelope list, every per-thread send envelope list, the free envelope
   list, the p2p event, the request list and, when
   USE_COLLECTIVE_COPY_BUFFER is defined, the copy buffer list.
   The final loop runs over the thread's argc entries.
   NOTE(review): the body of that loop and any release of th->evs or
   th->argv are on lines not included in this listing; presumably the argv
   copies made in tMPI_Start_threads() are freed there - confirm. */
306 static void tMPI_Thread_destroy(struct tmpi_thread
*th
)
310 tMPI_Recv_env_list_destroy( &(th
->evr
));
311 for (i
= 0; i
< Nthreads
; i
++)
313 tMPI_Send_env_list_destroy( &(th
->evs
[i
]));
316 tMPI_Free_env_list_destroy( &(th
->envelopes
) );
317 tMPI_Event_destroy( &(th
->p2p_event
) );
318 tMPI_Req_list_destroy( &(th
->rql
) );
320 #ifdef USE_COLLECTIVE_COPY_BUFFER
321 tMPI_Copy_buffer_list_destroy(&(th
->cbl_multi
));
324 for (i
= 0; i
< th
->argc
; i
++)
/* Initialize the global structure g for Nthreads threads.
   Sets Nalloc_usertypes to 0, initializes timer_mutex, datatype_lock, the
   barrier (sized for Nthreads participants) and comm_link_lock, and records
   the start time in timer_init: gettimeofday() on non-Windows builds,
   GetTickCount() otherwise.
   The three visible error returns all report TMPI_ERR_IO through
   tMPI_Error(); the conditions guarding them are on lines missing from this
   listing, presumably checks of ret. */
330 static int tMPI_Global_init(struct tmpi_global
*g
, int Nthreads
)
336 g
->Nalloc_usertypes
= 0;
337 ret
= tMPI_Thread_mutex_init(&(g
->timer_mutex
));
340 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_IO
);
342 tMPI_Spinlock_init(&(g
->datatype_lock
));
344 ret
= tMPI_Thread_barrier_init( &(g
->barrier
), Nthreads
);
347 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_IO
);
350 ret
= tMPI_Thread_mutex_init(&(g
->comm_link_lock
));
353 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_IO
);
357 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
358 /* the time at initialization. */
359 gettimeofday( &(g
->timer_init
), NULL
);
361 /* the time at initialization (Windows variant: a tick count). */
362 g
->timer_init
= GetTickCount();
/* Destroy the synchronization objects inside g that tMPI_Global_init()
   created: the barrier, timer_mutex and comm_link_lock. The visible code
   does not free g itself. */
367 static void tMPI_Global_destroy(struct tmpi_global
*g
)
369 tMPI_Thread_barrier_destroy(&(g
->barrier
));
370 tMPI_Thread_mutex_destroy(&(g
->timer_mutex
));
371 tMPI_Thread_mutex_destroy(&(g
->comm_link_lock
));
/* Entry point of every thread started by tMPI_Start_threads().
   arg is the address of the thread's own element of the threads array.
   The thread first runs tMPI_Thread_init() on its structure and then calls
   one of the two user entry points stored in it: start_fn_main(argc, argv)
   or start_fn(start_arg).
   NOTE(review): the test that chooses between the two entry points, the
   handling of a failed tMPI_Thread_init() and the return value are on lines
   missing from this listing. */
377 static void* tMPI_Thread_starter(void *arg
)
380 struct tmpi_thread
*th
= (struct tmpi_thread
*)arg
;
383 tMPI_Trace_print("Created thread nr. %d", (int)(th
-threads
));
386 ret
= tMPI_Thread_init(th
);
387 if (ret
!= TMPI_SUCCESS
)
392 /* start_fn, start_arg, argc and argv were set by the calling function */
395 th
->start_fn_main(th
->argc
, th
->argv
);
399 th
->start_fn(th
->start_arg
);
/* Create the global state and the N thread structures and start the threads.
   main_returns  : passed by the callers; how it is used is on lines missing
                   from this listing (see the comment near the end)
   N             : number of threads, including the calling (main) thread
   aff_strategy  : affinity is considered only for TMPI_AFFINITY_ALL_CORES
   argc, argv    : command line to copy into every thread structure
   start_fn, start_arg, start_fn_main : entry points stored per thread and
                   later called by tMPI_Thread_starter()
   Visible steps: reset tmpi_finalized; allocate tmpi_global and run
   tMPI_Global_init(); allocate the threads array; allocate TMPI_COMM_WORLD
   and TMPI_GROUP_EMPTY; create id_key; for every thread register it as a
   peer of TMPI_COMM_WORLD, duplicate argv with strdup()/_strdup() and store
   the entry points; record the main thread's id in threads[0]; create
   threads 1..N-1 with tMPI_Thread_create(); finally run thread 0 through
   tMPI_Thread_starter() or tMPI_Thread_init().
   Returns TMPI_ERR_NO_MEM on the visible allocation failures and
   tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_INIT) when key or thread creation
   fails.
   NOTE(review): the early returns follow earlier allocations (tmpi_global,
   threads); no release is visible on those paths in this listing - confirm
   whether that is handled on the missing lines. */
410 int tMPI_Start_threads(tmpi_bool main_returns
, int N
,
411 tMPI_Affinity_strategy aff_strategy
,
412 int *argc
, char ***argv
,
413 void (*start_fn
)(const void*), const void *start_arg
,
414 int (*start_fn_main
)(int, char**))
/* NOTE(review): the 4th and 5th conversions are %d, but the matching
   arguments argc and argv are pointers in this function - %p looks
   intended; confirm against the full argument list. */
418 tMPI_Trace_print("tMPI_Start_threads(%d, %d, %d, %d, %d, %p, %p, %p, %p)",
419 main_returns
, N
, aff_strategy
, argc
, argv
, start_fn
,
425 int set_affinity
= FALSE
;
427 tmpi_finalized
= FALSE
;
430 /* allocate global data */
431 tmpi_global
= (struct tmpi_global
*)
432 tMPI_Malloc(sizeof(struct tmpi_global
));
433 if (tmpi_global
== 0)
435 return TMPI_ERR_NO_MEM
;
437 ret
= tMPI_Global_init(tmpi_global
, N
);
438 if (ret
!= TMPI_SUCCESS
)
443 /* allocate world and thread data */
444 threads
= (struct tmpi_thread
*)
445 tMPI_Malloc(sizeof(struct tmpi_thread
)*N
);
448 return TMPI_ERR_NO_MEM
;
450 ret
= tMPI_Comm_alloc(&TMPI_COMM_WORLD
, NULL
, N
);
451 if (ret
!= TMPI_SUCCESS
)
/* NOTE(review): nullptr is a C++ (and C23) keyword while the rest of this
   file uses NULL or 0 - confirm the file is built as C++. */
455 assert(TMPI_COMM_WORLD
!= nullptr);
456 TMPI_GROUP_EMPTY
= tMPI_Group_alloc();
/* a non-zero result from the key creation is treated as failure */
458 if (tMPI_Thread_key_create(&id_key
, NULL
))
460 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_INIT
);
462 for (i
= 0; i
< N
; i
++)
464 TMPI_COMM_WORLD
->grp
.peers
[i
] = &(threads
[i
]);
466 /* copy argc, argv */
470 threads
[i
].argc
= *argc
;
471 threads
[i
].argv
= (char**)tMPI_Malloc(threads
[i
].argc
*
473 for (j
= 0; j
< threads
[i
].argc
; j
++)
475 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
476 threads
[i
].argv
[j
] = strdup( (*argv
)[j
] );
478 threads
[i
].argv
[j
] = _strdup( (*argv
)[j
] );
485 threads
[i
].argv
= NULL
;
487 threads
[i
].start_fn
= start_fn
;
488 threads
[i
].start_fn_main
= start_fn_main
;
489 threads
[i
].start_arg
= start_arg
;
492 /* now check whether to set affinity */
493 if (aff_strategy
== TMPI_AFFINITY_ALL_CORES
)
495 int nhw
= tMPI_Thread_get_hw_number();
/* only when there is more than one hardware thread and exactly one per
   requested thread */
496 if ((nhw
> 1) && (nhw
== N
))
502 /* set thread 0's properties */
503 threads
[0].thread_id
= tMPI_Thread_self();
506 /* set the main thread's affinity */
507 tMPI_Thread_setaffinity_single(threads
[0].thread_id
, 0);
510 for (i
= 1; i
< N
; i
++) /* zero is the main thread */
512 ret
= tMPI_Thread_create(&(threads
[i
].thread_id
),
514 (void*)&(threads
[i
]) );
/* thread i is bound to hardware thread number i */
518 tMPI_Thread_setaffinity_single(threads
[i
].thread_id
, i
);
520 if (ret
!= TMPI_SUCCESS
)
522 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_INIT
);
525 /* the main thread also runs start_fn if we don't want
529 tMPI_Thread_starter((void*)&(threads
[0]));
534 ret
= tMPI_Thread_init(&(threads
[0]));
/* Initialize thread_mpi from a main()-style entry point.
   argc, argv     : pointers to the program's command line
   start_function : main()-like function stored as start_fn_main of every
                    thread
   When TMPI_COMM_WORLD is still unset, the thread count is taken from the
   "-nt" command line option via tMPI_Get_N() and the threads are started
   with tMPI_Start_threads(TRUE, N, TMPI_AFFINITY_ALL_CORES, ...). */
545 int tMPI_Init(int *argc
, char ***argv
,
546 int (*start_function
)(int, char**))
550 tMPI_Trace_print("tMPI_Init(%p, %p, %p)", argc
, argv
, start_function
);
553 if (TMPI_COMM_WORLD
== 0) /* we're the main process */
/* the return value of tMPI_Get_N() is not used here */
556 tMPI_Get_N(argc
, argv
, "-nt", &N
);
/* NOTE(review): the trailing "!= 0" turns the result of
   tMPI_Start_threads() into 0 or 1 before it is stored in ret, so the
   original error code is lost - confirm this is intended. */
557 ret
= tMPI_Start_threads(TRUE
, N
, TMPI_AFFINITY_ALL_CORES
, argc
, argv
,
558 NULL
, NULL
, start_function
) != 0;
566 /* if we're a sub-thread we don't need to do anything, because
567 everything has already been set up by either the main thread,
568 or the thread runner function.*/
/* Initialize thread_mpi with a function-style entry point.
   main_thread_returns : forwarded as main_returns to tMPI_Start_threads()
   N                   : requested number of threads; under a condition on a
                         line missing from this listing it is replaced by
                         tMPI_Thread_get_hw_number(), with 1 as fallback
   aff_strategy        : forwarded to tMPI_Start_threads()
   start_function, arg : stored as start_fn / start_arg of every thread
   Threads are started only when TMPI_COMM_WORLD is still unset and N >= 1;
   argc and argv are passed as 0 and start_fn_main as NULL. */
576 int tMPI_Init_fn(int main_thread_returns
, int N
,
577 tMPI_Affinity_strategy aff_strategy
,
578 void (*start_function
)(const void*), const void *arg
)
582 tMPI_Trace_print("tMPI_Init_fn(%d, %p, %p)", N
, start_function
, arg
);
587 N
= tMPI_Thread_get_hw_number();
590 N
= 1; /*because that's what the fn returns if it doesn't know*/
594 if (TMPI_COMM_WORLD
== 0 && N
>= 1) /* we're the main process */
596 ret
= tMPI_Start_threads(main_thread_returns
, N
, aff_strategy
,
597 0, 0, start_function
, arg
, NULL
);
/* Store in *flag whether thread_mpi is currently initialized: non-zero when
   TMPI_COMM_WORLD is set and tmpi_finalized is not. flag is dereferenced
   without a NULL check in the visible code. */
606 int tMPI_Initialized(int *flag
)
609 tMPI_Trace_print("tMPI_Initialized(%p)", flag
);
612 *flag
= (TMPI_COMM_WORLD
&& !tmpi_finalized
);
/* Shut thread_mpi down.
   Visible sequence: stop the calling thread's profile; wait on the global
   barrier; let the master summarize the profiles; wait on the barrier again;
   then, on the master only, join threads 1..Nthreads-1 and destroy their
   per-thread data, destroy thread 0's data, delete id_key, destroy every
   communicator linked after TMPI_COMM_WORLD and then TMPI_COMM_WORLD itself
   while holding comm_link_lock, free TMPI_GROUP_EMPTY, reset both globals to
   NULL, destroy the global structure and set tmpi_finalized to TRUE.
   Failures of barrier waits and of mutex lock/unlock are reported as
   TMPI_ERR_IO, a failed join as TMPI_ERR_FINALIZE.
   NOTE(review): lines are missing from this listing, so which of these
   statements sit inside the two tMPI_Is_master() branches (and inside
   profiling/trace conditionals) cannot be fully confirmed here; freeing of
   the threads array and of tmpi_global itself is not visible either. */
617 int tMPI_Finalize(void)
622 tMPI_Trace_print("tMPI_Finalize()");
625 printf("%5d: tMPI_Finalize called\n", tMPI_This_threadnr());
631 struct tmpi_thread
*cur
= tMPI_Get_current();
633 tMPI_Profile_stop( &(cur
->profile
) );
634 ret
= tMPI_Thread_barrier_wait( &(tmpi_global
->barrier
) );
637 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_IO
);
640 if (tMPI_Is_master())
642 tMPI_Profiles_summarize(Nthreads
, threads
);
646 ret
= tMPI_Thread_barrier_wait( &(tmpi_global
->barrier
) );
649 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_IO
);
654 if (tMPI_Is_master())
657 /* we just wait for all threads to finish; the order isn't very
658 relevant, as all threads should arrive at their endpoints soon. */
659 for (i
= 1; i
< Nthreads
; i
++)
661 if (tMPI_Thread_join(threads
[i
].thread_id
, NULL
))
663 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_FINALIZE
);
665 tMPI_Thread_destroy(&(threads
[i
]));
667 /* at this point, we are the only thread left, so we can
668 destroy the global structures with impunity. */
669 tMPI_Thread_destroy(&(threads
[0]));
672 tMPI_Thread_key_delete(id_key
);
673 /* de-allocate all the comm structures. */
677 ret
= tMPI_Thread_mutex_lock(&(tmpi_global
->comm_link_lock
));
680 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_IO
);
/* walk the communicator list, starting after TMPI_COMM_WORLD and stopping
   when the list ends or comes back to TMPI_COMM_WORLD */
682 cur
= TMPI_COMM_WORLD
->next
;
683 while (cur
&& (cur
!= TMPI_COMM_WORLD
) )
/* remember the successor before cur is destroyed */
685 tMPI_Comm next
= cur
->next
;
686 ret
= tMPI_Comm_destroy(cur
, FALSE
);
689 tMPI_Thread_mutex_unlock(&(tmpi_global
->comm_link_lock
));
694 ret
= tMPI_Comm_destroy(TMPI_COMM_WORLD
, FALSE
);
697 tMPI_Thread_mutex_unlock(&(tmpi_global
->comm_link_lock
));
700 ret
= tMPI_Thread_mutex_unlock(&(tmpi_global
->comm_link_lock
));
703 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_IO
);
708 tMPI_Group_free(&TMPI_GROUP_EMPTY
);
710 TMPI_COMM_WORLD
= NULL
;
711 TMPI_GROUP_EMPTY
= NULL
;
714 /* deallocate the 'global' structure */
715 tMPI_Global_destroy(tmpi_global
);
718 tmpi_finalized
= TRUE
;
/* Store the value of tmpi_finalized in *flag. That variable is set to TRUE
   by tMPI_Finalize() and reset to FALSE by tMPI_Start_threads(). flag is
   dereferenced without a NULL check in the visible code. */
728 int tMPI_Finalized(int *flag
)
731 tMPI_Trace_print("tMPI_Finalized(%p)", flag
);
733 *flag
= tmpi_finalized
;
/* Abort execution with the given error code.
   comm      : communicator the abort was requested on; only compared with
               TMPI_COMM_WORLD to choose the message text
   errorcode : value printed in the messages
   Two code paths are visible: one writes "tMPI_Abort called with error
   code ..." to stderr (per its comment it goes on to abort() so that a
   debugger can be attached), the other prints a message that depends on
   tMPI_Is_master() and ends the calling thread with tMPI_Thread_exit().
   NOTE(review): the conditional that selects between the two paths, the
   calls that print the two master-thread messages and the process exit are
   on lines missing from this listing. */
740 int tMPI_Abort(tMPI_Comm comm
, int errorcode
)
743 tMPI_Trace_print("tMPI_Abort(%p, %d)", comm
, errorcode
);
746 /* we abort(). This way we can run a debugger on it */
747 fprintf(stderr
, "tMPI_Abort called with error code %d", errorcode
);
748 if (comm
== TMPI_COMM_WORLD
)
750 fprintf(stderr
, " on TMPI_COMM_WORLD");
752 fprintf(stderr
, "\n");
757 /* we just kill all threads, but not the main process */
759 if (tMPI_Is_master())
761 if (comm
== TMPI_COMM_WORLD
)
764 "tMPI_Abort called on TMPI_COMM_WORLD main with errorcode=%d\n",
770 "tMPI_Abort called on main thread with errorcode=%d\n",
780 fprintf(stderr
, "tMPI_Abort called with error code %d on thread %d\n",
781 errorcode
, tMPI_This_threadnr());
/* NOTE(review): the malloc() result is handed to tMPI_Thread_exit() with no
   NULL check and no value stored in it visible in this listing - confirm
   whether the exit value is ever read. */
783 ret
= (int*)malloc(sizeof(int));
784 tMPI_Thread_exit(ret
);
/* Write a name of the form "thread #<nr>" into name, where nr is the number
   of the calling thread (tMPI_Threadnr(tMPI_Get_current())), and store the
   resulting string length in *resultlen.
   The number is written digit by digit (base 10) instead of with sprintf;
   the existing comment below gives the reason. Digits are placed from the
   least significant one at the highest position, and positions at or beyond
   TMPI_MAX_PROCESSOR_NAME-1 are skipped.
   NOTE(review): assumes name has room for TMPI_MAX_PROCESSOR_NAME chars -
   confirm against the callers. The digit-counting code and the declaration
   of rest are on lines missing from this listing. */
791 int tMPI_Get_processor_name(char *name
, int *resultlen
)
793 int nr
= tMPI_Threadnr(tMPI_Get_current());
794 unsigned int digits
= 0;
795 const unsigned int base
= 10;
798 tMPI_Trace_print("tMPI_Get_processor_name(%p, %p)", name
, resultlen
);
800 /* we don't want to call sprintf here (it turns out to be not entirely
801 thread-safe on Mac OS X, for example), so we do it our own way: */
803 /* first determine number of digits */
/* the prefix is copied with strcpy, or with the bounded strncpy_s variant */
817 strcpy(name
, "thread #");
819 strncpy_s(name
, TMPI_MAX_PROCESSOR_NAME
, "thread #", TMPI_MAX_PROCESSOR_NAME
);
821 /* now construct the number */
823 size_t len
= strlen(name
);
827 for (i
= 0; i
< digits
; i
++)
/* i = 0 addresses the last digit position, so digits are written from
   right to left */
829 size_t pos
= len
+ (digits
-i
-1);
830 if (pos
< (TMPI_MAX_PROCESSOR_NAME
-1) )
832 name
[ pos
] = (char)('0' + rest
%base
);
836 if ( (digits
+len
) < TMPI_MAX_PROCESSOR_NAME
)
838 name
[digits
+ len
] = '\0';
/* NOTE(review): if name holds TMPI_MAX_PROCESSOR_NAME chars, the index
   TMPI_MAX_PROCESSOR_NAME is one past the end; the guard on pos above
   suggests TMPI_MAX_PROCESSOR_NAME-1 was intended - confirm. */
842 name
[TMPI_MAX_PROCESSOR_NAME
] = '\0';
848 *resultlen
= (int)strlen(name
); /* For some reason the MPI standard
849 uses ints instead of size_ts for
859 /* TODO: there must be better ways to do this */
/* Return the wall-clock time elapsed since tMPI_Global_init() recorded
   tmpi_global->timer_init.
   Non-Windows builds take the difference of two gettimeofday() readings and
   combine whole seconds with microseconds scaled by 1e-6; Windows builds take
   the difference of two GetTickCount() values scaled by 1e-3. Both results
   are therefore in seconds. */
860 double tMPI_Wtime(void)
865 tMPI_Trace_print("tMPI_Wtime()");
868 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
874 gettimeofday(&tv
, NULL
);
/* usecdiff may be negative; the sum below still gives the right total */
875 secdiff
= tv
.tv_sec
- tmpi_global
->timer_init
.tv_sec
;
876 usecdiff
= tv
.tv_usec
- tmpi_global
->timer_init
.tv_usec
;
878 ret
= (double)secdiff
+ 1e-6*usecdiff
;
882 DWORD tv
= GetTickCount();
884 /* the windows absolute time GetTickCount() wraps around in ~49 days,
885 so it's safer to always use differences, and assume that our
886 program doesn't run that long.. */
887 ret
= 1e-3*((unsigned int)(tv
- tmpi_global
->timer_init
));
/* Return a platform-dependent timer resolution, presumably that of
   tMPI_Wtime() - confirm. One value is used on non-Windows builds and
   another on Windows builds; the statements that return the actual values
   are on lines missing from this listing. */
893 double tMPI_Wtick(void)
895 #if !(defined( _WIN32 ) || defined( _WIN64 ) )
896 /* In Unix, we don't really know. Any modern OS should be at least
897 this precise, though */
900 /* According to the Windows documentation, this is about right: */
/* Compute how many elements of datatype a completed transfer contained.
   status   : status object of the transfer; its transferred member is read
   datatype : element type; its size member is the divisor
   count    : output, receives transferred / size, cast to int
   An error path reporting TMPI_ERR_STATUS through tMPI_Error() is visible;
   its condition is on a line missing from this listing, presumably a check
   of status.
   NOTE(review): no check that datatype->size is non-zero is visible before
   the division. */
905 int tMPI_Get_count(tMPI_Status
*status
, tMPI_Datatype datatype
, int *count
)
908 tMPI_Trace_print("tMPI_Get_count(%p, %p, %p)", status
, datatype
, count
);
912 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_STATUS
);
914 *count
= (int)(status
->transferred
/datatype
->size
);