initial commit for version 1.6.x patch release
[OpenFOAM-1.6.x.git] / src / Pstream / gamma / Pstream.C
blob1e4a6fa6421d9ca735ee48feef327f7a3cd5f39f
1 /*---------------------------------------------------------------------------*\
2   =========                 |
3   \\      /  F ield         | OpenFOAM: The Open Source CFD Toolbox
4    \\    /   O peration     |
5     \\  /    A nd           | Copyright (C) 1991-2009 OpenCFD Ltd.
6      \\/     M anipulation  |
7 -------------------------------------------------------------------------------
8 License
9     This file is part of OpenFOAM.
11     OpenFOAM is free software; you can redistribute it and/or modify it
12     under the terms of the GNU General Public License as published by the
13     Free Software Foundation; either version 2 of the License, or (at your
14     option) any later version.
16     OpenFOAM is distributed in the hope that it will be useful, but WITHOUT
17     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
18     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
19     for more details.
21     You should have received a copy of the GNU General Public License
22     along with OpenFOAM; if not, write to the Free Software Foundation,
23     Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
25 Description
26     Pstream for GAMMA
28     GAMMA has a (polling) receive handler which gets called every time a
29     received message is complete. Ours stores the length of the currently
30     received message and sets up the next buffer to store the next message
31     in.
32     Note that the pattern between two processors can be
33     - send
34     - receive
35     - receive
36     - send
37     since the first swap might belong to a local exchange and the second to
38     a reduce. Since gamma has to have the receive buffers already set up we
39     have to allocate them big enough. To prevent excessive amounts needed we
40     dynamically resize them (never shrink) by sending special 'resize' messages
41     before sending a largish message.
43     Because of this we actually need four receive buffers:
44     - send
45     - receive resize message
46     - receive normal message
47     - receive resize message
48     - receive normal message
49     - send
51     The special resize message is a message with a special header which
52     (hopefully) should never appear in normal exchanges (it actually checks
53     for this in the OPstream::send)
55 \*---------------------------------------------------------------------------*/
57 #include "Pstream.H"
58 #include "PstreamReduceOps.H"
59 #include "OSspecific.H"
60 #include "PstreamGlobals.H"
62 #include <cstring>
63 #include <cstdlib>
64 #include <csignal>
66 extern "C"
68 #   include <linux/gamma/libgamma.h>
72 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
74 namespace Foam
77 // Receive handler to copy out received message length and switch buffers.
78 static void handler(void)
80     label current = PstreamGlobals::recvIndex[gamma_active_port];
82     List<char>& buf = PstreamGlobals::recvBuf[current][gamma_active_port];
83     label bufLen = PstreamGlobals::recvBufLen[current][gamma_active_port];
85     if (bufLen != -1)
86     {
87         FatalErrorIn("Pstream::handler(void)")
88             << "Buffer length not reset : "
89             << bufLen
90             << " when receiving message of size " << gamma_msglen
91             << " from processor " << gamma_active_port << endl
92             << "This means that the existing data has not been consumed yet"
93             << " (by IPstream::read) and means your communication pattern"
94             << " is probably not balanced (a receive for every send)"
95             << endl
96             << "This can happen if you have e.g. gather without scatter."
97             << endl
98             << "A workaround is to increase the depth of the circular"
99             << " receive buffers in PstreamGlobals.H"
100             << abort(FatalError);
101     }
104     // Some checks
105     if
106     (
107         gamma_msglen < 0
108      || gamma_msglen > buf.size()
109     )
110     {
111         FatalErrorIn("Pstream::handler(void)")
112             << "Received message of size " << gamma_msglen
113             << " from processor " << gamma_active_port
114             << Foam::endl
115             << "but global receive buffer is only of size "
116             << buf.size()
117             << abort(FatalError);
118     }
120     // Check for resize message
121     label resizeLen = PstreamGlobals::getSizeFromHeader
122     (
123         buf.begin(),
124         gamma_msglen
125     );
127     if (resizeLen != -1)
128     {
129         if (Pstream::debug)
130         {
131             Pout<< "Pstream::handler : Resize message:" << resizeLen
132                 << " from proc " << gamma_active_port
133                 << " current size:"
134                 << PstreamGlobals::getMaxBufSize(gamma_active_port)
135                 << Foam::endl;
136         }
138         // Saved current buffer.
139         List<char> savedBuf;
141         if (resizeLen > PstreamGlobals::getMaxBufSize(gamma_active_port))
142         {
143             if (Pstream::debug)
144             {
145                 Pout<< "Pstream::handler :"
146                     << " resizing receive buffer for processor "
147                     << gamma_active_port
148                     << " from "
149                     << PstreamGlobals::getMaxBufSize(gamma_active_port)
150                     << " to " << resizeLen << Foam::endl;
151             }
153             // Save the pointer (that gamma knows about) so we can safely
154             // gamma_switch_to_buffer with a valid pointer.
155             // Not sure if nessecary but do anyway.
156             savedBuf.transfer(buf);
158             // Resize all the buffers
159             forAll(PstreamGlobals::recvBuf, i)
160             {
161                 List<char>& chars =
162                     PstreamGlobals::recvBuf[i][gamma_active_port];
164 //                gamma_munlock(chars.begin(), chars.size());
165                 chars.setSize(resizeLen);
166 //                gamma_mlock(chars.begin(), chars.size());
167             }
168         }
170         // Update length with special value to denote resize was done.
171         PstreamGlobals::recvBufLen[current][gamma_active_port] = -2;
172     }
173     else
174     {
175         // Update length with actual message length
176         PstreamGlobals::recvBufLen[current][gamma_active_port] = gamma_msglen;
177     }
179     // Go to next buffer.
180     label next = PstreamGlobals::recvBuf.fcIndex(current);
181     PstreamGlobals::recvIndex[gamma_active_port] = next;
183 //    gamma_switch_to_buffer
184     gamma_post_recv
185     (
186         gamma_active_port,
187         PstreamGlobals::recvBuf[next][gamma_active_port].begin(),
188         PstreamGlobals::recvBuf[next][gamma_active_port].size()
189     );
193 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
195 void Pstream::addValidParOptions(HashTable<string>& validParOptions)
197     validParOptions.insert("np", "");
198     validParOptions.insert("p4pg", "PI file");
199     validParOptions.insert("p4wd", "directory");
200     validParOptions.insert("p4amslave", "");
201     validParOptions.insert("p4yourname", "hostname");
203     validParOptions.insert("machinefile", "machine file");
204     validParOptions.insert("GAMMANP", "numProcs");
205     validParOptions.insert("GAMMAHOME", "gamma cwd");
206     validParOptions.insert("GAMMA", "1(enable) or 0(disable)");
210 bool Pstream::init(int& argc, char**& argv)
212     int numprocs = 0;
214     string npString("-GAMMANP");
216     for (label i = 0; i < argc; i++)
217     {
218         if (argv[i] == npString)
219         {
220             if (i+1 < argc)
221             {
222                 numprocs = atoi(argv[i+1]);
223                 break;
224             }
225         }
226     }
228     // Initialize GAMMA
229     unsigned char smallNumprocs = numprocs;
231     gamma_init(smallNumprocs, argc, argv);
233     myProcNo_ = gamma_my_node();
235     // Make sure printing with prefix.
236     setParRun();
238     procIDs_.setSize(numprocs);
240     forAll(procIDs_, procNo)
241     {
242         procIDs_[procNo] = procNo;
243     }
246     // Allocate receive buffers.
247     // ~~~~~~~~~~~~~~~~~~~~~~~~~
249     // Make sure each receive buffer is at least large enough to receive
250     // the resize message.
252     // Current active buffer
253     PstreamGlobals::recvIndex.setSize(numprocs);
254     PstreamGlobals::recvIndex = 0;
255     PstreamGlobals::consumeIndex.setSize(numprocs);
256     PstreamGlobals::consumeIndex = 0;
258     forAll(PstreamGlobals::recvBuf, i)
259     {
260         PstreamGlobals::recvBufLen[i].setSize(numprocs);
261         PstreamGlobals::recvBufLen[i] = -1;
263         List<List<char> >& buffers = PstreamGlobals::recvBuf[i];
265         buffers.setSize(numprocs);
266         forAll(buffers, procNo)
267         {
268             if (procNo != myProcNo_)
269             {
270                 buffers[procNo].setSize(PstreamGlobals::initialBufferLen);
272                 // Acc. to gamma sources all buffers need to be in memory.
273                 // Either locked or "write touched".
274 //                gamma_mlock
275  //               (
276   //                  buffers[procNo].begin(),
277    //                 buffers[procNo].size()
278     //            );
279             }
280         }
281     }
284     // Lock the special resize message
285 //    gamma_mlock
286 //    (
287  //       reinterpret_cast<char*>(PstreamGlobals::resizeMessage),
288   //      PstreamGlobals::resizeMessageLen*sizeof(uint64_t)
289    // );
292     // Attach current receive buffers
293     forAll(procIDs_, procNo)
294     {
295         if (procNo != myProcNo_)
296         {
297             // Buffer index (always 0 at this point)
298             label current = PstreamGlobals::recvIndex[procNo];
300             // Current buffer for this processor.
301             List<char>& buf = PstreamGlobals::recvBuf[current][procNo];
303             gamma_set_active_port
304             (
305                 procNo,             //unsigned short port,
306                 procNo,             //unsigned short dest_node,
307                 gamma_my_par_pid(), //unsigned char dest_par_pid,
308                 myProcNo_,          //unsigned short dest_port,
309                 handler,            //callback
310                 procNo,             //unsigned short semaphore,
311                 GO_BACK,            //unsigned char buffer_kind,
312                 buf.begin(),
313                 buf.size()
314             );
315         }
316     }
319     // Make sure all have allocated the ports (so set the receive buffers)
320     gamma_sync();
322     Info<< "GAMMA Pstream initialized with:" << nl
323         << "    floatTransfer         : " << floatTransfer << nl
324         << "    nProcsSimpleSum       : " << nProcsSimpleSum << nl
325         << "    scheduledTransfer     : " << Pstream::scheduledTransfer << nl
326         << Foam::endl;
328     // Now that nprocs is known construct communication tables.
329     initCommunicationSchedule();
331     return true;
335 void Pstream::exit(int errnum)
337 //    gamma_munlockall();
338     gamma_exit();
339     //gamma_abort();
343 void Pstream::abort()
345 Pout<< "**Pstream::abort()**" << endl;
346 //    gamma_munlockall();
347     gamma_abort();
351 void reduce(scalar& Value, const sumOp<scalar>& bop)
353     if (!Pstream::parRun())
354     {
355         return;
356     }
358     if (Pstream::debug)
359     {
360         Pout<< "**entering Pstream::reduce for " << Value << Foam::endl;
361     }
364     if (Pstream::master())
365     {
366         for
367         (
368             int slave=Pstream::firstSlave();
369             slave<=Pstream::lastSlave();
370             slave++
371         )
372         {
373             scalar value;
375             if
376             (
377                !IPstream::read
378                 (
379                     slave,
380                     reinterpret_cast<char*>(&value),    // buf
381                     sizeof(Value)                       // bufSize
382                 )
383             )
384             {
385                 FatalErrorIn
386                 (
387                     "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
388                 )   << "IPstream::read failed"
389                     << Foam::abort(FatalError);
390             }
392             Value = bop(Value, value);
393         }
394     }
395     else
396     {
397         if
398         (
399            !OPstream::write
400             (
401                 Pstream::masterNo(),
402                 reinterpret_cast<const char*>(&Value),  // buf
403                 sizeof(Value),                          // bufSize
404                 false                                   // non-buffered
405             )
406         )
407         {
408             FatalErrorIn
409             (
410                 "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
411             )   << "OPstream::write failed"
412                 << Foam::abort(FatalError);
413         }
414     }
416     if (Pstream::master())
417     {
418         for
419         (
420             int slave=Pstream::firstSlave();
421             slave<=Pstream::lastSlave();
422             slave++
423         )
424         {
425             if
426             (
427                !OPstream::write
428                 (
429                     slave,
430                     reinterpret_cast<const char*>(&Value),  // buf
431                     sizeof(Value),                          // bufSize,
432                     false                                   // non-buffered
433                 )
434             )
435             {
436                 FatalErrorIn
437                 (
438                     "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
439                 )   << "OPstream::write failed"
440                     << Foam::abort(FatalError);
441             }
442         }
443     }
444     else
445     {
446         if
447         (
448            !IPstream::read
449             (
450                 Pstream::masterNo(),
451                 reinterpret_cast<char*>(&Value),    // buf
452                 sizeof(Value)                       // bufSize
453             )
454         )
455         {
456             FatalErrorIn
457             (
458                 "reduce(scalar& Value, const sumOp<scalar>& sumOp)"
459             )   << "IPstream::read failed"
460                 << Foam::abort(FatalError);
461         }
462     }
464     if (Pstream::debug)
465     {
466         Pout<< "**exiting Pstream::reduce with " << Value << Foam::endl;
467     }
471 // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * //
473 } // End namespace Foam
475 // ************************************************************************* //