1 // Copyright (C) 2005-2006 Matthias Troyer
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
7 // An example of a parallel Monte Carlo simulation using some nodes to produce
8 // data and others to aggregate the data
11 #include <boost/mpi.hpp>
12 #include <boost/random/parallel.hpp>
13 #include <boost/random.hpp>
14 #include <boost/foreach.hpp>
18 namespace mpi
= boost::mpi
;
20 enum {sample_tag
, sample_skeleton_tag
, sample_broadcast_tag
, quit_tag
};
23 void calculate_samples(int sample_length
)
25 int num_samples
= 100;
26 std::vector
<double> sample(sample_length
);
28 // setup communicator by splitting
30 mpi::communicator world
;
31 mpi::communicator calculate_communicator
= world
.split(0);
33 unsigned int num_calculate_ranks
= calculate_communicator
.size();
35 // the master of the accumulaion ranks is the first of them, hence
36 // with a rank just one after the last calculation rank
37 int master_accumulate_rank
= num_calculate_ranks
;
39 // the master of the calculation ranks sends the skeleton of the sample
40 // to the master of the accumulation ranks
43 world
.send(master_accumulate_rank
,sample_skeleton_tag
,mpi::skeleton(sample
));
45 // next we extract the content of the sample vector, to be used in sending
46 // the content later on
48 mpi::content sample_content
= mpi::get_content(sample
);
50 // now intialize the parallel random number generator
53 boost::random::stream_number
= calculate_communicator
.rank(),
54 boost::random::total_streams
= calculate_communicator
.size()
57 boost::variate_generator
<boost::lcg64
&,boost::uniform_real
<> >
58 rng(engine
,boost::uniform_real
<>());
60 for (unsigned int i
=0; i
<num_samples
/num_calculate_ranks
+1;++i
) {
62 // calculate sample by filling the vector with random numbers
63 // note that std::generate will not work since it takes the generator
64 // by value, and boost::ref cannot be used as a generator.
65 // boost::ref should be fixed so that it can be used as generator
67 BOOST_FOREACH(double& x
, sample
)
70 // send sample to accumulation ranks
71 // Ideally we want to do this as a broadcast with an inter-communicator
72 // between the calculation and accumulation ranks. MPI2 should support
73 // this, but here we present an MPI1 compatible solution.
75 // send content of sample to first (master) accumulation process
77 world
.send(master_accumulate_rank
,sample_tag
,sample_content
);
79 // gather some results from all calculation ranks
81 double local_result
= sample
[0];
82 std::vector
<double> gathered_results(calculate_communicator
.size());
83 mpi::all_gather(calculate_communicator
,local_result
,gathered_results
);
86 // we are done: the master tells the accumulation ranks to quit
88 world
.send(master_accumulate_rank
,quit_tag
);
93 void accumulate_samples()
95 std::vector
<double> sample
;
97 // setup the communicator for all accumulation ranks by splitting
99 mpi::communicator world
;
100 mpi::communicator accumulate_communicator
= world
.split(1);
102 bool is_master_accumulate_rank
= accumulate_communicator
.rank()==0;
104 // the master receives the sample skeleton
106 if (is_master_accumulate_rank
)
107 world
.recv(0,sample_skeleton_tag
,mpi::skeleton(sample
));
109 // and broadcasts it to all accumulation ranks
110 mpi::broadcast(accumulate_communicator
,mpi::skeleton(sample
),0);
112 // next we extract the content of the sample vector, to be used in receiving
113 // the content later on
115 mpi::content sample_content
= mpi::get_content(sample
);
117 // accumulate until quit is called
122 // the accumulation master checks whether we should quit
123 if (world
.iprobe(0,quit_tag
)) {
124 world
.recv(0,quit_tag
);
125 for (int i
=1; i
<accumulate_communicator
.size();++i
)
126 accumulate_communicator
.send(i
,quit_tag
);
127 std::cout
<< sum
<< "\n";
131 // the otehr accumulation ranks check whether we should quit
132 if (accumulate_communicator
.iprobe(0,quit_tag
)) {
133 accumulate_communicator
.recv(0,quit_tag
);
134 std::cout
<< sum
<< "\n";
138 // check whether the master accumulation rank has received a sample
139 if (world
.iprobe(mpi::any_source
,sample_tag
)) {
140 BOOST_ASSERT(is_master_accumulate_rank
);
142 // receive the content
143 world
.recv(mpi::any_source
,sample_tag
,sample_content
);
145 // now we need to braodcast
146 // the problam is we do not have a non-blocking broadcast that we could
147 // abort if we receive a quit message from the master. We thus need to
148 // first tell all accumulation ranks to start a broadcast. If the sample
149 // is small, we could just send the sample in this message, but here we
150 // optimize the code for large samples, so that the overhead of these
151 // sends can be ignored, and we count on an optimized broadcast
152 // implementation with O(log N) complexity
154 for (int i
=1; i
<accumulate_communicator
.size();++i
)
155 accumulate_communicator
.send(i
,sample_broadcast_tag
);
157 // now broadcast the contents of the sample to all accumulate ranks
158 mpi::broadcast(accumulate_communicator
,sample_content
,0);
160 // and handle the sample by summing the appropriate value
164 // the other accumulation ranks wait for a mesage to start the broadcast
165 if (accumulate_communicator
.iprobe(0,sample_broadcast_tag
)) {
166 BOOST_ASSERT(!is_master_accumulate_rank
);
168 accumulate_communicator
.recv(0,sample_broadcast_tag
);
170 // receive broadcast of the sample contents
171 mpi::broadcast(accumulate_communicator
,sample_content
,0);
173 // and handle the sample
175 // and handle the sample by summing the appropriate value
176 sum
+= sample
[accumulate_communicator
.rank()];
181 int main(int argc
, char** argv
)
183 mpi::environment
env(argc
, argv
);
184 mpi::communicator world
;
186 // half of the processes generate, the others accumulate
187 // the sample size is just the number of accumulation ranks
188 if (world
.rank() < world
.size()/2)
189 calculate_samples(world
.size()-world
.size()/2);
191 accumulate_samples();