Release 1.39.0
[boost.git] / Boost_1_39_0 / libs / mpi / example / parallel_example.cpp
// Copyright (C) 2005-2006 Matthias Troyer

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

// An example of a parallel Monte Carlo simulation using some nodes to produce
// data and others to aggregate the data
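//
// The example needs at least two MPI processes (at least one calculating and
// one accumulating rank). With a typical launcher this might look like
// "mpirun -np 4 ./parallel_example", though the exact command depends on the
// local MPI installation and on how the example is built.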
#include <iostream>

#include <boost/mpi.hpp>
#include <boost/random/parallel.hpp>
#include <boost/random.hpp>
#include <boost/foreach.hpp>
#include <vector>
#include <cstdlib>

namespace mpi = boost::mpi;
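
// Message tags for the point-to-point protocol below: sample_tag carries
// sample data from the calculation ranks to the accumulation master,
// sample_skeleton_tag carries the skeleton of the sample vector,
// sample_broadcast_tag tells the accumulation ranks to join a broadcast,
// and quit_tag ends the run.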
enum {sample_tag, sample_skeleton_tag, sample_broadcast_tag, quit_tag};

void calculate_samples(int sample_length)
{
  int num_samples = 100;
  std::vector<double> sample(sample_length);

  // set up the communicator for all calculation ranks by splitting

  mpi::communicator world;
  mpi::communicator calculate_communicator = world.split(0);

  unsigned int num_calculate_ranks = calculate_communicator.size();

  // the master of the accumulation ranks is the first of them, hence
  // with a rank just one after the last calculation rank
  int master_accumulate_rank = num_calculate_ranks;

  // the master of the calculation ranks sends the skeleton of the sample
  // to the master of the accumulation ranks

  if (world.rank()==0)
    world.send(master_accumulate_rank,sample_skeleton_tag,mpi::skeleton(sample));

  // next we extract the content of the sample vector, to be used in sending
  // the content later on

  mpi::content sample_content = mpi::get_content(sample);
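  // (the skeleton/content split means the structure of the vector is
  // transmitted only once, while the repeated sends below transfer just
  // the raw values)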

  // now initialize the parallel random number generator

  boost::lcg64 engine(
      boost::random::stream_number = calculate_communicator.rank(),
      boost::random::total_streams = calculate_communicator.size()
    );

  boost::variate_generator<boost::lcg64&,boost::uniform_real<> >
    rng(engine,boost::uniform_real<>());
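  // rng yields uniformly distributed doubles in [0,1); the stream_number and
  // total_streams parameters above give each calculation rank its own stream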

  for (unsigned int i=0; i<num_samples/num_calculate_ranks+1;++i) {

    // calculate the sample by filling the vector with random numbers.
    // note that std::generate will not work since it takes the generator
    // by value, and boost::ref cannot be used as a generator.
    // boost::ref should be fixed so that it can be used as a generator.

    BOOST_FOREACH(double& x, sample)
      x = rng();

    // send the sample to the accumulation ranks.
    // Ideally we would do this as a broadcast with an inter-communicator
    // between the calculation and accumulation ranks. MPI-2 should support
    // this, but here we present an MPI-1 compatible solution.

    // send the content of the sample to the first (master) accumulation process

    world.send(master_accumulate_rank,sample_tag,sample_content);

    // gather some results from all calculation ranks

    double local_result = sample[0];
    std::vector<double> gathered_results(calculate_communicator.size());
    mpi::all_gather(calculate_communicator,local_result,gathered_results);
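    // (gathered_results is not used further in this example)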
  }

  // we are done: the master tells the accumulation ranks to quit
  if (world.rank()==0)
    world.send(master_accumulate_rank,quit_tag);
}


void accumulate_samples()
{
  std::vector<double> sample;

  // set up the communicator for all accumulation ranks by splitting

  mpi::communicator world;
  mpi::communicator accumulate_communicator = world.split(1);

  bool is_master_accumulate_rank = accumulate_communicator.rank()==0;

  // the master receives the sample skeleton

  if (is_master_accumulate_rank)
    world.recv(0,sample_skeleton_tag,mpi::skeleton(sample));

  // and broadcasts it to all accumulation ranks
  mpi::broadcast(accumulate_communicator,mpi::skeleton(sample),0);
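  // (after this broadcast the sample vector on every accumulation rank has
  // the size chosen by the calculation side)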

  // next we extract the content of the sample vector, to be used in receiving
  // the content later on

  mpi::content sample_content = mpi::get_content(sample);

  // accumulate until quit is called
  double sum=0.;
  while (true) {

    // the accumulation master checks whether we should quit
    if (world.iprobe(0,quit_tag)) {
      world.recv(0,quit_tag);
      for (int i=1; i<accumulate_communicator.size();++i)
        accumulate_communicator.send(i,quit_tag);
      std::cout << sum << "\n";
      break; // We're done
    }

    // the other accumulation ranks check whether we should quit
    if (accumulate_communicator.iprobe(0,quit_tag)) {
      accumulate_communicator.recv(0,quit_tag);
      std::cout << sum << "\n";
      break; // We're done
    }

    // check whether the master accumulation rank has received a sample
    if (world.iprobe(mpi::any_source,sample_tag)) {
      BOOST_ASSERT(is_master_accumulate_rank);

      // receive the content
      world.recv(mpi::any_source,sample_tag,sample_content);

      // now we need to broadcast the sample to the other accumulation ranks.
      // the problem is that we do not have a non-blocking broadcast that we
      // could abort if we receive a quit message from the master. We thus need
      // to first tell all accumulation ranks to start a broadcast. If the
      // sample is small, we could just send the sample in this message, but
      // here we optimize the code for large samples, so that the overhead of
      // these sends can be ignored, and we count on an optimized broadcast
      // implementation with O(log N) complexity.

      for (int i=1; i<accumulate_communicator.size();++i)
        accumulate_communicator.send(i,sample_broadcast_tag);

      // now broadcast the contents of the sample to all accumulation ranks
      mpi::broadcast(accumulate_communicator,sample_content,0);

      // and handle the sample by summing the appropriate value
      sum += sample[0];
    }
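    // (note that the master sums sample[0] while accumulation rank i sums
    // sample[i], so each accumulation rank tracks a different sample element)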

    // the other accumulation ranks wait for a message to start the broadcast
    if (accumulate_communicator.iprobe(0,sample_broadcast_tag)) {
      BOOST_ASSERT(!is_master_accumulate_rank);

      accumulate_communicator.recv(0,sample_broadcast_tag);

      // receive the broadcast of the sample contents
      mpi::broadcast(accumulate_communicator,sample_content,0);

      // and handle the sample by summing the appropriate value
      sum += sample[accumulate_communicator.rank()];
    }
  }
}

int main(int argc, char** argv)
{
  mpi::environment env(argc, argv);
  mpi::communicator world;

  // half of the processes generate samples, the others accumulate them.
  // the sample size is just the number of accumulation ranks
  if (world.rank() < world.size()/2)
    calculate_samples(world.size()-world.size()/2);
  else
    accumulate_samples();

  return 0;
}