Merge pull request #3835 from simpsont-oci/bench_config_name
[OpenDDS.git] / performance-tests / bench / worker / main.cpp
blob50a550bef67be1e8a71d94d5ce7adab6dc85b80e
1 #include "BenchC.h"
2 #ifdef __GNUC__
3 #pragma GCC diagnostic push
4 # if defined(__has_warning)
5 # if __has_warning("-Wclass-memaccess")
6 # pragma GCC diagnostic ignored "-Wclass-memaccess"
7 # endif
8 # endif
9 #endif
10 #include "BenchTypeSupportImpl.h"
11 #ifdef __GNUC__
12 #pragma GCC diagnostic pop
13 #endif
14 #include "BuilderTypeSupportImpl.h"
16 #include "ListenerFactory.h"
18 #include "BuilderProcess.h"
19 #include "DataReader.h"
20 #include "DataReaderListener.h"
21 #include "DataWriter.h"
22 #include "DataWriterListener.h"
23 #include "ParticipantListener.h"
24 #include "PublisherListener.h"
25 #include "SubscriberListener.h"
26 #include "TopicListener.h"
28 #include "Utils.h"
29 #include "PropertyStatBlock.h"
31 #include "ActionManager.h"
32 #include "ForwardAction.h"
33 #include "ReadAction.h"
34 #include "SetCftParametersAction.h"
35 #include "WorkerDataReaderListener.h"
36 #include "WorkerDataWriterListener.h"
37 #include "WorkerTopicListener.h"
38 #include "WorkerSubscriberListener.h"
39 #include "WorkerPublisherListener.h"
40 #include "WorkerParticipantListener.h"
41 #include "WriteAction.h"
43 #include <util.h>
44 #include <json_conversion.h>
46 #include <dds/DCPS/Service_Participant.h>
47 #include <dds/DCPS/transport/framework/TransportRegistry.h>
48 #ifdef ACE_AS_STATIC_LIBS
49 # include <dds/DCPS/RTPS/RtpsDiscovery.h>
50 # include <dds/DCPS/transport/rtps_udp/RtpsUdp.h>
51 # include <dds/DCPS/transport/udp/Udp.h>
52 # include <dds/DCPS/transport/tcp/Tcp.h>
53 # include <dds/DCPS/transport/multicast/Multicast.h>
54 # ifdef OPENDDS_SECURITY
55 # include <dds/DCPS/security/BuiltInPlugins.h>
56 # endif
57 #endif
59 #include <ace/Proactor.h>
60 #ifdef ACE_HAS_AIO_CALLS
61 # include <ace/POSIX_CB_Proactor.h>
62 #endif
64 #include <algorithm>
65 #include <cmath>
66 #include <fstream>
67 #include <iomanip>
68 #include <iostream>
69 #include <sstream>
70 #include <thread>
72 using Builder::Log;
73 using Builder::ZERO;
74 using Bench::get_option_argument;
// Decimal places used for the JSON worker report when the configuration
// does not supply a "max_decimal_places" property.
constexpr size_t DEFAULT_MAX_DECIMAL_PLACES{9u};

