Fixed thread_mpi distribution issues.
[gromacs.git] / src / gmxlib / thread_mpi / scatter.h
blob 4cdbac1c2a6a38a8d2eb7fa8abf5cbca07c915d4
/*
This source code file is part of thread_mpi.
Written by Sander Pronk, Erik Lindahl, and possibly others.

Copyright (c) 2009, Sander Pronk, Erik Lindahl.
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1) Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.
2) Redistributions in binary form must reproduce the above copyright
   notice, this list of conditions and the following disclaimer in the
   documentation and/or other materials provided with the distribution.
3) Neither the name of the copyright holders nor the
   names of its contributors may be used to endorse or promote products
   derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

If you want to redistribute modifications, please consider that
scientific software is very special. Version control is crucial -
bugs must be traceable. We will be happy to consider code for
inclusion in the official distribution, but derived work should not
be called official thread_mpi. Details are found in the README & COPYING
files.
*/

/* this file is #included from collective.c; it's not really a header file,
   but this defines a lot of functions that probably need to be inlined.*/
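
/* Scatter sendcount elements of type sendtype from the root rank's sendbuf
   to all ranks in comm; each rank (root included) receives its slice as
   recvcount elements of type recvtype in recvbuf. Semantics follow
   MPI_Scatter. */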
int tMPI_Scatter(void* sendbuf, int sendcount, tMPI_Datatype sendtype,
                 void* recvbuf, int recvcount, tMPI_Datatype recvtype,
                 int root, tMPI_Comm comm)
{
    int synct;
    struct coll_env *cev;
    int myrank;
    int ret=TMPI_SUCCESS;
    struct tmpi_thread *cur=tMPI_Get_current();

#ifdef TMPI_PROFILE
    tMPI_Profile_count_start(cur);
#endif
#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Scatter(%p, %d, %p, %p, %d, %p, %d, %p)",
                     sendbuf, sendcount, sendtype,
                     recvbuf, recvcount, recvtype, root, comm);
#endif
    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank=tMPI_Comm_seek_rank(comm, cur);

    /* we increase our counter, and determine which coll_env we get */
    cev=tMPI_Get_cev(comm, myrank, &synct);

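    /* From here on, the root thread posts its per-rank slices in this
       coll_env and signals each receiver's event; the other threads wait
       on that event and copy their slice out with tMPI_Mult_recv. */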
    if (myrank==root)
    {
        int i;
        size_t sendsize=sendtype->size*sendcount;
        size_t total_send_size=0;
#ifdef USE_COLLECTIVE_COPY_BUFFER
        bool using_cb;
#endif

        if (!sendbuf) /* don't do pointer arithmetic on a NULL ptr */
        {
            return tMPI_Error(comm, TMPI_ERR_BUF);
        }

        /* we set up multiple posts, so no Post_multi */
        cev->met[myrank].tag=TMPI_SCATTER_TAG;
        cev->met[myrank].datatype=sendtype;
        tMPI_Atomic_set( &(cev->met[myrank].n_remaining), cev->N-1 );
        for(i=0;i<comm->grp.N;i++)
        {
            total_send_size += sendtype->size*sendcount;
            cev->met[myrank].bufsize[i]=sendsize;
            cev->met[myrank].buf[i]=(char*)sendbuf+sendsize*i;
        }
#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* we must copy our own data too, unfortunately (otherwise there's
           a hole) */
        using_cb=(total_send_size < (size_t)((cev->N)*COPY_BUFFER_SIZE));
        cev->met[myrank].using_cb=using_cb;
        if (using_cb)
        {
            /* we set cpbuf stuff to NULL initially */
            for(i=0;i<cev->N;i++)
            {
                /*cev->met[myrank].cpbuf[i]=NULL;*/
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]), NULL);
            }
            tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
        }
#endif

        /* post availability */
        for(i=0;i<cev->N;i++)
        {
            if (i != myrank)
                tMPI_Event_signal( &(cev->met[i].recv_ev) );
        }

#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* do the copy_buffer thing */
        if (using_cb)
        {
            /* copy the buffer locally. First allocate */
            cev->met[myrank].cb=tMPI_Copy_buffer_list_get(
                                    &(tMPI_Get_thread(comm,myrank)->cbl_multi));
            if (cev->met[myrank].cb->size < total_send_size)
            {
                fprintf(stderr, "ERROR: cb size too small\n");
                exit(1);
            }
            /* copy to the new buf */
            memcpy(cev->met[myrank].cb->buf, sendbuf, total_send_size);

            /* post the new buf */
            for(i=0;i<cev->N;i++)
            {
                tMPI_Atomic_memory_barrier();
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]),
                                    (char*)cev->met[myrank].cb->buf+sendsize*i);
                /*cev->met[myrank].cpbuf[i] = (char*)cev->met[myrank].cb->buf +
                                              sendsize*i;*/
            }
        }
#endif

        /* do root transfer */
        if (recvbuf!=TMPI_IN_PLACE)
        {
            tMPI_Coll_root_xfer(comm, sendtype, recvtype,
                                sendsize, recvtype->size*recvcount,
                                (char*)sendbuf+sendsize*myrank,
                                recvbuf, &ret);
        }

        /* and wait until everybody is done copying */
        tMPI_Wait_for_others(cev, myrank);
    }
    else
    {
        /* get the root cev */
        size_t bufsize=recvcount*recvtype->size;
        /* wait until root becomes available */
        tMPI_Wait_for_data(cur, cev, myrank);
        tMPI_Mult_recv(comm, cev, root, myrank, TMPI_SCATTER_TAG, recvtype,
                       bufsize, recvbuf, &ret);
    }
#ifdef TMPI_PROFILE
    tMPI_Profile_count_stop(cur, TMPIFN_Scatter);
#endif
    return ret;
}
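
/* Example (an illustrative sketch, not part of thread_mpi): scattering one
   int to each rank from rank 0. It assumes an already-initialized program;
   tMPI_Comm_rank, tMPI_Comm_size, TMPI_INT, and TMPI_COMM_WORLD are the
   public thread_mpi equivalents of their MPI counterparts.

   int rank, nranks, myval;
   int *sendbuf=NULL;
   tMPI_Comm_rank(TMPI_COMM_WORLD, &rank);
   tMPI_Comm_size(TMPI_COMM_WORLD, &nranks);
   if (rank==0)
   {
       int i;
       sendbuf=(int*)malloc(nranks*sizeof(int));
       for(i=0;i<nranks;i++)
           sendbuf[i]=i*i;        // slice i goes to rank i
   }
   tMPI_Scatter(sendbuf, 1, TMPI_INT, &myval, 1, TMPI_INT, 0,
                TMPI_COMM_WORLD);
*/

/* tMPI_Scatterv is the variable-count version of tMPI_Scatter: rank i
   receives sendcounts[i] elements taken from offset displs[i] (in units of
   sendtype) of the root's sendbuf. Semantics follow MPI_Scatterv. */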
int tMPI_Scatterv(void* sendbuf, int *sendcounts, int *displs,
                  tMPI_Datatype sendtype, void* recvbuf, int recvcount,
                  tMPI_Datatype recvtype, int root, tMPI_Comm comm)
{
    int synct;
    struct coll_env *cev;
    int myrank;
    int ret=TMPI_SUCCESS;
    struct tmpi_thread *cur=tMPI_Get_current();

#ifdef TMPI_TRACE
    tMPI_Trace_print("tMPI_Scatterv(%p, %p, %p, %p, %p, %d, %p, %d, %p)",
                     sendbuf, sendcounts, displs, sendtype, recvbuf,
                     recvcount, recvtype, root, comm);
#endif
#ifdef TMPI_PROFILE
    tMPI_Profile_count_start(cur);
#endif

    if (!comm)
    {
        return tMPI_Error(TMPI_COMM_WORLD, TMPI_ERR_COMM);
    }
    myrank=tMPI_Comm_seek_rank(comm, cur);

    /* we increase our counter, and determine which coll_env we get */
    cev=tMPI_Get_cev(comm, myrank, &synct);

    if (myrank==root)
    {
        int i;
        size_t total_send_size=0;
#ifdef USE_COLLECTIVE_COPY_BUFFER
        bool using_cb;
#endif

        if (!sendbuf) /* don't do pointer arithmetic on a NULL ptr */
        {
            return tMPI_Error(comm, TMPI_ERR_BUF);
        }

        /* we set up multiple posts, so no Post_multi */
        cev->met[myrank].tag=TMPI_SCATTERV_TAG;
        cev->met[myrank].datatype=sendtype;
        tMPI_Atomic_set( &(cev->met[myrank].n_remaining), cev->N-1 );
        for(i=0;i<cev->N;i++)
        {
            total_send_size += sendtype->size*sendcounts[i];
            cev->met[myrank].bufsize[i]=sendtype->size*sendcounts[i];
            cev->met[myrank].buf[i]=(char*)sendbuf+sendtype->size*displs[i];
        }
#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* we must copy our own data too, unfortunately (otherwise there's
           a hole) */
        using_cb=(total_send_size < (size_t)((cev->N)*COPY_BUFFER_SIZE));
        cev->met[myrank].using_cb=using_cb;
        if (using_cb)
        {
            /* we set cpbuf stuff to NULL initially */
            for(i=0;i<cev->N;i++)
            {
                /*cev->met[myrank].cpbuf[i]=NULL;*/
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]), NULL);
            }
            tMPI_Atomic_set(&(cev->met[myrank].buf_readcount), 0);
        }
#endif

        /* post availability */
        for(i=0;i<cev->N;i++)
        {
            if (i != myrank)
                tMPI_Event_signal( &(cev->met[i].recv_ev) );
        }

#ifdef USE_COLLECTIVE_COPY_BUFFER
        /* do the copy_buffer thing */
        if (using_cb)
        {
            /* copy the buffer locally. First allocate */
            cev->met[myrank].cb=tMPI_Copy_buffer_list_get(
                                    &(tMPI_Get_thread(comm,myrank)->cbl_multi));
            if (cev->met[myrank].cb->size < total_send_size)
            {
                fprintf(stderr, "ERROR: cb size too small\n");
                exit(1);
            }
            /* copy to the new buf */
            memcpy(cev->met[myrank].cb->buf, sendbuf, total_send_size);
            /* post the new buf */
            for(i=0;i<cev->N;i++)
            {
                tMPI_Atomic_memory_barrier();
                tMPI_Atomic_ptr_set(&(cev->met[myrank].cpbuf[i]),
                                    (char*)cev->met[myrank].cb->buf +
                                        sendtype->size*displs[i]);
                /*cev->met[myrank].cpbuf[i]=(char*)cev->met[myrank].cb->buf +
                                            sendtype->size*displs[i];*/
            }
            tMPI_Atomic_memory_barrier();
        }
#endif

        /* do root transfer */
        if (recvbuf!=TMPI_IN_PLACE)
        {
            tMPI_Coll_root_xfer(comm, sendtype, recvtype,
                                sendtype->size*sendcounts[myrank],
                                recvtype->size*recvcount,
                                (char*)sendbuf+sendtype->size*displs[myrank],
                                recvbuf,
                                &ret);
        }

        /* and wait until everybody is done copying */
        tMPI_Wait_for_others(cev, myrank);
    }
    else
    {
        /* get the root cev */
        size_t bufsize=recvcount*recvtype->size;
        /* wait until root becomes available */
        tMPI_Wait_for_data(cur, cev, myrank);
        tMPI_Mult_recv(comm, cev, root, myrank, TMPI_SCATTERV_TAG,
                       recvtype, bufsize, recvbuf, &ret);
    }
#ifdef TMPI_PROFILE
    tMPI_Profile_count_stop(cur, TMPIFN_Scatterv);
#endif
    return ret;
}
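
/* Example (an illustrative sketch, not part of thread_mpi): a variable-size
   scatter where rank i receives i+1 ints, packed back-to-back on the root.
   As above, tMPI_Comm_rank, tMPI_Comm_size, TMPI_INT, and TMPI_COMM_WORLD
   are assumed to mirror their MPI counterparts.

   int rank, nranks;
   int *sendbuf=NULL, *sendcounts=NULL, *displs=NULL;
   int recvbuf[64];               // assumes nranks <= 64 for this sketch
   tMPI_Comm_rank(TMPI_COMM_WORLD, &rank);
   tMPI_Comm_size(TMPI_COMM_WORLD, &nranks);
   if (rank==0)
   {
       int i, total=0;
       sendcounts=(int*)malloc(nranks*sizeof(int));
       displs=(int*)malloc(nranks*sizeof(int));
       for(i=0;i<nranks;i++)
       {
           sendcounts[i]=i+1;     // rank i gets i+1 elements
           displs[i]=total;       // offset of rank i's slice, in elements
           total+=sendcounts[i];
       }
       sendbuf=(int*)calloc(total, sizeof(int));
   }
   tMPI_Scatterv(sendbuf, sendcounts, displs, TMPI_INT,
                 recvbuf, rank+1, TMPI_INT, 0, TMPI_COMM_WORLD);
*/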