3 #pragma GCC diagnostic push
4 # if defined(__has_warning)
5 # if __has_warning("-Wclass-memaccess")
6 # pragma GCC diagnostic ignored "-Wclass-memaccess"
10 #include "BenchTypeSupportImpl.h"
12 #pragma GCC diagnostic pop
14 #include "BuilderTypeSupportImpl.h"
16 #include "ListenerFactory.h"
18 #include "BuilderProcess.h"
19 #include "DataReader.h"
20 #include "DataReaderListener.h"
21 #include "DataWriter.h"
22 #include "DataWriterListener.h"
23 #include "ParticipantListener.h"
24 #include "PublisherListener.h"
25 #include "SubscriberListener.h"
26 #include "TopicListener.h"
29 #include "PropertyStatBlock.h"
31 #include "ActionManager.h"
32 #include "ForwardAction.h"
33 #include "ReadAction.h"
34 #include "SetCftParametersAction.h"
35 #include "WorkerDataReaderListener.h"
36 #include "WorkerDataWriterListener.h"
37 #include "WorkerTopicListener.h"
38 #include "WorkerSubscriberListener.h"
39 #include "WorkerPublisherListener.h"
40 #include "WorkerParticipantListener.h"
41 #include "WriteAction.h"
44 #include <json_conversion.h>
46 #include <dds/DCPS/Service_Participant.h>
47 #include <dds/DCPS/transport/framework/TransportRegistry.h>
48 #ifdef ACE_AS_STATIC_LIBS
49 # include <dds/DCPS/RTPS/RtpsDiscovery.h>
50 # include <dds/DCPS/transport/rtps_udp/RtpsUdp.h>
51 # include <dds/DCPS/transport/udp/Udp.h>
52 # include <dds/DCPS/transport/tcp/Tcp.h>
53 # include <dds/DCPS/transport/multicast/Multicast.h>
54 # ifdef OPENDDS_SECURITY
55 # include <dds/DCPS/security/BuiltInPlugins.h>
59 #include <ace/Proactor.h>
60 #ifdef ACE_HAS_AIO_CALLS
61 # include <ace/POSIX_CB_Proactor.h>
74 using Bench::get_option_argument
;
76 const size_t DEFAULT_MAX_DECIMAL_PLACES
= 9u;
77 const size_t DEFAULT_THREAD_POOL_SIZE
= 4u;
79 void do_wait(const Builder::TimeStamp
& ts
, const std::string
& ts_name
, bool zero_equals_key_press
= true) {
80 if (zero_equals_key_press
&& ts
== ZERO
) {
82 ss
<< "No " << ts_name
<< " time specified. Press enter to continue." << std::endl
;
83 std::cerr
<< ss
.str() << std::flush
;
85 std::getline(std::cin
, line
);
88 auto duration
= -1 * get_duration(ts
);
89 if (duration
> std::chrono::milliseconds(100)) {
90 std::this_thread::sleep_until(std::chrono::steady_clock::now() + duration
);
93 auto now
= std::chrono::system_clock::now();
94 auto duration
= std::chrono::system_clock::time_point(get_duration(ts
)) - now
;
95 if (duration
> std::chrono::milliseconds(100)) {
96 std::this_thread::sleep_until(std::chrono::steady_clock::now() + duration
);
102 int ACE_TMAIN(int argc
, ACE_TCHAR
* argv
[]) {
103 Builder::NullStream null_stream_i
;
104 std::ostream
null_stream(&null_stream_i
);
106 std::string log_file_fullname
;
107 std::string report_file_fullname
;
108 std::string config_file_fullname
;
109 std::string config_file_basename
;
112 for (int i
= 1; i
< argc
; i
++) {
113 const ACE_TCHAR
* argument
= argv
[i
];
114 if (!ACE_OS::strcmp(argv
[i
], ACE_TEXT("--log"))) {
115 log_file_fullname
= get_option_argument(i
, argc
, argv
);
116 } else if (!ACE_OS::strcmp(argv
[i
], ACE_TEXT("--report"))) {
117 report_file_fullname
= get_option_argument(i
, argc
, argv
);
118 } else if (config_file_fullname
.empty()) {
119 config_file_fullname
= ACE_TEXT_ALWAYS_CHAR(argument
);
120 config_file_basename
= ACE_TEXT_ALWAYS_CHAR(ACE::basename(argument
));
122 std::cerr
<< "Invalid option: " << argument
<< std::endl
;
127 if (config_file_fullname
.empty()) {
128 std::cerr
<< "Must pass a configuration file" << std::endl
;
131 } catch (int value
) {
132 std::cerr
<< "See DDS_ROOT/performance-tests/bench/README.md for usage" << std::endl
;
136 std::ifstream
config_file(config_file_fullname
);
137 if (!config_file
.is_open()) {
138 std::cerr
<< "Unable to open configuration file: '" << config_file_fullname
<< "'" << std::endl
;
142 std::ofstream log_file
;
143 if (!log_file_fullname
.empty()) {
144 log_file
.open(log_file_fullname
, ios::app
);
145 if (!log_file
.good()) {
146 std::cerr
<< "Unable to open log file: '" << log_file_fullname
<< "'" << std::endl
;
149 Log::stream
= &log_file
;
151 Log::stream
= &std::cout
;
154 std::ofstream report_file
;
155 if (!report_file_fullname
.empty()) {
156 report_file
.open(report_file_fullname
);
157 if (!report_file
.good()) {
158 std::cerr
<< "Unable to open report file: '" << report_file_fullname
<< "'" << std::endl
;
165 Bench::WorkerConfig config
{};
167 config
.create_time
= ZERO
;
168 config
.enable_time
= ZERO
;
169 config
.start_time
= ZERO
;
170 config
.stop_time
= ZERO
;
171 config
.wait_for_discovery
= false;
172 config
.wait_for_discovery_seconds
= 0;
174 if (!json_2_idl(config_file
, config
)) {
175 std::cerr
<< "Unable to parse configuration" << std::endl
;
179 config
.process
.name
= config_file_basename
.c_str();
181 // Bad-actor test & debugging options for node & test controllers
183 Builder::ConstPropertyIndex force_worker_segfault_prop
=
184 get_property(config
.properties
, "force_worker_segfault", Builder::PVK_ULL
);
185 if (force_worker_segfault_prop
) {
186 if (force_worker_segfault_prop
->value
.ull_prop()) {
187 DDS::DataReaderListener
* drl_ptr
= 0;
188 drl_ptr
->on_data_available(0);
192 Builder::ConstPropertyIndex force_worker_assert_prop
=
193 get_property(config
.properties
, "force_worker_assert", Builder::PVK_ULL
);
194 if (force_worker_assert_prop
) {
195 if (force_worker_assert_prop
->value
.ull_prop()) {
196 OPENDDS_ASSERT(false);
200 Builder::ConstPropertyIndex force_worker_deadlock_prop
=
201 get_property(config
.properties
, "force_worker_deadlock", Builder::PVK_ULL
);
202 if (force_worker_deadlock_prop
) {
203 if (force_worker_deadlock_prop
->value
.ull_prop()) {
205 std::unique_lock
<std::mutex
> l(m
);
207 std::unique_lock
<std::mutex
> tl(m
);
213 // Register some Bench-specific types
214 Builder::TypeSupportRegistry::TypeSupportRegistration
215 process_config_registration(new Builder::ProcessConfigTypeSupportImpl());
216 Builder::TypeSupportRegistry::TypeSupportRegistration
217 data_registration(new Bench::DataTypeSupportImpl());
219 // Register some Bench-specific listener factories
221 Builder::register_topic_listener("bench_tl", [](const Builder::PropertySeq
& properties
){
222 return DDS::TopicListener_var(new Bench::WorkerTopicListener(properties
));
224 Builder::register_datareader_listener("bench_drl", [](const Builder::PropertySeq
& properties
){
225 return DDS::DataReaderListener_var(new Bench::WorkerDataReaderListener(properties
));
227 Builder::register_subscriber_listener("bench_sl", [](const Builder::PropertySeq
& properties
){
228 return DDS::SubscriberListener_var(new Bench::WorkerSubscriberListener(properties
));
230 Builder::register_datawriter_listener("bench_dwl", [](const Builder::PropertySeq
& properties
){
231 return DDS::DataWriterListener_var(new Bench::WorkerDataWriterListener(properties
));
233 Builder::register_publisher_listener("bench_pl", [](const Builder::PropertySeq
& properties
){
234 return DDS::PublisherListener_var(new Bench::WorkerPublisherListener(properties
));
236 Builder::register_domain_participant_listener("bench_partl", [](const Builder::PropertySeq
& properties
){
237 return DDS::DomainParticipantListener_var(new Bench::WorkerParticipantListener(properties
));
239 } catch (const std::exception
& e
) {
240 std::cerr
<< "Exception caught trying to register listener factories: " << e
.what() << std::endl
;
244 // Disable some Proactor debug chatter to stdout (eventually make this configurable?)
245 ACE_Log_Category::ace_lib().priority_mask(0);
247 #ifdef ACE_HAS_AIO_CALLS
248 Builder::ConstPropertyIndex use_aio_proactor_prop
=
249 get_property(config
.properties
, "use_aio_proactor", Builder::PVK_ULL
);
252 std::shared_ptr
<ACE_Proactor
> proactor
;
253 #ifdef ACE_HAS_AIO_CALLS
254 if (use_aio_proactor_prop
&& use_aio_proactor_prop
->value
.ull_prop()) {
255 proactor
.reset(new ACE_Proactor(new ACE_POSIX_AIOCB_Proactor()));
258 proactor
.reset(new ACE_Proactor());
259 #ifdef ACE_HAS_AIO_CALLS
263 int max_decimal_places
= DEFAULT_MAX_DECIMAL_PLACES
;
264 Builder::ConstPropertyIndex max_decimal_places_prop
=
265 get_property(config
.properties
, "max_decimal_places", Builder::PVK_ULL
);
266 if (max_decimal_places_prop
) {
267 max_decimal_places
= static_cast<int>(max_decimal_places_prop
->value
.ull_prop());
270 size_t action_thread_pool_size
= std::min(static_cast<size_t>(config
.actions
.length()), DEFAULT_THREAD_POOL_SIZE
);
271 Builder::ConstPropertyIndex action_thread_pool_size_prop
=
272 get_property(config
.properties
, "action_thread_pool_size", Builder::PVK_ULL
);
273 if (action_thread_pool_size_prop
) {
274 action_thread_pool_size
= static_cast<size_t>(action_thread_pool_size_prop
->value
.ull_prop());
277 size_t redirect_ace_log
= 1;
278 Builder::ConstPropertyIndex redirect_ace_log_prop
=
279 get_property(config
.properties
, "redirect_ace_log", Builder::PVK_ULL
);
280 if (redirect_ace_log_prop
) {
281 redirect_ace_log
= static_cast<size_t>(redirect_ace_log_prop
->value
.ull_prop());
284 if (redirect_ace_log
&& !log_file_fullname
.empty()) {
285 std::ofstream
* output_stream
= new std::ofstream(log_file_fullname
.c_str(), ios::app
);
286 if (output_stream
->bad()) {
287 delete output_stream
;
289 ACE_LOG_MSG
->msg_ostream(output_stream
, true);
291 ACE_LOG_MSG
->clr_flags(ACE_Log_Msg::STDERR
| ACE_Log_Msg::LOGGER
);
292 ACE_LOG_MSG
->set_flags(ACE_Log_Msg::OSTREAM
);
296 Bench::ActionManager::Registration
297 write_action_registration("write", [&](){
298 return std::shared_ptr
<Bench::Action
>(new Bench::WriteAction(*proactor
));
300 Bench::ActionManager::Registration
301 read_action_registration("read", [&](){
302 return std::shared_ptr
<Bench::Action
>(new Bench::ReadAction(*proactor
));
304 Bench::ActionManager::Registration
305 forward_action_registration("forward", [&](){
306 return std::shared_ptr
<Bench::Action
>(new Bench::ForwardAction(*proactor
));
308 Bench::ActionManager::Registration
309 set_cft_parameters_action_registration("set_cft_parameters", [&]() {
310 return std::shared_ptr
<Bench::Action
>(new Bench::SetCftParametersAction(*proactor
));
313 // Timestamps used to measure method call durations
314 Builder::TimeStamp process_construction_begin_time
= ZERO
, process_construction_end_time
= ZERO
;
315 Builder::TimeStamp process_enable_begin_time
= ZERO
, process_enable_end_time
= ZERO
;
316 Builder::TimeStamp process_start_begin_time
= ZERO
, process_start_end_time
= ZERO
;
317 Builder::TimeStamp process_stop_begin_time
= ZERO
, process_stop_end_time
= ZERO
;
318 Builder::TimeStamp process_destruction_begin_time
= ZERO
, process_destruction_end_time
= ZERO
;
319 Builder::TimeStamp process_start_discovery_time
= ZERO
, process_stop_discovery_time
= ZERO
;
321 set_global_properties(config
.properties
);
323 Bench::WorkerReport worker_report
{};
324 Builder::ProcessReport
& process_report
= worker_report
.process_report
;
326 const size_t thread_pool_size
= static_cast<size_t>(action_thread_pool_size
);
327 std::vector
<std::shared_ptr
<std::thread
> > thread_pool
;
328 for (size_t i
= 0; i
< thread_pool_size
; ++i
) {
329 thread_pool
.emplace_back(std::make_shared
<std::thread
>([&](){ proactor
->proactor_run_event_loop(); }));
335 do_wait(config
.create_time
, "create", false);
337 Log::log() << "Beginning process construction / entity creation." << std::endl
;
339 process_construction_begin_time
= Builder::get_hr_time();
340 Builder::BuilderProcess
process(config
.process
);
341 process_construction_end_time
= Builder::get_hr_time();
343 Log::log() << std::endl
<< "Process construction / entity creation complete." << std::endl
<< std::endl
;
345 Log::log() << "Beginning action construction / initialization." << std::endl
;
347 Bench::ActionManager
am(config
.actions
, config
.action_reports
, process
.get_reader_map(), process
.get_writer_map(), process
.get_cft_map());
349 Log::log() << "Action construction / initialization complete." << std::endl
<< std::endl
;
351 do_wait(config
.enable_time
, "enable");
353 Log::log() << "Enabling DDS entities (if not already enabled)." << std::endl
;
355 process_enable_begin_time
= Builder::get_hr_time();
356 process
.enable_dds_entities(true);
357 process_enable_end_time
= Builder::get_hr_time();
359 Log::log() << "DDS entities enabled." << std::endl
<< std::endl
;
361 if (config
.wait_for_discovery
) {
363 Log::log() << "Starting Discovery Check." << std::endl
;
365 process_start_discovery_time
= Builder::get_sys_time();
367 if (config
.wait_for_discovery_seconds
> 0) {
369 const std::chrono::seconds
timeoutPeriod(config
.wait_for_discovery_seconds
);
370 const std::chrono::system_clock::time_point timeout_time
= std::chrono::system_clock::now() + timeoutPeriod
;
372 auto readMap
= process
.get_reader_map();
373 if (readMap
.size() > 0) {
374 typedef std::map
<std::string
, std::shared_ptr
<Builder::DataReader
>>::iterator ReadMapIt
;
375 std::shared_ptr
<Builder::DataReader
> dtRdrPtr(nullptr);
377 for (ReadMapIt it
= readMap
.begin(); it
!= readMap
.end(); ++it
) {
378 dtRdrPtr
= it
->second
;
379 Bench::WorkerDataReaderListener
* wdrl
= dynamic_cast<Bench::WorkerDataReaderListener
*>(dtRdrPtr
->get_dds_datareaderlistener().in());
381 if (!wdrl
->wait_for_expected_match(timeout_time
)) {
382 Log::log() << "Error: " << it
->first
<< " Expected writers not found." << std::endl
<< std::endl
;
387 auto writeMap
= process
.get_writer_map();
388 if (writeMap
.size() > 0) {
389 typedef std::map
<std::string
, std::shared_ptr
<Builder::DataWriter
>>::iterator WriteMapIt
;
390 std::shared_ptr
<Builder::DataWriter
> dtWtrPtr(nullptr);
392 for (WriteMapIt it
= writeMap
.begin(); it
!= writeMap
.end(); ++it
) {
393 dtWtrPtr
= it
->second
;
394 Bench::WorkerDataWriterListener
* wdwl
= dynamic_cast<Bench::WorkerDataWriterListener
*>(dtWtrPtr
->get_dds_datawriterlistener().in());
396 if (!wdwl
->wait_for_expected_match(timeout_time
)) {
397 Log::log() << "Error: " << it
->first
<< " Expected readers not found." << std::endl
<< std::endl
;
403 process_stop_discovery_time
= Builder::get_sys_time();
405 Log::log() << "Discovery of expected entities took " << process_stop_discovery_time
- process_start_discovery_time
<< " seconds." << std::endl
<< std::endl
;
408 Log::log() << "Initializing process actions." << std::endl
;
412 do_wait(config
.start_time
, "start");
414 Log::log() << "Starting process actions." << std::endl
;
416 process_start_begin_time
= Builder::get_hr_time();
418 process_start_end_time
= Builder::get_hr_time();
420 Log::log() << "Process tests started." << std::endl
<< std::endl
;
422 do_wait(config
.stop_time
, "stop");
424 Log::log() << "Stopping process tests." << std::endl
;
426 process_stop_begin_time
= Builder::get_hr_time();
428 process_stop_end_time
= Builder::get_hr_time();
430 Log::log() << "Process tests stopped." << std::endl
<< std::endl
;
432 do_wait(config
.destruction_time
, "destruction");
434 Log::log() << "Stopping process actions." << std::endl
;
438 proactor
->proactor_end_event_loop();
439 for (size_t i
= 0; i
< thread_pool_size
; ++i
) {
440 thread_pool
[i
]->join();
444 Log::log() << "Detaching Listeners." << std::endl
;
446 process
.detach_listeners();
448 process_report
= process
.get_report();
450 Log::log() << "Beginning process destruction / entity deletion." << std::endl
;
452 process_destruction_begin_time
= Builder::get_hr_time();
453 } catch (const std::exception
& e
) {
454 std::cerr
<< "Exception caught trying to execute test sequence: " << e
.what() << std::endl
;
455 proactor
->proactor_end_event_loop();
456 for (size_t i
= 0; i
< thread_pool_size
; ++i
) {
457 thread_pool
[i
]->join();
460 TheServiceParticipant
->shutdown();
463 std::cerr
<< "Unknown exception caught trying to execute test sequence" << std::endl
;
464 proactor
->proactor_end_event_loop();
465 for (size_t i
= 0; i
< thread_pool_size
; ++i
) {
466 thread_pool
[i
]->join();
469 TheServiceParticipant
->shutdown();
472 process_destruction_end_time
= Builder::get_hr_time();
474 Log::log() << "Process destruction / entity deletion complete." << std::endl
<< std::endl
;
476 // Some preliminary measurements and reporting (eventually will shift to another process?)
477 worker_report
.construction_time
= process_construction_end_time
- process_construction_begin_time
;
478 worker_report
.enable_time
= process_enable_end_time
- process_enable_begin_time
;
479 worker_report
.start_time
= process_start_end_time
- process_start_begin_time
;
480 worker_report
.stop_time
= process_stop_end_time
- process_stop_begin_time
;
481 worker_report
.destruction_time
= process_destruction_end_time
- process_destruction_begin_time
;
482 worker_report
.undermatched_readers
= 0;
483 worker_report
.undermatched_writers
= 0;
486 for (CORBA::ULong i
= 0; i
< process_report
.participants
.length(); ++i
) {
487 for (CORBA::ULong j
= 0; j
< process_report
.participants
[i
].subscribers
.length(); ++j
) {
488 for (CORBA::ULong k
= 0; k
< process_report
.participants
[i
].subscribers
[j
].datareaders
.length(); ++k
) {
489 Builder::DataReaderReport
& dr_report
= process_report
.participants
[i
].subscribers
[j
].datareaders
[k
];
491 const Builder::TimeStamp dr_enable_time
=
492 get_or_create_property(dr_report
.properties
, "enable_time", Builder::PVK_TIME
)->value
.time_prop();
493 const Builder::TimeStamp dr_last_discovery_time
=
494 get_or_create_property(dr_report
.properties
, "last_discovery_time", Builder::PVK_TIME
)->value
.time_prop();
496 if (!(ZERO
< dr_enable_time
&& ZERO
< dr_last_discovery_time
)) {
497 ++worker_report
.undermatched_readers
;
502 for (CORBA::ULong j
= 0; j
< process_report
.participants
[i
].publishers
.length(); ++j
) {
503 for (CORBA::ULong k
= 0; k
< process_report
.participants
[i
].publishers
[j
].datawriters
.length(); ++k
) {
504 Builder::DataWriterReport
& dw_report
= process_report
.participants
[i
].publishers
[j
].datawriters
[k
];
506 const Builder::TimeStamp dw_enable_time
=
507 get_or_create_property(dw_report
.properties
, "enable_time", Builder::PVK_TIME
)->value
.time_prop();
508 const Builder::TimeStamp dw_last_discovery_time
=
509 get_or_create_property(dw_report
.properties
, "last_discovery_time", Builder::PVK_TIME
)->value
.time_prop();
511 if (!(ZERO
< dw_enable_time
&& ZERO
< dw_last_discovery_time
)) {
512 ++worker_report
.undermatched_writers
;
518 std::cerr
<< "Unknown exception caught trying to consolidate statistics" << std::endl
;
522 // If requested, write out worker report to file
524 if (!report_file_fullname
.empty()) {
525 idl_2_json(worker_report
, report_file
, max_decimal_places
);
528 // Log / print a few of the stats
530 Log::log() << std::endl
<< "--- Process Statistics ---" << std::endl
<< std::endl
;
532 Log::log() << "construction time: " << worker_report
.construction_time
<< std::endl
;
533 Log::log() << "enable time: " << worker_report
.enable_time
<< std::endl
;
534 Log::log() << "start time: " << worker_report
.start_time
<< std::endl
;
535 Log::log() << "stop time: " << worker_report
.stop_time
<< std::endl
;
536 Log::log() << "destruction time: " << worker_report
.destruction_time
<< std::endl
;
538 Log::log() << std::endl
<< "--- Discovery Statistics ---" << std::endl
<< std::endl
;
540 Log::log() << "undermatched readers: " << worker_report
.undermatched_readers
<< std::endl
;
541 Log::log() << "undermatched writers: " << worker_report
.undermatched_writers
<< std::endl
<< std::endl
;