// Upper bound on the default number of proactor event-loop threads; the
// default is the smaller of this value and the number of configured actions.
constexpr size_t DEFAULT_THREAD_POOL_SIZE{4u};
79 void do_wait(const Builder::TimeStamp& ts, const std::string& ts_name, bool zero_equals_key_press = true) {
80 if (zero_equals_key_press && ts == ZERO) {
81 std::stringstream ss;
82 ss << "No " << ts_name << " time specified. Press enter to continue." << std::endl;
83 std::cerr << ss.str() << std::flush;
84 std::string line;
85 std::getline(std::cin, line);
86 } else {
87 if (ts < ZERO) {
88 auto duration = -1 * get_duration(ts);
89 if (duration > std::chrono::milliseconds(100)) {
90 std::this_thread::sleep_until(std::chrono::steady_clock::now() + duration);
92 } else {
93 auto now = std::chrono::system_clock::now();
94 auto duration = std::chrono::system_clock::time_point(get_duration(ts)) - now;
95 if (duration > std::chrono::milliseconds(100)) {
96 std::this_thread::sleep_until(std::chrono::steady_clock::now() + duration);
102 int ACE_TMAIN(int argc, ACE_TCHAR* argv[]) {
103 Builder::NullStream null_stream_i;
104 std::ostream null_stream(&null_stream_i);
106 std::string log_file_fullname;
107 std::string report_file_fullname;
108 std::string config_file_fullname;
109 std::string config_file_basename;
111 try {
112 for (int i = 1; i < argc; i++) {
113 const ACE_TCHAR* argument = argv[i];
114 if (!ACE_OS::strcmp(argv[i], ACE_TEXT("--log"))) {
115 log_file_fullname = get_option_argument(i, argc, argv);
116 } else if (!ACE_OS::strcmp(argv[i], ACE_TEXT("--report"))) {
117 report_file_fullname = get_option_argument(i, argc, argv);
118 } else if (config_file_fullname.empty()) {
119 config_file_fullname = ACE_TEXT_ALWAYS_CHAR(argument);
120 config_file_basename = ACE_TEXT_ALWAYS_CHAR(ACE::basename(argument));
121 } else {
122 std::cerr << "Invalid option: " << argument << std::endl;
123 return 1;
127 if (config_file_fullname.empty()) {
128 std::cerr << "Must pass a configuration file" << std::endl;
129 throw 1;
131 } catch (int value) {
132 std::cerr << "See DDS_ROOT/performance-tests/bench/README.md for usage" << std::endl;
133 return value;
136 std::ifstream config_file(config_file_fullname);
137 if (!config_file.is_open()) {
138 std::cerr << "Unable to open configuration file: '" << config_file_fullname << "'" << std::endl;
139 return 2;
142 std::ofstream log_file;
143 if (!log_file_fullname.empty()) {
144 log_file.open(log_file_fullname, ios::app);
145 if (!log_file.good()) {
146 std::cerr << "Unable to open log file: '" << log_file_fullname << "'" << std::endl;
147 return 2;
149 Log::stream = &log_file;
150 } else {
151 Log::stream = &std::cout;
154 std::ofstream report_file;
155 if (!report_file_fullname.empty()) {
156 report_file.open(report_file_fullname);
157 if (!report_file.good()) {
158 std::cerr << "Unable to open report file: '" << report_file_fullname << "'" << std::endl;
159 return 2;
163 using Builder::ZERO;
165 Bench::WorkerConfig config{};
167 config.create_time = ZERO;
168 config.enable_time = ZERO;
169 config.start_time = ZERO;
170 config.stop_time = ZERO;
171 config.wait_for_discovery = false;
172 config.wait_for_discovery_seconds = 0;
174 if (!json_2_idl(config_file, config)) {
175 std::cerr << "Unable to parse configuration" << std::endl;
176 return 3;
179 config.process.name = config_file_basename.c_str();
181 // Bad-actor test & debugging options for node & test controllers
183 Builder::ConstPropertyIndex force_worker_segfault_prop =
184 get_property(config.properties, "force_worker_segfault", Builder::PVK_ULL);
185 if (force_worker_segfault_prop) {
186 if (force_worker_segfault_prop->value.ull_prop()) {
187 DDS::DataReaderListener* drl_ptr = 0;
188 drl_ptr->on_data_available(0);
192 Builder::ConstPropertyIndex force_worker_assert_prop =
193 get_property(config.properties, "force_worker_assert", Builder::PVK_ULL);
194 if (force_worker_assert_prop) {
195 if (force_worker_assert_prop->value.ull_prop()) {
196 OPENDDS_ASSERT(false);
200 Builder::ConstPropertyIndex force_worker_deadlock_prop =
201 get_property(config.properties, "force_worker_deadlock", Builder::PVK_ULL);
202 if (force_worker_deadlock_prop) {
203 if (force_worker_deadlock_prop->value.ull_prop()) {
204 std::mutex m;
205 std::unique_lock<std::mutex> l(m);
206 std::thread t([&](){
207 std::unique_lock<std::mutex> tl(m);
209 t.join();
213 // Register some Bench-specific types
214 Builder::TypeSupportRegistry::TypeSupportRegistration
215 process_config_registration(new Builder::ProcessConfigTypeSupportImpl());
216 Builder::TypeSupportRegistry::TypeSupportRegistration
217 data_registration(new Bench::DataTypeSupportImpl());
219 // Register some Bench-specific listener factories
220 try {
221 Builder::register_topic_listener("bench_tl", [](const Builder::PropertySeq& properties){
222 return DDS::TopicListener_var(new Bench::WorkerTopicListener(properties));
224 Builder::register_datareader_listener("bench_drl", [](const Builder::PropertySeq& properties){
225 return DDS::DataReaderListener_var(new Bench::WorkerDataReaderListener(properties));
227 Builder::register_subscriber_listener("bench_sl", [](const Builder::PropertySeq& properties){
228 return DDS::SubscriberListener_var(new Bench::WorkerSubscriberListener(properties));
230 Builder::register_datawriter_listener("bench_dwl", [](const Builder::PropertySeq& properties){
231 return DDS::DataWriterListener_var(new Bench::WorkerDataWriterListener(properties));
233 Builder::register_publisher_listener("bench_pl", [](const Builder::PropertySeq& properties){
234 return DDS::PublisherListener_var(new Bench::WorkerPublisherListener(properties));
236 Builder::register_domain_participant_listener("bench_partl", [](const Builder::PropertySeq& properties){
237 return DDS::DomainParticipantListener_var(new Bench::WorkerParticipantListener(properties));
239 } catch (const std::exception& e) {
240 std::cerr << "Exception caught trying to register listener factories: " << e.what() << std::endl;
241 return 4;
244 // Disable some Proactor debug chatter to stdout (eventually make this configurable?)
245 ACE_Log_Category::ace_lib().priority_mask(0);
247 #ifdef ACE_HAS_AIO_CALLS
248 Builder::ConstPropertyIndex use_aio_proactor_prop =
249 get_property(config.properties, "use_aio_proactor", Builder::PVK_ULL);
250 #endif
252 std::shared_ptr<ACE_Proactor> proactor;
253 #ifdef ACE_HAS_AIO_CALLS
254 if (use_aio_proactor_prop && use_aio_proactor_prop->value.ull_prop()) {
255 proactor.reset(new ACE_Proactor(new ACE_POSIX_AIOCB_Proactor()));
256 } else {
257 #endif
258 proactor.reset(new ACE_Proactor());
259 #ifdef ACE_HAS_AIO_CALLS
261 #endif
263 int max_decimal_places = DEFAULT_MAX_DECIMAL_PLACES;
264 Builder::ConstPropertyIndex max_decimal_places_prop =
265 get_property(config.properties, "max_decimal_places", Builder::PVK_ULL);
266 if (max_decimal_places_prop) {
267 max_decimal_places = static_cast<int>(max_decimal_places_prop->value.ull_prop());
270 size_t action_thread_pool_size = std::min(static_cast<size_t>(config.actions.length()), DEFAULT_THREAD_POOL_SIZE);
271 Builder::ConstPropertyIndex action_thread_pool_size_prop =
272 get_property(config.properties, "action_thread_pool_size", Builder::PVK_ULL);
273 if (action_thread_pool_size_prop) {
274 action_thread_pool_size = static_cast<size_t>(action_thread_pool_size_prop->value.ull_prop());
277 size_t redirect_ace_log = 1;
278 Builder::ConstPropertyIndex redirect_ace_log_prop =
279 get_property(config.properties, "redirect_ace_log", Builder::PVK_ULL);
280 if (redirect_ace_log_prop) {
281 redirect_ace_log = static_cast<size_t>(redirect_ace_log_prop->value.ull_prop());
284 if (redirect_ace_log && !log_file_fullname.empty()) {
285 std::ofstream* output_stream = new std::ofstream(log_file_fullname.c_str(), ios::app);
286 if (output_stream->bad()) {
287 delete output_stream;
288 } else {
289 ACE_LOG_MSG->msg_ostream(output_stream, true);
291 ACE_LOG_MSG->clr_flags(ACE_Log_Msg::STDERR | ACE_Log_Msg::LOGGER);
292 ACE_LOG_MSG->set_flags(ACE_Log_Msg::OSTREAM);
295 // Register actions
296 Bench::ActionManager::Registration
297 write_action_registration("write", [&](){
298 return std::shared_ptr<Bench::Action>(new Bench::WriteAction(*proactor));
300 Bench::ActionManager::Registration
301 read_action_registration("read", [&](){
302 return std::shared_ptr<Bench::Action>(new Bench::ReadAction(*proactor));
304 Bench::ActionManager::Registration
305 forward_action_registration("forward", [&](){
306 return std::shared_ptr<Bench::Action>(new Bench::ForwardAction(*proactor));
308 Bench::ActionManager::Registration
309 set_cft_parameters_action_registration("set_cft_parameters", [&]() {
310 return std::shared_ptr<Bench::Action>(new Bench::SetCftParametersAction(*proactor));
313 // Timestamps used to measure method call durations
314 Builder::TimeStamp process_construction_begin_time = ZERO, process_construction_end_time = ZERO;
315 Builder::TimeStamp process_enable_begin_time = ZERO, process_enable_end_time = ZERO;
316 Builder::TimeStamp process_start_begin_time = ZERO, process_start_end_time = ZERO;
317 Builder::TimeStamp process_stop_begin_time = ZERO, process_stop_end_time = ZERO;
318 Builder::TimeStamp process_destruction_begin_time = ZERO, process_destruction_end_time = ZERO;
319 Builder::TimeStamp process_start_discovery_time = ZERO, process_stop_discovery_time = ZERO;
321 set_global_properties(config.properties);
323 Bench::WorkerReport worker_report{};
324 Builder::ProcessReport& process_report = worker_report.process_report;
326 const size_t thread_pool_size = static_cast<size_t>(action_thread_pool_size);
327 std::vector<std::shared_ptr<std::thread> > thread_pool;
328 for (size_t i = 0; i < thread_pool_size; ++i) {
329 thread_pool.emplace_back(std::make_shared<std::thread>([&](){ proactor->proactor_run_event_loop(); }));
332 try {
333 std::string line;
335 do_wait(config.create_time, "create", false);
337 Log::log() << "Beginning process construction / entity creation." << std::endl;
339 process_construction_begin_time = Builder::get_hr_time();
340 Builder::BuilderProcess process(config.process);
341 process_construction_end_time = Builder::get_hr_time();
343 Log::log() << std::endl << "Process construction / entity creation complete." << std::endl << std::endl;
345 Log::log() << "Beginning action construction / initialization." << std::endl;
347 Bench::ActionManager am(config.actions, config.action_reports, process.get_reader_map(), process.get_writer_map(), process.get_cft_map());
349 Log::log() << "Action construction / initialization complete." << std::endl << std::endl;
351 do_wait(config.enable_time, "enable");
353 Log::log() << "Enabling DDS entities (if not already enabled)." << std::endl;
355 process_enable_begin_time = Builder::get_hr_time();
356 process.enable_dds_entities(true);
357 process_enable_end_time = Builder::get_hr_time();
359 Log::log() << "DDS entities enabled." << std::endl << std::endl;
361 if (config.wait_for_discovery) {
363 Log::log() << "Starting Discovery Check." << std::endl;
365 process_start_discovery_time = Builder::get_sys_time();
367 if (config.wait_for_discovery_seconds > 0) {
369 const std::chrono::seconds timeoutPeriod(config.wait_for_discovery_seconds);
370 const std::chrono::system_clock::time_point timeout_time = std::chrono::system_clock::now() + timeoutPeriod;
372 auto readMap = process.get_reader_map();
373 if (readMap.size() > 0) {
374 typedef std::map<std::string, std::shared_ptr<Builder::DataReader>>::iterator ReadMapIt;
375 std::shared_ptr<Builder::DataReader> dtRdrPtr(nullptr);
377 for (ReadMapIt it = readMap.begin(); it != readMap.end(); ++it) {
378 dtRdrPtr = it->second;
379 Bench::WorkerDataReaderListener* wdrl = dynamic_cast<Bench::WorkerDataReaderListener*>(dtRdrPtr->get_dds_datareaderlistener().in());
381 if (!wdrl->wait_for_expected_match(timeout_time)) {
382 Log::log() << "Error: " << it->first << " Expected writers not found." << std::endl << std::endl;
387 auto writeMap = process.get_writer_map();
388 if (writeMap.size() > 0) {
389 typedef std::map<std::string, std::shared_ptr<Builder::DataWriter>>::iterator WriteMapIt;
390 std::shared_ptr<Builder::DataWriter> dtWtrPtr(nullptr);
392 for (WriteMapIt it = writeMap.begin(); it != writeMap.end(); ++it) {
393 dtWtrPtr = it->second;
394 Bench::WorkerDataWriterListener* wdwl = dynamic_cast<Bench::WorkerDataWriterListener*>(dtWtrPtr->get_dds_datawriterlistener().in());
396 if (!wdwl->wait_for_expected_match(timeout_time)) {
397 Log::log() << "Error: " << it->first << " Expected readers not found." << std::endl << std::endl;
403 process_stop_discovery_time = Builder::get_sys_time();
405 Log::log() << "Discovery of expected entities took " << process_stop_discovery_time - process_start_discovery_time << " seconds." << std::endl << std::endl;
408 Log::log() << "Initializing process actions." << std::endl;
410 am.action_start();
412 do_wait(config.start_time, "start");
414 Log::log() << "Starting process actions." << std::endl;
416 process_start_begin_time = Builder::get_hr_time();
417 am.test_start();
418 process_start_end_time = Builder::get_hr_time();
420 Log::log() << "Process tests started." << std::endl << std::endl;
422 do_wait(config.stop_time, "stop");
424 Log::log() << "Stopping process tests." << std::endl;
426 process_stop_begin_time = Builder::get_hr_time();
427 am.test_stop();
428 process_stop_end_time = Builder::get_hr_time();
430 Log::log() << "Process tests stopped." << std::endl << std::endl;
432 do_wait(config.destruction_time, "destruction");
434 Log::log() << "Stopping process actions." << std::endl;
436 am.action_stop();
438 proactor->proactor_end_event_loop();
439 for (size_t i = 0; i < thread_pool_size; ++i) {
440 thread_pool[i]->join();
442 thread_pool.clear();
444 Log::log() << "Detaching Listeners." << std::endl;
446 process.detach_listeners();
448 process_report = process.get_report();
450 Log::log() << "Beginning process destruction / entity deletion." << std::endl;
452 process_destruction_begin_time = Builder::get_hr_time();
453 } catch (const std::exception& e) {
454 std::cerr << "Exception caught trying to execute test sequence: " << e.what() << std::endl;
455 proactor->proactor_end_event_loop();
456 for (size_t i = 0; i < thread_pool_size; ++i) {
457 thread_pool[i]->join();
459 thread_pool.clear();
460 TheServiceParticipant->shutdown();
461 return 1;
462 } catch (...) {
463 std::cerr << "Unknown exception caught trying to execute test sequence" << std::endl;
464 proactor->proactor_end_event_loop();
465 for (size_t i = 0; i < thread_pool_size; ++i) {
466 thread_pool[i]->join();
468 thread_pool.clear();
469 TheServiceParticipant->shutdown();
470 return 1;
472 process_destruction_end_time = Builder::get_hr_time();
474 Log::log() << "Process destruction / entity deletion complete." << std::endl << std::endl;
476 // Some preliminary measurements and reporting (eventually will shift to another process?)
477 worker_report.construction_time = process_construction_end_time - process_construction_begin_time;
478 worker_report.enable_time = process_enable_end_time - process_enable_begin_time;
479 worker_report.start_time = process_start_end_time - process_start_begin_time;
480 worker_report.stop_time = process_stop_end_time - process_stop_begin_time;
481 worker_report.destruction_time = process_destruction_end_time - process_destruction_begin_time;
482 worker_report.undermatched_readers = 0;
483 worker_report.undermatched_writers = 0;
485 try {
486 for (CORBA::ULong i = 0; i < process_report.participants.length(); ++i) {
487 for (CORBA::ULong j = 0; j < process_report.participants[i].subscribers.length(); ++j) {
488 for (CORBA::ULong k = 0; k < process_report.participants[i].subscribers[j].datareaders.length(); ++k) {
489 Builder::DataReaderReport& dr_report = process_report.participants[i].subscribers[j].datareaders[k];
491 const Builder::TimeStamp dr_enable_time =
492 get_or_create_property(dr_report.properties, "enable_time", Builder::PVK_TIME)->value.time_prop();
493 const Builder::TimeStamp dr_last_discovery_time =
494 get_or_create_property(dr_report.properties, "last_discovery_time", Builder::PVK_TIME)->value.time_prop();
496 if (!(ZERO < dr_enable_time && ZERO < dr_last_discovery_time)) {
497 ++worker_report.undermatched_readers;
502 for (CORBA::ULong j = 0; j < process_report.participants[i].publishers.length(); ++j) {
503 for (CORBA::ULong k = 0; k < process_report.participants[i].publishers[j].datawriters.length(); ++k) {
504 Builder::DataWriterReport& dw_report = process_report.participants[i].publishers[j].datawriters[k];
506 const Builder::TimeStamp dw_enable_time =
507 get_or_create_property(dw_report.properties, "enable_time", Builder::PVK_TIME)->value.time_prop();
508 const Builder::TimeStamp dw_last_discovery_time =
509 get_or_create_property(dw_report.properties, "last_discovery_time", Builder::PVK_TIME)->value.time_prop();
511 if (!(ZERO < dw_enable_time && ZERO < dw_last_discovery_time)) {
512 ++worker_report.undermatched_writers;
517 } catch (...) {
518 std::cerr << "Unknown exception caught trying to consolidate statistics" << std::endl;
519 return 5;
522 // If requested, write out worker report to file
524 if (!report_file_fullname.empty()) {
525 idl_2_json(worker_report, report_file, max_decimal_places);
528 // Log / print a few of the stats
530 Log::log() << std::endl << "--- Process Statistics ---" << std::endl << std::endl;
532 Log::log() << "construction time: " << worker_report.construction_time << std::endl;
533 Log::log() << "enable time: " << worker_report.enable_time << std::endl;
534 Log::log() << "start time: " << worker_report.start_time << std::endl;
535 Log::log() << "stop time: " << worker_report.stop_time << std::endl;
536 Log::log() << "destruction time: " << worker_report.destruction_time << std::endl;
538 Log::log() << std::endl << "--- Discovery Statistics ---" << std::endl << std::endl;
540 Log::log() << "undermatched readers: " << worker_report.undermatched_readers << std::endl;
541 Log::log() << "undermatched writers: " << worker_report.undermatched_writers << std::endl << std::endl;
543 return 0;