2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
39 /* this file is #included from collective.c; it's not really a header file,
40 but this defines a lot of functions that probably need to be inlined.*/
44 int tMPI_Gather(void* sendbuf
, int sendcount
, tMPI_Datatype sendtype
,
45 void* recvbuf
, int recvcount
, tMPI_Datatype recvtype
,
46 int root
, tMPI_Comm comm
)
52 struct tmpi_thread
*cur
=tMPI_Get_current();
55 tMPI_Profile_count_start(cur
);
58 tMPI_Trace_print("tMPI_Gather(%p, %d, %p, %p, %d, %p, %d, %p)",
59 sendbuf
, sendcount
, sendtype
,
60 recvbuf
, recvcount
, recvtype
, root
, comm
);
65 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_COMM
);
67 myrank
=tMPI_Comm_seek_rank(comm
, cur
);
69 /* we increase our counter, and determine which coll_env we get */
70 cev
=tMPI_Get_cev(comm
, myrank
, &synct
);
75 int n_remaining
=comm
->grp
.N
-1;
76 /* do root transfer */
77 if (sendbuf
!=TMPI_IN_PLACE
)
79 tMPI_Coll_root_xfer(comm
, sendtype
, recvtype
,
80 sendtype
->size
*sendcount
,
81 recvtype
->size
*recvcount
,
83 (char*)recvbuf
+myrank
*recvcount
*recvtype
->size
,
86 for(i
=0;i
<comm
->grp
.N
;i
++)
87 cev
->met
[myrank
].read_data
[i
]=FALSE
;
88 cev
->met
[myrank
].read_data
[myrank
]=TRUE
;
90 /* wait for data availability as long as there are xfers to be done */
93 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
94 tMPI_Profile_wait_start(cur
);
96 tMPI_Event_wait( &(cev
->met
[myrank
]).recv_ev
) ;
97 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
98 tMPI_Profile_wait_stop(cur
, TMPIWAIT_Coll_recv
);
100 /* now check all of them */
101 for(i
=0;i
<comm
->grp
.N
;i
++)
103 if ( !cev
->met
[myrank
].read_data
[i
] &&
104 (tMPI_Atomic_get(&(cev
->met
[i
].current_sync
))== synct
))
106 tMPI_Mult_recv(comm
, cev
, i
, 0, TMPI_GATHER_TAG
, recvtype
,
107 recvcount
*recvtype
->size
,
108 (char*)recvbuf
+i
*recvcount
*recvtype
->size
,
110 tMPI_Event_process( &(cev
->met
[myrank
]).recv_ev
, 1) ;
111 if (ret
!=TMPI_SUCCESS
)
113 cev
->met
[myrank
].read_data
[i
]=TRUE
;
121 if (!sendbuf
) /* don't do pointer arithmetic on a NULL ptr */
123 return tMPI_Error(comm
, TMPI_ERR_BUF
);
126 /* first set up the data just to root. */
127 tMPI_Post_multi(cev
, myrank
, 0, TMPI_GATHER_TAG
, sendtype
,
128 sendcount
*sendtype
->size
, sendbuf
, 1, synct
, root
);
129 /* and wait until root is done copying */
130 tMPI_Wait_for_others(cev
, myrank
);
133 tMPI_Profile_count_stop(cur
,TMPIFN_Gather
);
143 int tMPI_Gatherv(void* sendbuf
, int sendcount
, tMPI_Datatype sendtype
,
144 void* recvbuf
, int *recvcounts
, int *displs
,
145 tMPI_Datatype recvtype
, int root
, tMPI_Comm comm
)
148 struct coll_env
*cev
;
150 int ret
=TMPI_SUCCESS
;
151 struct tmpi_thread
*cur
=tMPI_Get_current();
154 tMPI_Profile_count_start(cur
);
157 tMPI_Trace_print("tMPI_Gatherv(%p, %d, %p, %p, %p, %p, %p, %d, %p)",
158 sendbuf
, sendcount
, sendtype
, recvbuf
,
159 recvcounts
, displs
, recvtype
, root
, comm
);
164 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_COMM
);
166 myrank
=tMPI_Comm_seek_rank(comm
, cur
);
168 /* we increase our counter, and determine which coll_env we get */
169 cev
=tMPI_Get_cev(comm
, myrank
, &synct
);
174 int n_remaining
=comm
->grp
.N
-1;
175 /* do root transfer */
176 if (sendbuf
!=TMPI_IN_PLACE
)
178 tMPI_Coll_root_xfer(comm
, sendtype
, recvtype
,
179 sendtype
->size
*sendcount
,
180 recvtype
->size
*recvcounts
[myrank
],
182 (char*)recvbuf
+displs
[myrank
]*recvtype
->size
,
185 for(i
=0;i
<comm
->grp
.N
;i
++)
186 cev
->met
[myrank
].read_data
[i
]=FALSE
;
187 cev
->met
[myrank
].read_data
[myrank
]=TRUE
;
189 /* wait for data availability as long as there are xfers to be done */
192 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
193 tMPI_Profile_wait_start(cur
);
195 tMPI_Event_wait( &(cev
->met
[myrank
]).recv_ev
) ;
196 #if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
197 tMPI_Profile_wait_stop(cur
, TMPIWAIT_Coll_recv
);
199 for(i
=0;i
<comm
->grp
.N
;i
++)
201 if ( !cev
->met
[myrank
].read_data
[i
] &&
202 (tMPI_Atomic_get(&(cev
->met
[i
].current_sync
))==synct
) )
204 tMPI_Event_process( &(cev
->met
[myrank
]).recv_ev
, 1) ;
205 tMPI_Mult_recv(comm
, cev
, i
, 0, TMPI_GATHERV_TAG
, recvtype
,
206 recvcounts
[i
]*recvtype
->size
,
207 (char*)recvbuf
+displs
[i
]*recvtype
->size
,
209 if (ret
!=TMPI_SUCCESS
)
211 cev
->met
[myrank
].read_data
[i
]=TRUE
;
219 if (!sendbuf
) /* don't do pointer arithmetic on a NULL ptr */
221 return tMPI_Error(comm
, TMPI_ERR_BUF
);
224 /* first set up the data just to root. */
225 tMPI_Post_multi(cev
, myrank
, 0, TMPI_GATHERV_TAG
, sendtype
,
226 sendcount
*sendtype
->size
, sendbuf
, 1, synct
, root
);
227 /* and wait until root is done copying */
228 tMPI_Wait_for_others(cev
, myrank
);
232 tMPI_Profile_count_stop(cur
, TMPIFN_Gatherv
);