1 /*---------------------------------------------------------------------------*\
3 \\ / F ield | OpenFOAM: The Open Source CFD Toolbox
5 \\ / A nd | Copyright (C) 1991-2009 OpenCFD Ltd.
7 -------------------------------------------------------------------------------
9 This file is part of OpenFOAM.
11 OpenFOAM is free software; you can redistribute it and/or modify it
12 under the terms of the GNU General Public License as published by the
13 Free Software Foundation; either version 2 of the License, or (at your
14 option) any later version.
16 OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18 FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
21 You should have received a copy of the GNU General Public License
22 along with OpenFOAM; if not, write to the Free Software Foundation,
23 Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
28 GAMMA has a (polling) receive handler which gets called every time a
29 received message is complete. Ours stores the length of the currently
30 received message and sets up the next buffer to store the next message
32 Note that the pattern between two processors can be
37 since the first swap might belong to a local exchange and the second to
38 a reduce. Since gamma has to have the receive buffers already set up we
39 have to allocate them big enough. To prevent excessive amounts needed we
40 dynamically resize them (never shrink) by sending special 'resize' messages
41 before sending a largish message.
43 Because of this we actually need four receive buffers:
45 - receive resize message
46 - receive normal message
47 - receive resize message
48 - receive normal message
51 The special resize message is a message with a special header which
52 (hopefully) should never appear in normal exchanges (it actually checks
53 for this in the OPstream::send)
55 \*---------------------------------------------------------------------------*/
58 #include "PstreamReduceOps.H"
59 #include "OSspecific.H"
60 #include "PstreamGlobals.H"
68 # include <linux/gamma/libgamma.h>
72 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
77 // Receive handler to copy out received message length and switch buffers.
78 static void handler(void)
80 label current = PstreamGlobals::recvIndex[gamma_active_port];
82 List<char>& buf = PstreamGlobals::recvBuf[current][gamma_active_port];
83 label bufLen = PstreamGlobals::recvBufLen[current][gamma_active_port];
87 FatalErrorIn("Pstream::handler(void)")
88 << "Buffer length not reset : "
90 << " when receiving message of size " << gamma_msglen
91 << " from processor " << gamma_active_port << endl
92 << "This means that the existing data has not been consumed yet"
93 << " (by IPstream::read) and means your communication pattern"
94 << " is probably not balanced (a receive for every send)"
96 << "This can happen if you have e.g. gather without scatter."
98 << "A workaround is to increase the depth of the circular"
99 << " receive buffers in PstreamGlobals.H"
100 << abort(FatalError);
108 || gamma_msglen > buf.size()
111 FatalErrorIn("Pstream::handler(void)")
112 << "Received message of size " << gamma_msglen
113 << " from processor " << gamma_active_port
115 << "but global receive buffer is only of size "
117 << abort(FatalError);
120 // Check for resize message
121 label resizeLen = PstreamGlobals::getSizeFromHeader
131 Pout<< "Pstream::handler : Resize message:" << resizeLen
132 << " from proc " << gamma_active_port
134 << PstreamGlobals::getMaxBufSize(gamma_active_port)
138 // Saved current buffer.
141 if (resizeLen > PstreamGlobals::getMaxBufSize(gamma_active_port))
145 Pout<< "Pstream::handler :"
146 << " resizing receive buffer for processor "
149 << PstreamGlobals::getMaxBufSize(gamma_active_port)
150 << " to " << resizeLen << Foam::endl;
153 // Save the pointer (that gamma knows about) so we can safely
154 // gamma_switch_to_buffer with a valid pointer.
155 // Not sure if nessecary but do anyway.
156 savedBuf.transfer(buf);
158 // Resize all the buffers
159 forAll(PstreamGlobals::recvBuf, i)
162 PstreamGlobals::recvBuf[i][gamma_active_port];
164 // gamma_munlock(chars.begin(), chars.size());
165 chars.setSize(resizeLen);
166 // gamma_mlock(chars.begin(), chars.size());
170 // Update length with special value to denote resize was done.
171 PstreamGlobals::recvBufLen[current][gamma_active_port] = -2;
175 // Update length with actual message length
176 PstreamGlobals::recvBufLen[current][gamma_active_port] = gamma_msglen;
179 // Go to next buffer.
180 label next = PstreamGlobals::recvBuf.fcIndex(current);
181 PstreamGlobals::recvIndex[gamma_active_port] = next;
183 // gamma_switch_to_buffer
187 PstreamGlobals::recvBuf[next][gamma_active_port].begin(),
188 PstreamGlobals::recvBuf[next][gamma_active_port].size()
193 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
195 void Pstream::addValidParOptions(HashTable<string>& validParOptions)
197 validParOptions.insert("np", "");
198 validParOptions.insert("p4pg", "PI file");
199 validParOptions.insert("p4wd", "directory");
200 validParOptions.insert("p4amslave", "");
201 validParOptions.insert("p4yourname", "hostname");
203 validParOptions.insert("machinefile", "machine file");
204 validParOptions.insert("GAMMANP", "numProcs");
205 validParOptions.insert("GAMMAHOME", "gamma cwd");
206 validParOptions.insert("GAMMA", "1(enable) or 0(disable)");
210 bool Pstream::init(int& argc, char**& argv)
214 string npString("-GAMMANP");
216 for (label i = 0; i < argc; i++)
218 if (argv[i] == npString)
222 numprocs = atoi(argv[i+1]);
229 unsigned char smallNumprocs = numprocs;
231 gamma_init(smallNumprocs, argc, argv);
233 myProcNo_ = gamma_my_node();
235 // Make sure printing with prefix.
238 procIDs_.setSize(numprocs);
240 forAll(procIDs_, procNo)
242 procIDs_[procNo] = procNo;
246 // Allocate receive buffers.
247 // ~~~~~~~~~~~~~~~~~~~~~~~~~
249 // Make sure each receive buffer is at least large enough to receive
250 // the resize message.
252 // Current active buffer
253 PstreamGlobals::recvIndex.setSize(numprocs);
254 PstreamGlobals::recvIndex = 0;
255 PstreamGlobals::consumeIndex.setSize(numprocs);
256 PstreamGlobals::consumeIndex = 0;
258 forAll(PstreamGlobals::recvBuf, i)
260 PstreamGlobals::recvBufLen[i].setSize(numprocs);
261 PstreamGlobals::recvBufLen[i] = -1;
263 List<List<char> >& buffers = PstreamGlobals::recvBuf[i];
265 buffers.setSize(numprocs);
266 forAll(buffers, procNo)
268 if (procNo != myProcNo_)
270 buffers[procNo].setSize(PstreamGlobals::initialBufferLen);
272 // Acc. to gamma sources all buffers need to be in memory.
273 // Either locked or "write touched".
276 // buffers[procNo].begin(),
277 // buffers[procNo].size()
284 // Lock the special resize message
287 // reinterpret_cast<char*>(PstreamGlobals::resizeMessage),
288 // PstreamGlobals::resizeMessageLen*sizeof(uint64_t)
292 // Attach current receive buffers
293 forAll(procIDs_, procNo)
295 if (procNo != myProcNo_)
297 // Buffer index (always 0 at this point)
298 label current = PstreamGlobals::recvIndex[procNo];
300 // Current buffer for this processor.
301 List<char>& buf = PstreamGlobals::recvBuf[current][procNo];
303 gamma_set_active_port
305 procNo, //unsigned short port,
306 procNo, //unsigned short dest_node,
307 gamma_my_par_pid(), //unsigned char dest_par_pid,
308 myProcNo_, //unsigned short dest_port,
310 procNo, //unsigned short semaphore,
311 GO_BACK, //unsigned char buffer_kind,
319 // Make sure all have allocated the ports (so set the receive buffers)
322 Info<< "GAMMA Pstream initialized with:" << nl
323 << " floatTransfer : " << floatTransfer << nl
324 << " nProcsSimpleSum : " << nProcsSimpleSum << nl
325 << " scheduledTransfer : " << Pstream::scheduledTransfer << nl
328 // Now that nprocs is known construct communication tables.
329 initCommunicationSchedule();
335 void Pstream::exit(int errnum)
337 // gamma_munlockall();
343 void Pstream::abort()
345 Pout<< "**Pstream::abort()**" << endl;
346 // gamma_munlockall();
351 void reduce(scalar& Value, const sumOp<scalar>& bop)
353 if (!Pstream::parRun())
360 Pout<< "**entering Pstream::reduce for " << Value << Foam::endl;
364 if (Pstream::master())
368 int slave=Pstream::firstSlave();
369 slave<=Pstream::lastSlave();
380 reinterpret_cast<char*>(&value), // buf
381 sizeof(Value) // bufSize
387 "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
388 ) << "IPstream::read failed"
389 << Foam::abort(FatalError);
392 Value = bop(Value, value);
402 reinterpret_cast<const char*>(&Value), // buf
403 sizeof(Value), // bufSize
404 false // non-buffered
410 "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
411 ) << "OPstream::write failed"
412 << Foam::abort(FatalError);
416 if (Pstream::master())
420 int slave=Pstream::firstSlave();
421 slave<=Pstream::lastSlave();
430 reinterpret_cast<const char*>(&Value), // buf
431 sizeof(Value), // bufSize,
432 false // non-buffered
438 "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
439 ) << "OPstream::write failed"
440 << Foam::abort(FatalError);
451 reinterpret_cast<char*>(&Value), // buf
452 sizeof(Value) // bufSize
458 "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
459 ) << "IPstream::read failed"
460 << Foam::abort(FatalError);
466 Pout<< "**exiting Pstream::reduce with " << Value << Foam::endl;
471 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
473 } // End namespace Foam
475 // ************************************************************************* //