2 * ring buffer tester and benchmark
4 * Copyright (C) 2009 Steven Rostedt <srostedt@redhat.com>
6 #include <linux/ring_buffer.h>
7 #include <linux/completion.h>
8 #include <linux/kthread.h>
9 #include <linux/module.h>
10 #include <linux/time.h>
18 /* run time and sleep time in seconds */
22 /* number of events for writer to wake up the reader */
23 static int wakeup_interval
= 100;
25 static int reader_finish
;
26 static struct completion read_start
;
27 static struct completion read_done
;
29 static struct ring_buffer
*buffer
;
30 static struct task_struct
*producer
;
31 static struct task_struct
*consumer
;
32 static unsigned long read
;
34 static int disable_reader
;
35 module_param(disable_reader
, uint
, 0644);
36 MODULE_PARM_DESC(disable_reader
, "only run producer");
38 static int write_iteration
= 50;
39 module_param(write_iteration
, uint
, 0644);
40 MODULE_PARM_DESC(write_iteration
, "# of writes between timestamp readings");
42 static int read_events
;
59 static enum event_status
read_event(int cpu
)
61 struct ring_buffer_event
*event
;
65 event
= ring_buffer_consume(buffer
, cpu
, &ts
);
69 entry
= ring_buffer_event_data(event
);
79 static enum event_status
read_page(int cpu
)
81 struct ring_buffer_event
*event
;
82 struct rb_page
*rpage
;
90 bpage
= ring_buffer_alloc_read_page(buffer
);
94 ret
= ring_buffer_read_page(buffer
, &bpage
, PAGE_SIZE
, cpu
, 1);
97 commit
= local_read(&rpage
->commit
);
98 for (i
= 0; i
< commit
&& !kill_test
; i
+= inc
) {
100 if (i
>= (PAGE_SIZE
- offsetof(struct rb_page
, data
))) {
106 event
= (void *)&rpage
->data
[i
];
107 switch (event
->type_len
) {
108 case RINGBUF_TYPE_PADDING
:
109 /* failed writes may be discarded events */
110 if (!event
->time_delta
)
112 inc
= event
->array
[0] + 4;
114 case RINGBUF_TYPE_TIME_EXTEND
:
118 entry
= ring_buffer_event_data(event
);
124 if (!event
->array
[0]) {
128 inc
= event
->array
[0] + 4;
131 entry
= ring_buffer_event_data(event
);
137 inc
= ((event
->type_len
+ 1) * 4);
148 ring_buffer_free_read_page(buffer
, bpage
);
151 return EVENT_DROPPED
;
155 static void ring_buffer_consumer(void)
157 /* toggle between reading pages and events */
161 while (!reader_finish
&& !kill_test
) {
168 for_each_online_cpu(cpu
) {
169 enum event_status stat
;
172 stat
= read_event(cpu
);
174 stat
= read_page(cpu
);
178 if (stat
== EVENT_FOUND
)
181 } while (found
&& !kill_test
);
183 set_current_state(TASK_INTERRUPTIBLE
);
188 __set_current_state(TASK_RUNNING
);
191 complete(&read_done
);
194 static void ring_buffer_producer(void)
196 struct timeval start_tv
;
197 struct timeval end_tv
;
198 unsigned long long time
;
199 unsigned long long entries
;
200 unsigned long long overruns
;
201 unsigned long missed
= 0;
202 unsigned long hit
= 0;
207 * Hammer the buffer for 10 secs (this may
208 * make the system stall)
210 trace_printk("Starting ring buffer hammer\n");
211 do_gettimeofday(&start_tv
);
213 struct ring_buffer_event
*event
;
217 for (i
= 0; i
< write_iteration
; i
++) {
218 event
= ring_buffer_lock_reserve(buffer
, 10);
223 entry
= ring_buffer_event_data(event
);
224 *entry
= smp_processor_id();
225 ring_buffer_unlock_commit(buffer
, event
);
228 do_gettimeofday(&end_tv
);
231 if (consumer
&& !(cnt
% wakeup_interval
))
232 wake_up_process(consumer
);
234 #ifndef CONFIG_PREEMPT
236 * If we are a non preempt kernel, the 10 second run will
237 * stop everything while it runs. Instead, we will call
238 * cond_resched and also add any time that was lost by a
241 * Do a cond resched at the same frequency we would wake up
244 if (cnt
% wakeup_interval
)
248 } while (end_tv
.tv_sec
< (start_tv
.tv_sec
+ RUN_TIME
) && !kill_test
);
249 trace_printk("End ring buffer hammer\n");
252 /* Init both completions here to avoid races */
253 init_completion(&read_start
);
254 init_completion(&read_done
);
255 /* the completions must be visible before the finish var */
258 /* finish var visible before waking up the consumer */
260 wake_up_process(consumer
);
261 wait_for_completion(&read_done
);
264 time
= end_tv
.tv_sec
- start_tv
.tv_sec
;
265 time
*= USEC_PER_SEC
;
266 time
+= (long long)((long)end_tv
.tv_usec
- (long)start_tv
.tv_usec
);
268 entries
= ring_buffer_entries(buffer
);
269 overruns
= ring_buffer_overruns(buffer
);
272 trace_printk("ERROR!\n");
273 trace_printk("Time: %lld (usecs)\n", time
);
274 trace_printk("Overruns: %lld\n", overruns
);
276 trace_printk("Read: (reader disabled)\n");
278 trace_printk("Read: %ld (by %s)\n", read
,
279 read_events
? "events" : "pages");
280 trace_printk("Entries: %lld\n", entries
);
281 trace_printk("Total: %lld\n", entries
+ overruns
+ read
);
282 trace_printk("Missed: %ld\n", missed
);
283 trace_printk("Hit: %ld\n", hit
);
285 /* Convert time from usecs to millisecs */
286 do_div(time
, USEC_PER_MSEC
);
290 trace_printk("TIME IS ZERO??\n");
292 trace_printk("Entries per millisec: %ld\n", hit
);
295 /* Calculate the average time in nanosecs */
296 avg
= NSEC_PER_MSEC
/ hit
;
297 trace_printk("%ld ns per entry\n", avg
);
302 missed
/= (long)time
;
304 trace_printk("Total iterations per millisec: %ld\n",
307 /* it is possible that hit + missed will overflow and be zero */
308 if (!(hit
+ missed
)) {
309 trace_printk("hit + missed overflowed and totalled zero!\n");
310 hit
--; /* make it non zero */
313 /* Caculate the average time in nanosecs */
314 avg
= NSEC_PER_MSEC
/ (hit
+ missed
);
315 trace_printk("%ld ns per entry\n", avg
);
319 static void wait_to_die(void)
321 set_current_state(TASK_INTERRUPTIBLE
);
322 while (!kthread_should_stop()) {
324 set_current_state(TASK_INTERRUPTIBLE
);
326 __set_current_state(TASK_RUNNING
);
329 static int ring_buffer_consumer_thread(void *arg
)
331 while (!kthread_should_stop() && !kill_test
) {
332 complete(&read_start
);
334 ring_buffer_consumer();
336 set_current_state(TASK_INTERRUPTIBLE
);
337 if (kthread_should_stop() || kill_test
)
341 __set_current_state(TASK_RUNNING
);
343 __set_current_state(TASK_RUNNING
);
351 static int ring_buffer_producer_thread(void *arg
)
353 init_completion(&read_start
);
355 while (!kthread_should_stop() && !kill_test
) {
356 ring_buffer_reset(buffer
);
360 wake_up_process(consumer
);
361 wait_for_completion(&read_start
);
364 ring_buffer_producer();
366 trace_printk("Sleeping for 10 secs\n");
367 set_current_state(TASK_INTERRUPTIBLE
);
368 schedule_timeout(HZ
* SLEEP_TIME
);
369 __set_current_state(TASK_RUNNING
);
378 static int __init
ring_buffer_benchmark_init(void)
382 /* make a one meg buffer in overwite mode */
383 buffer
= ring_buffer_alloc(1000000, RB_FL_OVERWRITE
);
387 if (!disable_reader
) {
388 consumer
= kthread_create(ring_buffer_consumer_thread
,
389 NULL
, "rb_consumer");
390 ret
= PTR_ERR(consumer
);
391 if (IS_ERR(consumer
))
395 producer
= kthread_run(ring_buffer_producer_thread
,
396 NULL
, "rb_producer");
397 ret
= PTR_ERR(producer
);
399 if (IS_ERR(producer
))
403 * Run them as low-prio background tasks by default:
405 set_user_nice(consumer
, 19);
406 set_user_nice(producer
, 19);
412 kthread_stop(consumer
);
415 ring_buffer_free(buffer
);
419 static void __exit
ring_buffer_benchmark_exit(void)
421 kthread_stop(producer
);
423 kthread_stop(consumer
);
424 ring_buffer_free(buffer
);
427 module_init(ring_buffer_benchmark_init
);
428 module_exit(ring_buffer_benchmark_exit
);
430 MODULE_AUTHOR("Steven Rostedt");
431 MODULE_DESCRIPTION("ring_buffer_benchmark");
432 MODULE_LICENSE("GPL");