/*
This source code file is part of thread_mpi.
Written by Sander Pronk, Erik Lindahl, and possibly others.

Copyright (c) 2009, Sander Pronk, Erik Lindahl.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1) Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.
2) Redistributions in binary form must reproduce the above copyright
   notice, this list of conditions and the following disclaimer in the
   documentation and/or other materials provided with the distribution.
3) Neither the name of the copyright holders nor the
   names of its contributors may be used to endorse or promote products
   derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

If you want to redistribute modifications, please consider that
scientific software is very special. Version control is crucial -
bugs must be traceable. We will be happy to consider code for
inclusion in the official distribution, but derived work should not
be called official thread_mpi. Details are found in the README & COPYING
files.
*/
/* this file is #included from collective.c; it's not really a header file,
   but this defines a lot of functions that probably need to be inlined. */
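/* A minimal sketch of that inclusion model, stated as an assumption based on
   the comment above rather than copied from collective.c: the shared helpers
   used below (tMPI_Get_cev, tMPI_Mult_recv, tMPI_Post_multi, ...) would be
   defined first, and this file would then be pulled in textually, e.g.

       #include "gather.h"

   so the compiler can inline these collectives against those helpers. */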
int tMPI_Gather(void* sendbuf, int sendcount, tMPI_Datatype sendtype,
                void* recvbuf, int recvcount, tMPI_Datatype recvtype,
                int root, tMPI_Comm comm)
{
    int synct;
    struct coll_env *cev;
    int myrank;
    int ret=TMPI_SUCCESS;
    struct tmpi_thread *cur=tMPI_Get_current();

#ifdef TMPI_PROFILE
    tMPI_Profile_count_start(cur);
#endif
#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Gather(%p, %d, %p, %p, %d, %p, %d, %p)",
                     sendbuf, sendcount, sendtype,
                     recvbuf, recvcount, recvtype, root, comm);
#endif

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank=tMPI_Comm_seek_rank(comm, cur);

    /* we increase our counter, and determine which coll_env we get */
    cev=tMPI_Get_cev(comm, myrank, &synct);

    if (myrank==root)
    {
        int i;
        int n_remaining=comm->grp.N-1;
        /* do root transfer */
        if (sendbuf!=TMPI_IN_PLACE)
        {
            tMPI_Coll_root_xfer(comm, sendtype, recvtype,
                                sendtype->size*sendcount,
                                recvtype->size*recvcount,
                                sendbuf,
                                (char*)recvbuf+myrank*recvcount*recvtype->size,
                                &ret);
        }
        for(i=0;i<comm->grp.N;i++)
            cev->met[myrank].read_data[i]=FALSE;
        cev->met[myrank].read_data[myrank]=TRUE;

        /* wait for data availability as long as there are xfers to be done */
        while(n_remaining>0)
        {
#if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
            tMPI_Profile_wait_start(cur);
#endif
            tMPI_Event_wait( &(cev->met[myrank]).recv_ev );
#if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
            tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_recv);
#endif
            /* now check all of them */
            for(i=0;i<comm->grp.N;i++)
            {
                if ( !cev->met[myrank].read_data[i] &&
                     (tMPI_Atomic_get(&(cev->met[i].current_sync))==synct) )
                {
                    tMPI_Mult_recv(comm, cev, i, 0, TMPI_GATHER_TAG, recvtype,
                                   recvcount*recvtype->size,
                                   (char*)recvbuf+i*recvcount*recvtype->size,
                                   &ret);
                    tMPI_Event_process( &(cev->met[myrank]).recv_ev, 1);
                    if (ret!=TMPI_SUCCESS)
                        return ret;
                    cev->met[myrank].read_data[i]=TRUE;
                    n_remaining--;
                }
            }
        }
    }
    else
    {
        if (!sendbuf) /* don't do pointer arithmetic on a NULL ptr */
        {
            return tMPI_Error(comm, TMPI_ERR_BUF);
        }

        /* first set up the data just to root. */
        tMPI_Post_multi(cev, myrank, 0, TMPI_GATHER_TAG, sendtype,
                        sendcount*sendtype->size, sendbuf, 1, synct, root);
        /* and wait until root is done copying */
        tMPI_Wait_for_others(cev, myrank);
    }
#ifdef TMPI_PROFILE
    tMPI_Profile_count_stop(cur, TMPIFN_Gather);
#endif
    return ret;
}
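/* Usage sketch (illustrative only, not part of thread_mpi): all threads in
   comm call tMPI_Gather with the same root; the root supplies a receive
   buffer with room for N*recvcount elements and may pass TMPI_IN_PLACE as
   sendbuf to keep its own contribution where it already is. The example
   assumes the public thread_mpi API from tmpi.h (tMPI_Comm_rank,
   tMPI_Comm_size, TMPI_INT, TMPI_COMM_WORLD) plus <stdlib.h>:

       static void gather_example(void)
       {
           int  rank, nthreads;
           int  myval;
           int *all = NULL;

           tMPI_Comm_rank(TMPI_COMM_WORLD, &rank);
           tMPI_Comm_size(TMPI_COMM_WORLD, &nthreads);
           myval = rank*rank;
           if (rank == 0)
               all = (int*)malloc(nthreads*sizeof(int));

           tMPI_Gather(&myval, 1, TMPI_INT, all, 1, TMPI_INT, 0,
                       TMPI_COMM_WORLD);

           if (rank == 0)
               free(all);
       }
*/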
int tMPI_Gatherv(void* sendbuf, int sendcount, tMPI_Datatype sendtype,
                 void* recvbuf, int *recvcounts, int *displs,
                 tMPI_Datatype recvtype, int root, tMPI_Comm comm)
{
    int synct;
    struct coll_env *cev;
    int myrank;
    int ret=TMPI_SUCCESS;
    struct tmpi_thread *cur=tMPI_Get_current();

#ifdef TMPI_PROFILE
    tMPI_Profile_count_start(cur);
#endif
#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Gatherv(%p, %d, %p, %p, %p, %p, %p, %d, %p)",
                     sendbuf, sendcount, sendtype, recvbuf,
                     recvcounts, displs, recvtype, root, comm);
#endif

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank=tMPI_Comm_seek_rank(comm, cur);

    /* we increase our counter, and determine which coll_env we get */
    cev=tMPI_Get_cev(comm, myrank, &synct);

    if (myrank==root)
    {
        int i;
        int n_remaining=comm->grp.N-1;
        /* do root transfer */
        if (sendbuf!=TMPI_IN_PLACE)
        {
            tMPI_Coll_root_xfer(comm, sendtype, recvtype,
                                sendtype->size*sendcount,
                                recvtype->size*recvcounts[myrank],
                                sendbuf,
                                (char*)recvbuf+displs[myrank]*recvtype->size,
                                &ret);
        }
        for(i=0;i<comm->grp.N;i++)
            cev->met[myrank].read_data[i]=FALSE;
        cev->met[myrank].read_data[myrank]=TRUE;

        /* wait for data availability as long as there are xfers to be done */
        while(n_remaining>0)
        {
#if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
            tMPI_Profile_wait_start(cur);
#endif
            tMPI_Event_wait( &(cev->met[myrank]).recv_ev );
#if defined(TMPI_PROFILE) && defined(TMPI_CYCLE_COUNT)
            tMPI_Profile_wait_stop(cur, TMPIWAIT_Coll_recv);
#endif
            for(i=0;i<comm->grp.N;i++)
            {
                if ( !cev->met[myrank].read_data[i] &&
                     (tMPI_Atomic_get(&(cev->met[i].current_sync))==synct) )
                {
                    tMPI_Event_process( &(cev->met[myrank]).recv_ev, 1);
                    tMPI_Mult_recv(comm, cev, i, 0, TMPI_GATHERV_TAG, recvtype,
                                   recvcounts[i]*recvtype->size,
                                   (char*)recvbuf+displs[i]*recvtype->size,
                                   &ret);
                    if (ret!=TMPI_SUCCESS)
                        return ret;
                    cev->met[myrank].read_data[i]=TRUE;
                    n_remaining--;
                }
            }
        }
    }
    else
    {
        if (!sendbuf) /* don't do pointer arithmetic on a NULL ptr */
        {
            return tMPI_Error(comm, TMPI_ERR_BUF);
        }

        /* first set up the data just to root. */
        tMPI_Post_multi(cev, myrank, 0, TMPI_GATHERV_TAG, sendtype,
                        sendcount*sendtype->size, sendbuf, 1, synct, root);
        /* and wait until root is done copying */
        tMPI_Wait_for_others(cev, myrank);
    }
#ifdef TMPI_PROFILE
    tMPI_Profile_count_stop(cur, TMPIFN_Gatherv);
#endif
    return ret;
}
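/* Usage sketch (illustrative only, not part of thread_mpi): tMPI_Gatherv
   works like tMPI_Gather, except the root provides a per-rank element count
   and a per-rank element displacement into recvbuf, so contributions may
   differ in size. The example assumes the public thread_mpi API from tmpi.h
   (tMPI_Comm_rank, tMPI_Comm_size, TMPI_CHAR, TMPI_COMM_WORLD) plus
   <stdlib.h>; rank i contributes i+1 chars, packed back to back on root:

       static void gatherv_example(void)
       {
           int   rank, N, i;
           char  senddata[32];
           char *recvbuf    = NULL;
           int  *recvcounts = NULL;
           int  *displs     = NULL;

           tMPI_Comm_rank(TMPI_COMM_WORLD, &rank);
           tMPI_Comm_size(TMPI_COMM_WORLD, &N);

           if (rank == 0)
           {
               recvcounts = (int*)malloc(N*sizeof(int));
               displs     = (int*)malloc(N*sizeof(int));
               for (i = 0; i < N; i++)
               {
                   recvcounts[i] = i + 1;
                   displs[i]     = i*(i + 1)/2;
               }
               recvbuf = (char*)malloc(N*(N + 1)/2);
           }

           tMPI_Gatherv(senddata, rank + 1, TMPI_CHAR,
                        recvbuf, recvcounts, displs, TMPI_CHAR,
                        0, TMPI_COMM_WORLD);

           if (rank == 0)
           {
               free(recvbuf);
               free(recvcounts);
               free(displs);
           }
       }
*/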