/*
This source code file is part of thread_mpi.
Written by Sander Pronk, Erik Lindahl, and possibly others.

Copyright (c) 2009, Sander Pronk, Erik Lindahl.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1) Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.
2) Redistributions in binary form must reproduce the above copyright
   notice, this list of conditions and the following disclaimer in the
   documentation and/or other materials provided with the distribution.
3) Neither the name of the copyright holders nor the
   names of its contributors may be used to endorse or promote products
   derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

If you want to redistribute modifications, please consider that
scientific software is very special. Version control is crucial -
bugs must be traceable. We will be happy to consider code for
inclusion in the official distribution, but derived work should not
be called official thread_mpi. Details are found in the README & COPYING
files.
*/
/* This file is #included from collective.c; it is not really a header file,
   but it defines a lot of functions that probably need to be inlined. */
42 int tMPI_Scatter(void* sendbuf
, int sendcount
, tMPI_Datatype sendtype
,
43 void* recvbuf
, int recvcount
, tMPI_Datatype recvtype
,
44 int root
, tMPI_Comm comm
)
50 struct tmpi_thread
*cur
=tMPI_Get_current();
53 tMPI_Profile_count_start(cur
);
56 tMPI_Trace_print("tMPI_Scatter(%p, %d, %p, %p, %d, %p, %d, %p)",
57 sendbuf
, sendcount
, sendtype
,
58 recvbuf
, recvcount
, recvtype
, root
, comm
);
62 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_COMM
);
64 myrank
=tMPI_Comm_seek_rank(comm
, cur
);
66 /* we increase our counter, and determine which coll_env we get */
67 cev
=tMPI_Get_cev(comm
, myrank
, &synct
);
72 size_t sendsize
=sendtype
->size
*sendcount
;
73 size_t total_send_size
=0;
74 #ifdef USE_COLLECTIVE_COPY_BUFFER
78 if (!sendbuf
) /* don't do pointer arithmetic on a NULL ptr */
80 return tMPI_Error(comm
, TMPI_ERR_BUF
);
83 /* we set up multiple posts, so no Post_multi */
84 cev
->met
[myrank
].tag
=TMPI_SCATTER_TAG
;
85 cev
->met
[myrank
].datatype
=sendtype
;
86 tMPI_Atomic_set( &(cev
->met
[myrank
].n_remaining
), cev
->N
-1 );
87 for(i
=0;i
<comm
->grp
.N
;i
++)
89 total_send_size
+= sendtype
->size
*sendcount
;
90 cev
->met
[myrank
].bufsize
[i
]=sendsize
;
91 cev
->met
[myrank
].buf
[i
]=(char*)sendbuf
+sendsize
*i
;
93 #ifdef USE_COLLECTIVE_COPY_BUFFER
94 /* we must copy our own data too, unfortunately (otherwise there's
96 using_cb
=(total_send_size
< (size_t)((cev
->N
)*COPY_BUFFER_SIZE
));
97 cev
->met
[myrank
].using_cb
=using_cb
;
100 /* we set cpbuf stuff to NULL initially */
101 for(i
=0;i
<cev
->N
;i
++)
103 /*cev->met[myrank].cpbuf[i]=NULL;*/
104 tMPI_Atomic_ptr_set(&(cev
->met
[myrank
].cpbuf
[i
]), NULL
);
106 tMPI_Atomic_set(&(cev
->met
[myrank
].buf_readcount
), 0);
110 /* post availability */
111 for(i
=0;i
<cev
->N
;i
++)
114 tMPI_Event_signal( &(cev
->met
[i
].recv_ev
) );
120 #ifdef USE_COLLECTIVE_COPY_BUFFER
121 /* do the copy_buffer thing */
124 /* copy the buffer locally. First allocate */
125 cev
->met
[myrank
].cb
=tMPI_Copy_buffer_list_get(
126 &(tMPI_Get_thread(comm
,myrank
)->cbl_multi
));
127 if (cev
->met
[myrank
].cb
->size
< total_send_size
)
129 fprintf(stderr
, "ERROR: cb size too small\n");
132 /* copy to the new buf */
133 memcpy(cev
->met
[myrank
].cb
->buf
, sendbuf
, total_send_size
);
135 /* post the new buf */
136 for(i
=0;i
<cev
->N
;i
++)
138 tMPI_Atomic_memory_barrier();
139 tMPI_Atomic_ptr_set(&(cev
->met
[myrank
].cpbuf
[i
]),
140 (char*)cev
->met
[myrank
].cb
->buf
+sendsize
*i
);
141 /*cev->met[myrank].cpbuf[i] = (char*)cev->met[myrank].cb->buf +
147 /* do root transfer */
148 if (recvbuf
!=TMPI_IN_PLACE
)
150 tMPI_Coll_root_xfer(comm
, sendtype
, recvtype
,
151 sendsize
, recvtype
->size
*recvcount
,
152 (char*)sendbuf
+sendsize
*myrank
,
156 /* and wait until everybody is done copying */
157 tMPI_Wait_for_others(cev
, myrank
);
161 /* get the root cev */
162 size_t bufsize
=recvcount
*recvtype
->size
;
163 /* wait until root becomes available */
164 tMPI_Wait_for_data(cur
, cev
, myrank
);
165 tMPI_Mult_recv(comm
, cev
, root
, myrank
,TMPI_SCATTER_TAG
, recvtype
,
166 bufsize
, recvbuf
, &ret
);
169 tMPI_Profile_count_stop(cur
, TMPIFN_Scatter
);
176 int tMPI_Scatterv(void* sendbuf
, int *sendcounts
, int *displs
,
177 tMPI_Datatype sendtype
, void* recvbuf
, int recvcount
,
178 tMPI_Datatype recvtype
, int root
, tMPI_Comm comm
)
181 struct coll_env
*cev
;
183 int ret
=TMPI_SUCCESS
;
184 struct tmpi_thread
*cur
=tMPI_Get_current();
187 tMPI_Trace_print("tMPI_Scatterv(%p, %p, %p, %p, %p, %d, %p, %d, %p)",
188 sendbuf
, sendcounts
, displs
, sendtype
, recvbuf
,
189 recvcount
, recvtype
, root
, comm
);
192 tMPI_Profile_count_start(cur
);
198 return tMPI_Error(TMPI_COMM_WORLD
, TMPI_ERR_COMM
);
200 myrank
=tMPI_Comm_seek_rank(comm
, cur
);
202 /* we increase our counter, and determine which coll_env we get */
203 cev
=tMPI_Get_cev(comm
, myrank
, &synct
);
208 size_t total_send_size
=0;
209 #ifdef USE_COLLECTIVE_COPY_BUFFER
213 if (!sendbuf
) /* don't do pointer arithmetic on a NULL ptr */
215 return tMPI_Error(comm
, TMPI_ERR_BUF
);
218 /* we set up multiple posts, so no Post_multi */
219 cev
->met
[myrank
].tag
=TMPI_SCATTERV_TAG
;
220 cev
->met
[myrank
].datatype
=sendtype
;
221 tMPI_Atomic_set( &(cev
->met
[myrank
].n_remaining
), cev
->N
-1 );
222 for(i
=0;i
<cev
->N
;i
++)
224 total_send_size
+= sendtype
->size
*sendcounts
[i
];
225 cev
->met
[myrank
].bufsize
[i
]=sendtype
->size
*sendcounts
[i
];
226 cev
->met
[myrank
].buf
[i
]=(char*)sendbuf
+sendtype
->size
*displs
[i
];
228 #ifdef USE_COLLECTIVE_COPY_BUFFER
229 /* we must copy our own data too, unfortunately (otherwise there's
231 using_cb
=(total_send_size
< (size_t)((cev
->N
)*COPY_BUFFER_SIZE
));
232 cev
->met
[myrank
].using_cb
=using_cb
;
235 /* we set cpbuf stuff to NULL initially */
236 for(i
=0;i
<cev
->N
;i
++)
238 /*cev->met[myrank].cpbuf[i]=NULL;*/
239 tMPI_Atomic_ptr_set(&(cev
->met
[myrank
].cpbuf
[i
]), NULL
);
241 tMPI_Atomic_set(&(cev
->met
[myrank
].buf_readcount
), 0);
245 /* post availability */
246 for(i
=0;i
<cev
->N
;i
++)
249 tMPI_Event_signal( &(cev
->met
[i
].recv_ev
) );
254 #ifdef USE_COLLECTIVE_COPY_BUFFER
255 /* do the copy_buffer thing */
258 /* copy the buffer locally. First allocate */
259 cev
->met
[myrank
].cb
=tMPI_Copy_buffer_list_get(
260 &(tMPI_Get_thread(comm
,myrank
)->cbl_multi
));
261 if (cev
->met
[myrank
].cb
->size
< total_send_size
)
263 fprintf(stderr
, "ERROR: cb size too small\n");
266 /* copy to the new buf */
267 memcpy(cev
->met
[myrank
].cb
->buf
, sendbuf
, total_send_size
);
268 /* post the new buf */
269 for(i
=0;i
<cev
->N
;i
++)
271 tMPI_Atomic_memory_barrier();
272 tMPI_Atomic_ptr_set(&(cev
->met
[myrank
].cpbuf
[i
]),
273 (char*)cev
->met
[myrank
].cb
->buf
+
274 sendtype
->size
*displs
[i
]);
275 /*cev->met[myrank].cpbuf[i]=(char*)cev->met[myrank].cb->buf +
276 sendtype->size*displs[i];*/
278 tMPI_Atomic_memory_barrier();
282 /* do root transfer */
283 if (recvbuf
!=TMPI_IN_PLACE
)
285 tMPI_Coll_root_xfer(comm
, sendtype
, recvtype
,
286 sendtype
->size
*sendcounts
[myrank
],
287 recvtype
->size
*recvcount
,
288 (char*)sendbuf
+sendtype
->size
*displs
[myrank
],
293 /* and wait until everybody is done copying */
294 tMPI_Wait_for_others(cev
, myrank
);
298 /* get the root cev */
299 size_t bufsize
=recvcount
*recvtype
->size
;
300 /* wait until root becomes available */
301 tMPI_Wait_for_data(cur
, cev
, myrank
);
302 tMPI_Mult_recv(comm
, cev
, root
, myrank
, TMPI_SCATTERV_TAG
,
303 recvtype
, bufsize
, recvbuf
, &ret
);
306 tMPI_Profile_count_stop(cur
, TMPIFN_Scatterv
);