/*
 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
 */
#include <linux/ring_buffer.h>
#include <linux/spinlock.h>
#include <linux/debugfs.h>
#include <linux/uaccess.h>
#include <linux/module.h>
#include <linux/percpu.h>
#include <linux/mutex.h>
#include <linux/sched.h>	/* used for sched_clock() (for now) */
#include <linux/init.h>
#include <linux/hash.h>
#include <linux/list.h>
/* Global flag to disable all recording to ring buffers */
static int ring_buffers_off __read_mostly;
/**
 * tracing_on - enable all tracing buffers
 *
 * This function enables all tracing buffers that may have been
 * disabled with tracing_off.
 */
void tracing_on(void)
{
	ring_buffers_off = 0;
}
EXPORT_SYMBOL_GPL(tracing_on);
/**
 * tracing_off - turn off all tracing buffers
 *
 * This function stops all tracing buffers from recording data.
 * It does not disable any overhead the tracers themselves may
 * be causing. This function simply causes all recording to
 * the ring buffers to fail.
 */
void tracing_off(void)
{
	ring_buffers_off = 1;
}
EXPORT_SYMBOL_GPL(tracing_off);
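
/*
 * Illustrative sketch (not part of the ring buffer itself): a debug site
 * can bracket an interesting condition with tracing_off()/tracing_on() to
 * freeze the already-recorded data for later inspection.  The helper name
 * and the error-based policy are assumptions made for this example.
 */
static inline void example_freeze_buffers_on_error(int error)
{
	if (error)
		tracing_off();	/* all further ring buffer writes fail */
	else
		tracing_on();	/* recording resumes */
}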
/* Up this if you want to test the TIME_EXTENTS and normalization */
#define DEBUG_SHIFT 0

u64 ring_buffer_time_stamp(int cpu)
{
	u64 time;

	preempt_disable_notrace();
	/* shift to debug/test normalization and TIME_EXTENTS */
	time = sched_clock() << DEBUG_SHIFT;
	preempt_enable_notrace();

	return time;
}
EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
{
	/* Just stupid testing the normalize function and deltas */
	*ts >>= DEBUG_SHIFT;
}
EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
#define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
#define RB_ALIGNMENT_SHIFT	2
#define RB_ALIGNMENT		(1 << RB_ALIGNMENT_SHIFT)
#define RB_MAX_SMALL_DATA	28

enum {
	RB_LEN_TIME_EXTEND = 8,
	RB_LEN_TIME_STAMP = 16,
};
/* inline for ring buffer fast paths */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
{
	unsigned length;

	switch (event->type) {
	case RINGBUF_TYPE_PADDING:
		/* undefined */
		return -1;

	case RINGBUF_TYPE_TIME_EXTEND:
		return RB_LEN_TIME_EXTEND;

	case RINGBUF_TYPE_TIME_STAMP:
		return RB_LEN_TIME_STAMP;

	case RINGBUF_TYPE_DATA:
		if (event->len)
			length = event->len << RB_ALIGNMENT_SHIFT;
		else
			length = event->array[0];
		return length + RB_EVNT_HDR_SIZE;
	default:
		BUG();
	}
	/* not hit */
	return 0;
}
/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
{
	unsigned length = rb_event_length(event);

	if (event->type != RINGBUF_TYPE_DATA)
		return length;
	length -= RB_EVNT_HDR_SIZE;
	if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
		length -= sizeof(event->array[0]);
	return length;
}
EXPORT_SYMBOL_GPL(ring_buffer_event_length);
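
/*
 * Worked example (illustrative only): a 12-byte payload is stored as a
 * 16-byte event (4-byte header plus 12 bytes of data), with event->len
 * holding 12 >> RB_ALIGNMENT_SHIFT == 3.  rb_event_length() then reports
 * 16, and ring_buffer_event_length() strips the header again and hands 12
 * back to the caller.  Payloads larger than RB_MAX_SMALL_DATA set len to
 * zero and keep the size in array[0] instead.
 */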
/* inline for ring buffer fast paths */
static inline void *
rb_event_data(struct ring_buffer_event *event)
{
	BUG_ON(event->type != RINGBUF_TYPE_DATA);
	/* If length is in len field, then array[0] has the data */
	if (event->len)
		return (void *)&event->array[0];
	/* Otherwise length is in array[0] and array[1] has the data */
	return (void *)&event->array[1];
}
/**
 * ring_buffer_event_data - return the data of the event
 * @event: the event to get the data from
 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
{
	return rb_event_data(event);
}
EXPORT_SYMBOL_GPL(ring_buffer_event_data);
#define for_each_buffer_cpu(buffer, cpu)		\
	for_each_cpu_mask(cpu, buffer->cpumask)

#define TS_SHIFT	27
#define TS_MASK		((1ULL << TS_SHIFT) - 1)
#define TS_DELTA_TEST	(~TS_MASK)
/*
 * This hack stolen from mm/slob.c.
 * We can store per page timing information in the page frame of the page.
 * Thanks to Peter Zijlstra for suggesting this idea.
 */
struct buffer_page {
	u64		 time_stamp;	/* page time stamp */
	local_t		 write;		/* index for next write */
	local_t		 commit;	/* write committed index */
	unsigned	 read;		/* index for next read */
	struct list_head list;		/* list of free pages */
	void		*page;		/* Actual data page */
};
/*
 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
 * this out.
 */
static inline void free_buffer_page(struct buffer_page *bpage)
{
	if (bpage->page)
		free_page((unsigned long)bpage->page);
	kfree(bpage);
}
/*
 * We need to fit the time_stamp delta into 27 bits.
 */
static inline int test_time_stamp(u64 delta)
{
	if (delta & TS_DELTA_TEST)
		return 1;
	return 0;
}

#define BUF_PAGE_SIZE PAGE_SIZE
/*
 * head_page == tail_page && head == tail then buffer is empty.
 */
struct ring_buffer_per_cpu {
	int				cpu;
	struct ring_buffer		*buffer;
	spinlock_t			lock;
	struct lock_class_key		lock_key;
	struct list_head		pages;
	struct buffer_page		*head_page;	/* read from head */
	struct buffer_page		*tail_page;	/* write to tail */
	struct buffer_page		*commit_page;	/* committed pages */
	struct buffer_page		*reader_page;
	unsigned long			overrun;
	unsigned long			entries;
	u64				write_stamp;
	u64				read_stamp;
	atomic_t			record_disabled;
};
struct ring_buffer {
	unsigned long			size;
	unsigned			pages;
	unsigned			flags;
	int				cpus;
	cpumask_t			cpumask;
	atomic_t			record_disabled;

	struct mutex			mutex;

	struct ring_buffer_per_cpu	**buffers;
};

struct ring_buffer_iter {
	struct ring_buffer_per_cpu	*cpu_buffer;
	unsigned long			head;
	struct buffer_page		*head_page;
	u64				read_stamp;
};
236 #define RB_WARN_ON(buffer, cond) \
238 if (unlikely(cond)) { \
239 atomic_inc(&buffer->record_disabled); \
244 #define RB_WARN_ON_RET(buffer, cond) \
246 if (unlikely(cond)) { \
247 atomic_inc(&buffer->record_disabled); \
253 #define RB_WARN_ON_ONCE(buffer, cond) \
256 if (unlikely(cond) && !once) { \
258 atomic_inc(&buffer->record_disabled); \
/**
 * check_pages - integrity check of buffer pages
 * @cpu_buffer: CPU buffer with pages to test
 *
 * As a safety measure we check to make sure the data pages have not
 * been corrupted.
 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
{
272 struct list_head
*head
= &cpu_buffer
->pages
;
273 struct buffer_page
*page
, *tmp
;
275 RB_WARN_ON_RET(cpu_buffer
, head
->next
->prev
!= head
);
276 RB_WARN_ON_RET(cpu_buffer
, head
->prev
->next
!= head
);
278 list_for_each_entry_safe(page
, tmp
, head
, list
) {
279 RB_WARN_ON_RET(cpu_buffer
,
280 page
->list
.next
->prev
!= &page
->list
);
281 RB_WARN_ON_RET(cpu_buffer
,
282 page
->list
.prev
->next
!= &page
->list
);
288 static int rb_allocate_pages(struct ring_buffer_per_cpu
*cpu_buffer
,
291 struct list_head
*head
= &cpu_buffer
->pages
;
292 struct buffer_page
*page
, *tmp
;
297 for (i
= 0; i
< nr_pages
; i
++) {
298 page
= kzalloc_node(ALIGN(sizeof(*page
), cache_line_size()),
299 GFP_KERNEL
, cpu_to_node(cpu_buffer
->cpu
));
302 list_add(&page
->list
, &pages
);
304 addr
= __get_free_page(GFP_KERNEL
);
307 page
->page
= (void *)addr
;
310 list_splice(&pages
, head
);
312 rb_check_pages(cpu_buffer
);
317 list_for_each_entry_safe(page
, tmp
, &pages
, list
) {
318 list_del_init(&page
->list
);
319 free_buffer_page(page
);
324 static struct ring_buffer_per_cpu
*
325 rb_allocate_cpu_buffer(struct ring_buffer
*buffer
, int cpu
)
327 struct ring_buffer_per_cpu
*cpu_buffer
;
328 struct buffer_page
*page
;
332 cpu_buffer
= kzalloc_node(ALIGN(sizeof(*cpu_buffer
), cache_line_size()),
333 GFP_KERNEL
, cpu_to_node(cpu
));
337 cpu_buffer
->cpu
= cpu
;
338 cpu_buffer
->buffer
= buffer
;
339 spin_lock_init(&cpu_buffer
->lock
);
340 INIT_LIST_HEAD(&cpu_buffer
->pages
);
342 page
= kzalloc_node(ALIGN(sizeof(*page
), cache_line_size()),
343 GFP_KERNEL
, cpu_to_node(cpu
));
345 goto fail_free_buffer
;
347 cpu_buffer
->reader_page
= page
;
348 addr
= __get_free_page(GFP_KERNEL
);
350 goto fail_free_reader
;
351 page
->page
= (void *)addr
;
353 INIT_LIST_HEAD(&cpu_buffer
->reader_page
->list
);
355 ret
= rb_allocate_pages(cpu_buffer
, buffer
->pages
);
357 goto fail_free_reader
;
359 cpu_buffer
->head_page
360 = list_entry(cpu_buffer
->pages
.next
, struct buffer_page
, list
);
361 cpu_buffer
->tail_page
= cpu_buffer
->commit_page
= cpu_buffer
->head_page
;
366 free_buffer_page(cpu_buffer
->reader_page
);
373 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu
*cpu_buffer
)
375 struct list_head
*head
= &cpu_buffer
->pages
;
376 struct buffer_page
*page
, *tmp
;
378 list_del_init(&cpu_buffer
->reader_page
->list
);
379 free_buffer_page(cpu_buffer
->reader_page
);
381 list_for_each_entry_safe(page
, tmp
, head
, list
) {
382 list_del_init(&page
->list
);
383 free_buffer_page(page
);
389 * Causes compile errors if the struct buffer_page gets bigger
390 * than the struct page.
392 extern int ring_buffer_page_too_big(void);
395 * ring_buffer_alloc - allocate a new ring_buffer
396 * @size: the size in bytes per cpu that is needed.
397 * @flags: attributes to set for the ring buffer.
399 * Currently the only flag that is available is the RB_FL_OVERWRITE
400 * flag. This flag means that the buffer will overwrite old data
401 * when the buffer wraps. If this flag is not set, the buffer will
402 * drop data when the tail hits the head.
404 struct ring_buffer
*ring_buffer_alloc(unsigned long size
, unsigned flags
)
406 struct ring_buffer
*buffer
;
410 /* Paranoid! Optimizes out when all is well */
411 if (sizeof(struct buffer_page
) > sizeof(struct page
))
412 ring_buffer_page_too_big();
415 /* keep it in its own cache line */
416 buffer
= kzalloc(ALIGN(sizeof(*buffer
), cache_line_size()),
421 buffer
->pages
= DIV_ROUND_UP(size
, BUF_PAGE_SIZE
);
422 buffer
->flags
= flags
;
424 /* need at least two pages */
425 if (buffer
->pages
== 1)
428 buffer
->cpumask
= cpu_possible_map
;
429 buffer
->cpus
= nr_cpu_ids
;
431 bsize
= sizeof(void *) * nr_cpu_ids
;
432 buffer
->buffers
= kzalloc(ALIGN(bsize
, cache_line_size()),
434 if (!buffer
->buffers
)
435 goto fail_free_buffer
;
437 for_each_buffer_cpu(buffer
, cpu
) {
438 buffer
->buffers
[cpu
] =
439 rb_allocate_cpu_buffer(buffer
, cpu
);
440 if (!buffer
->buffers
[cpu
])
441 goto fail_free_buffers
;
444 mutex_init(&buffer
->mutex
);
449 for_each_buffer_cpu(buffer
, cpu
) {
450 if (buffer
->buffers
[cpu
])
451 rb_free_cpu_buffer(buffer
->buffers
[cpu
]);
453 kfree(buffer
->buffers
);
459 EXPORT_SYMBOL_GPL(ring_buffer_alloc
);
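
/*
 * Illustrative sketch of allocating and releasing a buffer (the size used
 * here is an arbitrary example value):
 *
 *	struct ring_buffer *buffer;
 *
 *	buffer = ring_buffer_alloc(65536, RB_FL_OVERWRITE);
 *	if (!buffer)
 *		return -ENOMEM;
 *	(use the buffer)
 *	ring_buffer_free(buffer);
 *
 * The size is rounded up to whole pages and every possible CPU gets its
 * own per-cpu buffer of that size.
 */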
462 * ring_buffer_free - free a ring buffer.
463 * @buffer: the buffer to free.
466 ring_buffer_free(struct ring_buffer
*buffer
)
470 for_each_buffer_cpu(buffer
, cpu
)
471 rb_free_cpu_buffer(buffer
->buffers
[cpu
]);
475 EXPORT_SYMBOL_GPL(ring_buffer_free
);
477 static void rb_reset_cpu(struct ring_buffer_per_cpu
*cpu_buffer
);
480 rb_remove_pages(struct ring_buffer_per_cpu
*cpu_buffer
, unsigned nr_pages
)
482 struct buffer_page
*page
;
486 atomic_inc(&cpu_buffer
->record_disabled
);
489 for (i
= 0; i
< nr_pages
; i
++) {
490 BUG_ON(list_empty(&cpu_buffer
->pages
));
491 p
= cpu_buffer
->pages
.next
;
492 page
= list_entry(p
, struct buffer_page
, list
);
493 list_del_init(&page
->list
);
494 free_buffer_page(page
);
496 BUG_ON(list_empty(&cpu_buffer
->pages
));
498 rb_reset_cpu(cpu_buffer
);
500 rb_check_pages(cpu_buffer
);
502 atomic_dec(&cpu_buffer
->record_disabled
);
507 rb_insert_pages(struct ring_buffer_per_cpu
*cpu_buffer
,
508 struct list_head
*pages
, unsigned nr_pages
)
510 struct buffer_page
*page
;
514 atomic_inc(&cpu_buffer
->record_disabled
);
517 for (i
= 0; i
< nr_pages
; i
++) {
518 BUG_ON(list_empty(pages
));
520 page
= list_entry(p
, struct buffer_page
, list
);
521 list_del_init(&page
->list
);
522 list_add_tail(&page
->list
, &cpu_buffer
->pages
);
524 rb_reset_cpu(cpu_buffer
);
526 rb_check_pages(cpu_buffer
);
528 atomic_dec(&cpu_buffer
->record_disabled
);
532 * ring_buffer_resize - resize the ring buffer
533 * @buffer: the buffer to resize.
534 * @size: the new size.
536 * The tracer is responsible for making sure that the buffer is
537 * not being used while changing the size.
538 * Note: We may be able to change the above requirement by using
539 * RCU synchronizations.
541 * Minimum size is 2 * BUF_PAGE_SIZE.
543 * Returns -1 on failure.
545 int ring_buffer_resize(struct ring_buffer
*buffer
, unsigned long size
)
547 struct ring_buffer_per_cpu
*cpu_buffer
;
548 unsigned nr_pages
, rm_pages
, new_pages
;
549 struct buffer_page
*page
, *tmp
;
550 unsigned long buffer_size
;
556 * Always succeed at resizing a non-existent buffer:
561 size
= DIV_ROUND_UP(size
, BUF_PAGE_SIZE
);
562 size
*= BUF_PAGE_SIZE
;
563 buffer_size
= buffer
->pages
* BUF_PAGE_SIZE
;
565 /* we need a minimum of two pages */
566 if (size
< BUF_PAGE_SIZE
* 2)
567 size
= BUF_PAGE_SIZE
* 2;
569 if (size
== buffer_size
)
572 mutex_lock(&buffer
->mutex
);
574 nr_pages
= DIV_ROUND_UP(size
, BUF_PAGE_SIZE
);
576 if (size
< buffer_size
) {
578 /* easy case, just free pages */
579 BUG_ON(nr_pages
>= buffer
->pages
);
581 rm_pages
= buffer
->pages
- nr_pages
;
583 for_each_buffer_cpu(buffer
, cpu
) {
584 cpu_buffer
= buffer
->buffers
[cpu
];
585 rb_remove_pages(cpu_buffer
, rm_pages
);
591 * This is a bit more difficult. We only want to add pages
592 * when we can allocate enough for all CPUs. We do this
593 * by allocating all the pages and storing them on a local
594 * link list. If we succeed in our allocation, then we
595 * add these pages to the cpu_buffers. Otherwise we just free
596 * them all and return -ENOMEM;
598 BUG_ON(nr_pages
<= buffer
->pages
);
599 new_pages
= nr_pages
- buffer
->pages
;
601 for_each_buffer_cpu(buffer
, cpu
) {
602 for (i
= 0; i
< new_pages
; i
++) {
603 page
= kzalloc_node(ALIGN(sizeof(*page
),
605 GFP_KERNEL
, cpu_to_node(cpu
));
608 list_add(&page
->list
, &pages
);
609 addr
= __get_free_page(GFP_KERNEL
);
612 page
->page
= (void *)addr
;
616 for_each_buffer_cpu(buffer
, cpu
) {
617 cpu_buffer
= buffer
->buffers
[cpu
];
618 rb_insert_pages(cpu_buffer
, &pages
, new_pages
);
621 BUG_ON(!list_empty(&pages
));
624 buffer
->pages
= nr_pages
;
625 mutex_unlock(&buffer
->mutex
);
630 list_for_each_entry_safe(page
, tmp
, &pages
, list
) {
631 list_del_init(&page
->list
);
632 free_buffer_page(page
);
634 mutex_unlock(&buffer
->mutex
);
637 EXPORT_SYMBOL_GPL(ring_buffer_resize
);
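
/*
 * Illustrative sketch only: growing a buffer to one megabyte per CPU.
 * The requested size is rounded up to BUF_PAGE_SIZE and raised to the
 * two-page minimum; the caller must guarantee nothing writes to the
 * buffer while the resize runs.
 *
 *	ret = ring_buffer_resize(buffer, 1024 * 1024);
 *	if (ret < 0)
 *		return ret;
 */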
639 static inline int rb_null_event(struct ring_buffer_event
*event
)
641 return event
->type
== RINGBUF_TYPE_PADDING
;
644 static inline void *__rb_page_index(struct buffer_page
*page
, unsigned index
)
646 return page
->page
+ index
;
649 static inline struct ring_buffer_event
*
650 rb_reader_event(struct ring_buffer_per_cpu
*cpu_buffer
)
652 return __rb_page_index(cpu_buffer
->reader_page
,
653 cpu_buffer
->reader_page
->read
);
656 static inline struct ring_buffer_event
*
657 rb_head_event(struct ring_buffer_per_cpu
*cpu_buffer
)
659 return __rb_page_index(cpu_buffer
->head_page
,
660 cpu_buffer
->head_page
->read
);
663 static inline struct ring_buffer_event
*
664 rb_iter_head_event(struct ring_buffer_iter
*iter
)
666 return __rb_page_index(iter
->head_page
, iter
->head
);
669 static inline unsigned rb_page_write(struct buffer_page
*bpage
)
671 return local_read(&bpage
->write
);
674 static inline unsigned rb_page_commit(struct buffer_page
*bpage
)
676 return local_read(&bpage
->commit
);
/* Size is determined by what has been committed */
680 static inline unsigned rb_page_size(struct buffer_page
*bpage
)
682 return rb_page_commit(bpage
);
685 static inline unsigned
686 rb_commit_index(struct ring_buffer_per_cpu
*cpu_buffer
)
688 return rb_page_commit(cpu_buffer
->commit_page
);
691 static inline unsigned rb_head_size(struct ring_buffer_per_cpu
*cpu_buffer
)
693 return rb_page_commit(cpu_buffer
->head_page
);
697 * When the tail hits the head and the buffer is in overwrite mode,
698 * the head jumps to the next page and all content on the previous
699 * page is discarded. But before doing so, we update the overrun
700 * variable of the buffer.
702 static void rb_update_overflow(struct ring_buffer_per_cpu
*cpu_buffer
)
704 struct ring_buffer_event
*event
;
707 for (head
= 0; head
< rb_head_size(cpu_buffer
);
708 head
+= rb_event_length(event
)) {
710 event
= __rb_page_index(cpu_buffer
->head_page
, head
);
711 BUG_ON(rb_null_event(event
));
712 /* Only count data entries */
713 if (event
->type
!= RINGBUF_TYPE_DATA
)
715 cpu_buffer
->overrun
++;
716 cpu_buffer
->entries
--;
720 static inline void rb_inc_page(struct ring_buffer_per_cpu
*cpu_buffer
,
721 struct buffer_page
**page
)
723 struct list_head
*p
= (*page
)->list
.next
;
725 if (p
== &cpu_buffer
->pages
)
728 *page
= list_entry(p
, struct buffer_page
, list
);
731 static inline unsigned
732 rb_event_index(struct ring_buffer_event
*event
)
734 unsigned long addr
= (unsigned long)event
;
736 return (addr
& ~PAGE_MASK
) - (PAGE_SIZE
- BUF_PAGE_SIZE
);
740 rb_is_commit(struct ring_buffer_per_cpu
*cpu_buffer
,
741 struct ring_buffer_event
*event
)
743 unsigned long addr
= (unsigned long)event
;
746 index
= rb_event_index(event
);
749 return cpu_buffer
->commit_page
->page
== (void *)addr
&&
750 rb_commit_index(cpu_buffer
) == index
;
754 rb_set_commit_event(struct ring_buffer_per_cpu
*cpu_buffer
,
755 struct ring_buffer_event
*event
)
757 unsigned long addr
= (unsigned long)event
;
760 index
= rb_event_index(event
);
763 while (cpu_buffer
->commit_page
->page
!= (void *)addr
) {
764 RB_WARN_ON(cpu_buffer
,
765 cpu_buffer
->commit_page
== cpu_buffer
->tail_page
);
766 cpu_buffer
->commit_page
->commit
=
767 cpu_buffer
->commit_page
->write
;
768 rb_inc_page(cpu_buffer
, &cpu_buffer
->commit_page
);
769 cpu_buffer
->write_stamp
= cpu_buffer
->commit_page
->time_stamp
;
772 /* Now set the commit to the event's index */
773 local_set(&cpu_buffer
->commit_page
->commit
, index
);
777 rb_set_commit_to_write(struct ring_buffer_per_cpu
*cpu_buffer
)
780 * We only race with interrupts and NMIs on this CPU.
781 * If we own the commit event, then we can commit
782 * all others that interrupted us, since the interruptions
783 * are in stack format (they finish before they come
784 * back to us). This allows us to do a simple loop to
785 * assign the commit to the tail.
787 while (cpu_buffer
->commit_page
!= cpu_buffer
->tail_page
) {
788 cpu_buffer
->commit_page
->commit
=
789 cpu_buffer
->commit_page
->write
;
790 rb_inc_page(cpu_buffer
, &cpu_buffer
->commit_page
);
791 cpu_buffer
->write_stamp
= cpu_buffer
->commit_page
->time_stamp
;
792 /* add barrier to keep gcc from optimizing too much */
795 while (rb_commit_index(cpu_buffer
) !=
796 rb_page_write(cpu_buffer
->commit_page
)) {
797 cpu_buffer
->commit_page
->commit
=
798 cpu_buffer
->commit_page
->write
;
803 static void rb_reset_reader_page(struct ring_buffer_per_cpu
*cpu_buffer
)
805 cpu_buffer
->read_stamp
= cpu_buffer
->reader_page
->time_stamp
;
806 cpu_buffer
->reader_page
->read
= 0;
809 static inline void rb_inc_iter(struct ring_buffer_iter
*iter
)
811 struct ring_buffer_per_cpu
*cpu_buffer
= iter
->cpu_buffer
;
814 * The iterator could be on the reader page (it starts there).
815 * But the head could have moved, since the reader was
816 * found. Check for this case and assign the iterator
817 * to the head page instead of next.
819 if (iter
->head_page
== cpu_buffer
->reader_page
)
820 iter
->head_page
= cpu_buffer
->head_page
;
822 rb_inc_page(cpu_buffer
, &iter
->head_page
);
824 iter
->read_stamp
= iter
->head_page
->time_stamp
;
 * ring_buffer_update_event - update event type and data
 * @event: the event to update
 * @type: the type of event
 * @length: the size of the event field in the ring buffer
 *
 * Update the type and data fields of the event. The length
 * is the actual size that is written to the ring buffer,
 * and with this, we can determine what to place into the
 * data field.
840 rb_update_event(struct ring_buffer_event
*event
,
841 unsigned type
, unsigned length
)
847 case RINGBUF_TYPE_PADDING
:
850 case RINGBUF_TYPE_TIME_EXTEND
:
852 (RB_LEN_TIME_EXTEND
+ (RB_ALIGNMENT
-1))
853 >> RB_ALIGNMENT_SHIFT
;
856 case RINGBUF_TYPE_TIME_STAMP
:
858 (RB_LEN_TIME_STAMP
+ (RB_ALIGNMENT
-1))
859 >> RB_ALIGNMENT_SHIFT
;
862 case RINGBUF_TYPE_DATA
:
863 length
-= RB_EVNT_HDR_SIZE
;
864 if (length
> RB_MAX_SMALL_DATA
) {
866 event
->array
[0] = length
;
869 (length
+ (RB_ALIGNMENT
-1))
870 >> RB_ALIGNMENT_SHIFT
;
877 static inline unsigned rb_calculate_event_length(unsigned length
)
879 struct ring_buffer_event event
; /* Used only for sizeof array */
881 /* zero length can cause confusions */
885 if (length
> RB_MAX_SMALL_DATA
)
886 length
+= sizeof(event
.array
[0]);
888 length
+= RB_EVNT_HDR_SIZE
;
889 length
= ALIGN(length
, RB_ALIGNMENT
);
894 static struct ring_buffer_event
*
895 __rb_reserve_next(struct ring_buffer_per_cpu
*cpu_buffer
,
896 unsigned type
, unsigned long length
, u64
*ts
)
898 struct buffer_page
*tail_page
, *head_page
, *reader_page
;
899 unsigned long tail
, write
;
900 struct ring_buffer
*buffer
= cpu_buffer
->buffer
;
901 struct ring_buffer_event
*event
;
904 tail_page
= cpu_buffer
->tail_page
;
905 write
= local_add_return(length
, &tail_page
->write
);
906 tail
= write
- length
;
	/* See if we shot past the end of this buffer page */
909 if (write
> BUF_PAGE_SIZE
) {
910 struct buffer_page
*next_page
= tail_page
;
912 spin_lock_irqsave(&cpu_buffer
->lock
, flags
);
914 rb_inc_page(cpu_buffer
, &next_page
);
916 head_page
= cpu_buffer
->head_page
;
917 reader_page
= cpu_buffer
->reader_page
;
919 /* we grabbed the lock before incrementing */
920 RB_WARN_ON(cpu_buffer
, next_page
== reader_page
);
923 * If for some reason, we had an interrupt storm that made
924 * it all the way around the buffer, bail, and warn
927 if (unlikely(next_page
== cpu_buffer
->commit_page
)) {
932 if (next_page
== head_page
) {
933 if (!(buffer
->flags
& RB_FL_OVERWRITE
)) {
935 if (tail
<= BUF_PAGE_SIZE
)
936 local_set(&tail_page
->write
, tail
);
940 /* tail_page has not moved yet? */
941 if (tail_page
== cpu_buffer
->tail_page
) {
942 /* count overflows */
943 rb_update_overflow(cpu_buffer
);
945 rb_inc_page(cpu_buffer
, &head_page
);
946 cpu_buffer
->head_page
= head_page
;
947 cpu_buffer
->head_page
->read
= 0;
952 * If the tail page is still the same as what we think
953 * it is, then it is up to us to update the tail
956 if (tail_page
== cpu_buffer
->tail_page
) {
957 local_set(&next_page
->write
, 0);
958 local_set(&next_page
->commit
, 0);
959 cpu_buffer
->tail_page
= next_page
;
961 /* reread the time stamp */
962 *ts
= ring_buffer_time_stamp(cpu_buffer
->cpu
);
963 cpu_buffer
->tail_page
->time_stamp
= *ts
;
967 * The actual tail page has moved forward.
969 if (tail
< BUF_PAGE_SIZE
) {
970 /* Mark the rest of the page with padding */
971 event
= __rb_page_index(tail_page
, tail
);
972 event
->type
= RINGBUF_TYPE_PADDING
;
975 if (tail
<= BUF_PAGE_SIZE
)
976 /* Set the write back to the previous setting */
977 local_set(&tail_page
->write
, tail
);
980 * If this was a commit entry that failed,
983 if (tail_page
== cpu_buffer
->commit_page
&&
984 tail
== rb_commit_index(cpu_buffer
)) {
985 rb_set_commit_to_write(cpu_buffer
);
988 spin_unlock_irqrestore(&cpu_buffer
->lock
, flags
);
990 /* fail and let the caller try again */
991 return ERR_PTR(-EAGAIN
);
994 /* We reserved something on the buffer */
996 BUG_ON(write
> BUF_PAGE_SIZE
);
998 event
= __rb_page_index(tail_page
, tail
);
999 rb_update_event(event
, type
, length
);
1002 * If this is a commit and the tail is zero, then update
1003 * this page's time stamp.
1005 if (!tail
&& rb_is_commit(cpu_buffer
, event
))
1006 cpu_buffer
->commit_page
->time_stamp
= *ts
;
1011 spin_unlock_irqrestore(&cpu_buffer
->lock
, flags
);
1016 rb_add_time_stamp(struct ring_buffer_per_cpu
*cpu_buffer
,
1017 u64
*ts
, u64
*delta
)
1019 struct ring_buffer_event
*event
;
1023 if (unlikely(*delta
> (1ULL << 59) && !once
++)) {
1024 printk(KERN_WARNING
"Delta way too big! %llu"
1025 " ts=%llu write stamp = %llu\n",
1026 (unsigned long long)*delta
,
1027 (unsigned long long)*ts
,
1028 (unsigned long long)cpu_buffer
->write_stamp
);
	 * The delta is too big, we need to add a timestamp.
1036 event
= __rb_reserve_next(cpu_buffer
,
1037 RINGBUF_TYPE_TIME_EXTEND
,
1043 if (PTR_ERR(event
) == -EAGAIN
)
	/* Only a committed time event can update the write stamp */
1047 if (rb_is_commit(cpu_buffer
, event
)) {
1049 * If this is the first on the page, then we need to
1050 * update the page itself, and just put in a zero.
1052 if (rb_event_index(event
)) {
1053 event
->time_delta
= *delta
& TS_MASK
;
1054 event
->array
[0] = *delta
>> TS_SHIFT
;
1056 cpu_buffer
->commit_page
->time_stamp
= *ts
;
1057 event
->time_delta
= 0;
1058 event
->array
[0] = 0;
1060 cpu_buffer
->write_stamp
= *ts
;
1061 /* let the caller know this was the commit */
1064 /* Darn, this is just wasted space */
1065 event
->time_delta
= 0;
1066 event
->array
[0] = 0;
1075 static struct ring_buffer_event
*
1076 rb_reserve_next_event(struct ring_buffer_per_cpu
*cpu_buffer
,
1077 unsigned type
, unsigned long length
)
1079 struct ring_buffer_event
*event
;
1086 * We allow for interrupts to reenter here and do a trace.
1087 * If one does, it will cause this original code to loop
1088 * back here. Even with heavy interrupts happening, this
1089 * should only happen a few times in a row. If this happens
1090 * 1000 times in a row, there must be either an interrupt
1091 * storm or we have something buggy.
1094 if (unlikely(++nr_loops
> 1000)) {
1095 RB_WARN_ON(cpu_buffer
, 1);
1099 ts
= ring_buffer_time_stamp(cpu_buffer
->cpu
);
1102 * Only the first commit can update the timestamp.
1103 * Yes there is a race here. If an interrupt comes in
1104 * just after the conditional and it traces too, then it
1105 * will also check the deltas. More than one timestamp may
1106 * also be made. But only the entry that did the actual
1107 * commit will be something other than zero.
1109 if (cpu_buffer
->tail_page
== cpu_buffer
->commit_page
&&
1110 rb_page_write(cpu_buffer
->tail_page
) ==
1111 rb_commit_index(cpu_buffer
)) {
1113 delta
= ts
- cpu_buffer
->write_stamp
;
1115 /* make sure this delta is calculated here */
1118 /* Did the write stamp get updated already? */
1119 if (unlikely(ts
< cpu_buffer
->write_stamp
))
1122 if (test_time_stamp(delta
)) {
1124 commit
= rb_add_time_stamp(cpu_buffer
, &ts
, &delta
);
1126 if (commit
== -EBUSY
)
1129 if (commit
== -EAGAIN
)
1132 RB_WARN_ON(cpu_buffer
, commit
< 0);
1135 /* Non commits have zero deltas */
1138 event
= __rb_reserve_next(cpu_buffer
, type
, length
, &ts
);
1139 if (PTR_ERR(event
) == -EAGAIN
)
1143 if (unlikely(commit
))
	 * Ouch! We needed a timestamp and it was committed. But
	 * we didn't get our event reserved.
1148 rb_set_commit_to_write(cpu_buffer
);
	 * If the timestamp was committed, make the commit our entry
	 * now so that we will update it when needed.
1157 rb_set_commit_event(cpu_buffer
, event
);
1158 else if (!rb_is_commit(cpu_buffer
, event
))
1161 event
->time_delta
= delta
;
1166 static DEFINE_PER_CPU(int, rb_need_resched
);
1169 * ring_buffer_lock_reserve - reserve a part of the buffer
1170 * @buffer: the ring buffer to reserve from
1171 * @length: the length of the data to reserve (excluding event header)
1172 * @flags: a pointer to save the interrupt flags
 * Returns a reserved event on the ring buffer to copy directly to.
1175 * The user of this interface will need to get the body to write into
1176 * and can use the ring_buffer_event_data() interface.
1178 * The length is the length of the data needed, not the event length
1179 * which also includes the event header.
1181 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
1182 * If NULL is returned, then nothing has been allocated or locked.
1184 struct ring_buffer_event
*
1185 ring_buffer_lock_reserve(struct ring_buffer
*buffer
,
1186 unsigned long length
,
1187 unsigned long *flags
)
1189 struct ring_buffer_per_cpu
*cpu_buffer
;
1190 struct ring_buffer_event
*event
;
1193 if (ring_buffers_off
)
1196 if (atomic_read(&buffer
->record_disabled
))
1199 /* If we are tracing schedule, we don't want to recurse */
1200 resched
= need_resched();
1201 preempt_disable_notrace();
1203 cpu
= raw_smp_processor_id();
1205 if (!cpu_isset(cpu
, buffer
->cpumask
))
1208 cpu_buffer
= buffer
->buffers
[cpu
];
1210 if (atomic_read(&cpu_buffer
->record_disabled
))
1213 length
= rb_calculate_event_length(length
);
1214 if (length
> BUF_PAGE_SIZE
)
1217 event
= rb_reserve_next_event(cpu_buffer
, RINGBUF_TYPE_DATA
, length
);
1222 * Need to store resched state on this cpu.
1223 * Only the first needs to.
1226 if (preempt_count() == 1)
1227 per_cpu(rb_need_resched
, cpu
) = resched
;
1233 preempt_enable_no_resched_notrace();
1235 preempt_enable_notrace();
1238 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve
);
1240 static void rb_commit(struct ring_buffer_per_cpu
*cpu_buffer
,
1241 struct ring_buffer_event
*event
)
1243 cpu_buffer
->entries
++;
1245 /* Only process further if we own the commit */
1246 if (!rb_is_commit(cpu_buffer
, event
))
1249 cpu_buffer
->write_stamp
+= event
->time_delta
;
1251 rb_set_commit_to_write(cpu_buffer
);
 * ring_buffer_unlock_commit - commit a reserved event
1256 * @buffer: The buffer to commit to
1257 * @event: The event pointer to commit.
1258 * @flags: the interrupt flags received from ring_buffer_lock_reserve.
1260 * This commits the data to the ring buffer, and releases any locks held.
1262 * Must be paired with ring_buffer_lock_reserve.
1264 int ring_buffer_unlock_commit(struct ring_buffer
*buffer
,
1265 struct ring_buffer_event
*event
,
1266 unsigned long flags
)
1268 struct ring_buffer_per_cpu
*cpu_buffer
;
1269 int cpu
= raw_smp_processor_id();
1271 cpu_buffer
= buffer
->buffers
[cpu
];
1273 rb_commit(cpu_buffer
, event
);
1276 * Only the last preempt count needs to restore preemption.
1278 if (preempt_count() == 1) {
1279 if (per_cpu(rb_need_resched
, cpu
))
1280 preempt_enable_no_resched_notrace();
1282 preempt_enable_notrace();
1284 preempt_enable_no_resched_notrace();
1288 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit
);
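
/*
 * Illustrative sketch of the reserve/commit cycle described above.  The
 * payload type and the -EBUSY policy are assumptions made for the
 * example; only the three ring buffer calls are the real API.
 */
static inline int example_record_value(struct ring_buffer *buffer, int value)
{
	struct ring_buffer_event *event;
	unsigned long flags;
	int *body;

	event = ring_buffer_lock_reserve(buffer, sizeof(*body), &flags);
	if (!event)
		return -EBUSY;

	body = ring_buffer_event_data(event);
	*body = value;

	return ring_buffer_unlock_commit(buffer, event, flags);
}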
1291 * ring_buffer_write - write data to the buffer without reserving
1292 * @buffer: The ring buffer to write to.
1293 * @length: The length of the data being written (excluding the event header)
1294 * @data: The data to write to the buffer.
1296 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
1297 * one function. If you already have the data to write to the buffer, it
1298 * may be easier to simply call this function.
1300 * Note, like ring_buffer_lock_reserve, the length is the length of the data
1301 * and not the length of the event which would hold the header.
1303 int ring_buffer_write(struct ring_buffer
*buffer
,
1304 unsigned long length
,
1307 struct ring_buffer_per_cpu
*cpu_buffer
;
1308 struct ring_buffer_event
*event
;
1309 unsigned long event_length
;
1314 if (ring_buffers_off
)
1317 if (atomic_read(&buffer
->record_disabled
))
1320 resched
= need_resched();
1321 preempt_disable_notrace();
1323 cpu
= raw_smp_processor_id();
1325 if (!cpu_isset(cpu
, buffer
->cpumask
))
1328 cpu_buffer
= buffer
->buffers
[cpu
];
1330 if (atomic_read(&cpu_buffer
->record_disabled
))
1333 event_length
= rb_calculate_event_length(length
);
1334 event
= rb_reserve_next_event(cpu_buffer
,
1335 RINGBUF_TYPE_DATA
, event_length
);
1339 body
= rb_event_data(event
);
1341 memcpy(body
, data
, length
);
1343 rb_commit(cpu_buffer
, event
);
1348 preempt_enable_no_resched_notrace();
1350 preempt_enable_notrace();
1354 EXPORT_SYMBOL_GPL(ring_buffer_write
);
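
/*
 * Illustrative sketch only: when the data already exists in memory, one
 * call replaces the reserve/commit pair (the payload is an arbitrary
 * example value):
 *
 *	int sample = 42;
 *	int ret;
 *
 *	ret = ring_buffer_write(buffer, sizeof(sample), &sample);
 */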
1356 static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu
*cpu_buffer
)
1358 struct buffer_page
*reader
= cpu_buffer
->reader_page
;
1359 struct buffer_page
*head
= cpu_buffer
->head_page
;
1360 struct buffer_page
*commit
= cpu_buffer
->commit_page
;
1362 return reader
->read
== rb_page_commit(reader
) &&
1363 (commit
== reader
||
1365 head
->read
== rb_page_commit(commit
)));
1369 * ring_buffer_record_disable - stop all writes into the buffer
1370 * @buffer: The ring buffer to stop writes to.
1372 * This prevents all writes to the buffer. Any attempt to write
1373 * to the buffer after this will fail and return NULL.
1375 * The caller should call synchronize_sched() after this.
1377 void ring_buffer_record_disable(struct ring_buffer
*buffer
)
1379 atomic_inc(&buffer
->record_disabled
);
1381 EXPORT_SYMBOL_GPL(ring_buffer_record_disable
);
1384 * ring_buffer_record_enable - enable writes to the buffer
1385 * @buffer: The ring buffer to enable writes
1387 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
1390 void ring_buffer_record_enable(struct ring_buffer
*buffer
)
1392 atomic_dec(&buffer
->record_disabled
);
1394 EXPORT_SYMBOL_GPL(ring_buffer_record_enable
);
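
/*
 * Illustrative sketch of the intended pattern: stop the writers, wait for
 * in-flight writers with synchronize_sched(), read or reset the buffer,
 * then re-enable recording.
 *
 *	ring_buffer_record_disable(buffer);
 *	synchronize_sched();
 *	(read or reset the buffer here)
 *	ring_buffer_record_enable(buffer);
 */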
1397 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1398 * @buffer: The ring buffer to stop writes to.
1399 * @cpu: The CPU buffer to stop
1401 * This prevents all writes to the buffer. Any attempt to write
1402 * to the buffer after this will fail and return NULL.
1404 * The caller should call synchronize_sched() after this.
1406 void ring_buffer_record_disable_cpu(struct ring_buffer
*buffer
, int cpu
)
1408 struct ring_buffer_per_cpu
*cpu_buffer
;
1410 if (!cpu_isset(cpu
, buffer
->cpumask
))
1413 cpu_buffer
= buffer
->buffers
[cpu
];
1414 atomic_inc(&cpu_buffer
->record_disabled
);
1416 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu
);
1419 * ring_buffer_record_enable_cpu - enable writes to the buffer
1420 * @buffer: The ring buffer to enable writes
1421 * @cpu: The CPU to enable.
1423 * Note, multiple disables will need the same number of enables
 * to truly enable the writing (much like preempt_disable).
1426 void ring_buffer_record_enable_cpu(struct ring_buffer
*buffer
, int cpu
)
1428 struct ring_buffer_per_cpu
*cpu_buffer
;
1430 if (!cpu_isset(cpu
, buffer
->cpumask
))
1433 cpu_buffer
= buffer
->buffers
[cpu
];
1434 atomic_dec(&cpu_buffer
->record_disabled
);
1436 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu
);
1439 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1440 * @buffer: The ring buffer
1441 * @cpu: The per CPU buffer to get the entries from.
1443 unsigned long ring_buffer_entries_cpu(struct ring_buffer
*buffer
, int cpu
)
1445 struct ring_buffer_per_cpu
*cpu_buffer
;
1447 if (!cpu_isset(cpu
, buffer
->cpumask
))
1450 cpu_buffer
= buffer
->buffers
[cpu
];
1451 return cpu_buffer
->entries
;
1453 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu
);
1456 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1457 * @buffer: The ring buffer
1458 * @cpu: The per CPU buffer to get the number of overruns from
1460 unsigned long ring_buffer_overrun_cpu(struct ring_buffer
*buffer
, int cpu
)
1462 struct ring_buffer_per_cpu
*cpu_buffer
;
1464 if (!cpu_isset(cpu
, buffer
->cpumask
))
1467 cpu_buffer
= buffer
->buffers
[cpu
];
1468 return cpu_buffer
->overrun
;
1470 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu
);
1473 * ring_buffer_entries - get the number of entries in a buffer
1474 * @buffer: The ring buffer
1476 * Returns the total number of entries in the ring buffer
1479 unsigned long ring_buffer_entries(struct ring_buffer
*buffer
)
1481 struct ring_buffer_per_cpu
*cpu_buffer
;
1482 unsigned long entries
= 0;
1485 /* if you care about this being correct, lock the buffer */
1486 for_each_buffer_cpu(buffer
, cpu
) {
1487 cpu_buffer
= buffer
->buffers
[cpu
];
1488 entries
+= cpu_buffer
->entries
;
1493 EXPORT_SYMBOL_GPL(ring_buffer_entries
);
 * ring_buffer_overruns - get the number of overruns in the buffer
1497 * @buffer: The ring buffer
1499 * Returns the total number of overruns in the ring buffer
1502 unsigned long ring_buffer_overruns(struct ring_buffer
*buffer
)
1504 struct ring_buffer_per_cpu
*cpu_buffer
;
1505 unsigned long overruns
= 0;
1508 /* if you care about this being correct, lock the buffer */
1509 for_each_buffer_cpu(buffer
, cpu
) {
1510 cpu_buffer
= buffer
->buffers
[cpu
];
1511 overruns
+= cpu_buffer
->overrun
;
1516 EXPORT_SYMBOL_GPL(ring_buffer_overruns
);
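
/*
 * Illustrative sketch only: the two counters are typically sampled
 * together to report how much data was logged and how much was lost:
 *
 *	unsigned long entries = ring_buffer_entries(buffer);
 *	unsigned long lost = ring_buffer_overruns(buffer);
 */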
1519 * ring_buffer_iter_reset - reset an iterator
1520 * @iter: The iterator to reset
1522 * Resets the iterator, so that it will start from the beginning
1525 void ring_buffer_iter_reset(struct ring_buffer_iter
*iter
)
1527 struct ring_buffer_per_cpu
*cpu_buffer
= iter
->cpu_buffer
;
1529 /* Iterator usage is expected to have record disabled */
1530 if (list_empty(&cpu_buffer
->reader_page
->list
)) {
1531 iter
->head_page
= cpu_buffer
->head_page
;
1532 iter
->head
= cpu_buffer
->head_page
->read
;
1534 iter
->head_page
= cpu_buffer
->reader_page
;
1535 iter
->head
= cpu_buffer
->reader_page
->read
;
1538 iter
->read_stamp
= cpu_buffer
->read_stamp
;
1540 iter
->read_stamp
= iter
->head_page
->time_stamp
;
1542 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset
);
1545 * ring_buffer_iter_empty - check if an iterator has no more to read
1546 * @iter: The iterator to check
1548 int ring_buffer_iter_empty(struct ring_buffer_iter
*iter
)
1550 struct ring_buffer_per_cpu
*cpu_buffer
;
1552 cpu_buffer
= iter
->cpu_buffer
;
1554 return iter
->head_page
== cpu_buffer
->commit_page
&&
1555 iter
->head
== rb_commit_index(cpu_buffer
);
1557 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty
);
1560 rb_update_read_stamp(struct ring_buffer_per_cpu
*cpu_buffer
,
1561 struct ring_buffer_event
*event
)
1565 switch (event
->type
) {
1566 case RINGBUF_TYPE_PADDING
:
1569 case RINGBUF_TYPE_TIME_EXTEND
:
1570 delta
= event
->array
[0];
1572 delta
+= event
->time_delta
;
1573 cpu_buffer
->read_stamp
+= delta
;
1576 case RINGBUF_TYPE_TIME_STAMP
:
1577 /* FIXME: not implemented */
1580 case RINGBUF_TYPE_DATA
:
1581 cpu_buffer
->read_stamp
+= event
->time_delta
;
1591 rb_update_iter_read_stamp(struct ring_buffer_iter
*iter
,
1592 struct ring_buffer_event
*event
)
1596 switch (event
->type
) {
1597 case RINGBUF_TYPE_PADDING
:
1600 case RINGBUF_TYPE_TIME_EXTEND
:
1601 delta
= event
->array
[0];
1603 delta
+= event
->time_delta
;
1604 iter
->read_stamp
+= delta
;
1607 case RINGBUF_TYPE_TIME_STAMP
:
1608 /* FIXME: not implemented */
1611 case RINGBUF_TYPE_DATA
:
1612 iter
->read_stamp
+= event
->time_delta
;
1621 static struct buffer_page
*
1622 rb_get_reader_page(struct ring_buffer_per_cpu
*cpu_buffer
)
1624 struct buffer_page
*reader
= NULL
;
1625 unsigned long flags
;
1628 spin_lock_irqsave(&cpu_buffer
->lock
, flags
);
1632 * This should normally only loop twice. But because the
1633 * start of the reader inserts an empty page, it causes
1634 * a case where we will loop three times. There should be no
1635 * reason to loop four times (that I know of).
1637 if (unlikely(++nr_loops
> 3)) {
1638 RB_WARN_ON(cpu_buffer
, 1);
1643 reader
= cpu_buffer
->reader_page
;
1645 /* If there's more to read, return this page */
1646 if (cpu_buffer
->reader_page
->read
< rb_page_size(reader
))
1649 /* Never should we have an index greater than the size */
1650 RB_WARN_ON(cpu_buffer
,
1651 cpu_buffer
->reader_page
->read
> rb_page_size(reader
));
1653 /* check if we caught up to the tail */
1655 if (cpu_buffer
->commit_page
== cpu_buffer
->reader_page
)
1659 * Splice the empty reader page into the list around the head.
1660 * Reset the reader page to size zero.
1663 reader
= cpu_buffer
->head_page
;
1664 cpu_buffer
->reader_page
->list
.next
= reader
->list
.next
;
1665 cpu_buffer
->reader_page
->list
.prev
= reader
->list
.prev
;
1667 local_set(&cpu_buffer
->reader_page
->write
, 0);
1668 local_set(&cpu_buffer
->reader_page
->commit
, 0);
1670 /* Make the reader page now replace the head */
1671 reader
->list
.prev
->next
= &cpu_buffer
->reader_page
->list
;
1672 reader
->list
.next
->prev
= &cpu_buffer
->reader_page
->list
;
1675 * If the tail is on the reader, then we must set the head
1676 * to the inserted page, otherwise we set it one before.
1678 cpu_buffer
->head_page
= cpu_buffer
->reader_page
;
1680 if (cpu_buffer
->commit_page
!= reader
)
1681 rb_inc_page(cpu_buffer
, &cpu_buffer
->head_page
);
1683 /* Finally update the reader page to the new head */
1684 cpu_buffer
->reader_page
= reader
;
1685 rb_reset_reader_page(cpu_buffer
);
1690 spin_unlock_irqrestore(&cpu_buffer
->lock
, flags
);
1695 static void rb_advance_reader(struct ring_buffer_per_cpu
*cpu_buffer
)
1697 struct ring_buffer_event
*event
;
1698 struct buffer_page
*reader
;
1701 reader
= rb_get_reader_page(cpu_buffer
);
1703 /* This function should not be called when buffer is empty */
1706 event
= rb_reader_event(cpu_buffer
);
1708 if (event
->type
== RINGBUF_TYPE_DATA
)
1709 cpu_buffer
->entries
--;
1711 rb_update_read_stamp(cpu_buffer
, event
);
1713 length
= rb_event_length(event
);
1714 cpu_buffer
->reader_page
->read
+= length
;
1717 static void rb_advance_iter(struct ring_buffer_iter
*iter
)
1719 struct ring_buffer
*buffer
;
1720 struct ring_buffer_per_cpu
*cpu_buffer
;
1721 struct ring_buffer_event
*event
;
1724 cpu_buffer
= iter
->cpu_buffer
;
1725 buffer
= cpu_buffer
->buffer
;
1728 * Check if we are at the end of the buffer.
1730 if (iter
->head
>= rb_page_size(iter
->head_page
)) {
1731 BUG_ON(iter
->head_page
== cpu_buffer
->commit_page
);
1736 event
= rb_iter_head_event(iter
);
1738 length
= rb_event_length(event
);
1741 * This should not be called to advance the header if we are
1742 * at the tail of the buffer.
1744 BUG_ON((iter
->head_page
== cpu_buffer
->commit_page
) &&
1745 (iter
->head
+ length
> rb_commit_index(cpu_buffer
)));
1747 rb_update_iter_read_stamp(iter
, event
);
1749 iter
->head
+= length
;
1751 /* check for end of page padding */
1752 if ((iter
->head
>= rb_page_size(iter
->head_page
)) &&
1753 (iter
->head_page
!= cpu_buffer
->commit_page
))
1754 rb_advance_iter(iter
);
1758 * ring_buffer_peek - peek at the next event to be read
1759 * @buffer: The ring buffer to read
 * @cpu: The cpu to peek at
1761 * @ts: The timestamp counter of this event.
1763 * This will return the event that will be read next, but does
1764 * not consume the data.
1766 struct ring_buffer_event
*
1767 ring_buffer_peek(struct ring_buffer
*buffer
, int cpu
, u64
*ts
)
1769 struct ring_buffer_per_cpu
*cpu_buffer
;
1770 struct ring_buffer_event
*event
;
1771 struct buffer_page
*reader
;
1774 if (!cpu_isset(cpu
, buffer
->cpumask
))
1777 cpu_buffer
= buffer
->buffers
[cpu
];
1781 * We repeat when a timestamp is encountered. It is possible
1782 * to get multiple timestamps from an interrupt entering just
1783 * as one timestamp is about to be written. The max times
1784 * that this can happen is the number of nested interrupts we
1785 * can have. Nesting 10 deep of interrupts is clearly
1788 if (unlikely(++nr_loops
> 10)) {
1789 RB_WARN_ON(cpu_buffer
, 1);
1793 reader
= rb_get_reader_page(cpu_buffer
);
1797 event
= rb_reader_event(cpu_buffer
);
1799 switch (event
->type
) {
1800 case RINGBUF_TYPE_PADDING
:
1801 RB_WARN_ON(cpu_buffer
, 1);
1802 rb_advance_reader(cpu_buffer
);
1805 case RINGBUF_TYPE_TIME_EXTEND
:
1806 /* Internal data, OK to advance */
1807 rb_advance_reader(cpu_buffer
);
1810 case RINGBUF_TYPE_TIME_STAMP
:
1811 /* FIXME: not implemented */
1812 rb_advance_reader(cpu_buffer
);
1815 case RINGBUF_TYPE_DATA
:
1817 *ts
= cpu_buffer
->read_stamp
+ event
->time_delta
;
1818 ring_buffer_normalize_time_stamp(cpu_buffer
->cpu
, ts
);
1828 EXPORT_SYMBOL_GPL(ring_buffer_peek
);
1831 * ring_buffer_iter_peek - peek at the next event to be read
1832 * @iter: The ring buffer iterator
1833 * @ts: The timestamp counter of this event.
1835 * This will return the event that will be read next, but does
1836 * not increment the iterator.
1838 struct ring_buffer_event
*
1839 ring_buffer_iter_peek(struct ring_buffer_iter
*iter
, u64
*ts
)
1841 struct ring_buffer
*buffer
;
1842 struct ring_buffer_per_cpu
*cpu_buffer
;
1843 struct ring_buffer_event
*event
;
1846 if (ring_buffer_iter_empty(iter
))
1849 cpu_buffer
= iter
->cpu_buffer
;
1850 buffer
= cpu_buffer
->buffer
;
1854 * We repeat when a timestamp is encountered. It is possible
1855 * to get multiple timestamps from an interrupt entering just
1856 * as one timestamp is about to be written. The max times
1857 * that this can happen is the number of nested interrupts we
1858 * can have. Nesting 10 deep of interrupts is clearly
1861 if (unlikely(++nr_loops
> 10)) {
1862 RB_WARN_ON(cpu_buffer
, 1);
1866 if (rb_per_cpu_empty(cpu_buffer
))
1869 event
= rb_iter_head_event(iter
);
1871 switch (event
->type
) {
1872 case RINGBUF_TYPE_PADDING
:
1876 case RINGBUF_TYPE_TIME_EXTEND
:
1877 /* Internal data, OK to advance */
1878 rb_advance_iter(iter
);
1881 case RINGBUF_TYPE_TIME_STAMP
:
1882 /* FIXME: not implemented */
1883 rb_advance_iter(iter
);
1886 case RINGBUF_TYPE_DATA
:
1888 *ts
= iter
->read_stamp
+ event
->time_delta
;
1889 ring_buffer_normalize_time_stamp(cpu_buffer
->cpu
, ts
);
1899 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek
);
1902 * ring_buffer_consume - return an event and consume it
1903 * @buffer: The ring buffer to get the next event from
1905 * Returns the next event in the ring buffer, and that event is consumed.
1906 * Meaning, that sequential reads will keep returning a different event,
1907 * and eventually empty the ring buffer if the producer is slower.
1909 struct ring_buffer_event
*
1910 ring_buffer_consume(struct ring_buffer
*buffer
, int cpu
, u64
*ts
)
1912 struct ring_buffer_per_cpu
*cpu_buffer
;
1913 struct ring_buffer_event
*event
;
1915 if (!cpu_isset(cpu
, buffer
->cpumask
))
1918 event
= ring_buffer_peek(buffer
, cpu
, ts
);
1922 cpu_buffer
= buffer
->buffers
[cpu
];
1923 rb_advance_reader(cpu_buffer
);
1927 EXPORT_SYMBOL_GPL(ring_buffer_consume
);
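
/*
 * Illustrative sketch of a consuming read on one CPU.  The handle()
 * callback is an assumption for the example; each event is gone from the
 * buffer once ring_buffer_consume() returns it.
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts)))
 *		handle(ring_buffer_event_data(event), ts);
 */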
1930 * ring_buffer_read_start - start a non consuming read of the buffer
1931 * @buffer: The ring buffer to read from
1932 * @cpu: The cpu buffer to iterate over
1934 * This starts up an iteration through the buffer. It also disables
1935 * the recording to the buffer until the reading is finished.
1936 * This prevents the reading from being corrupted. This is not
1937 * a consuming read, so a producer is not expected.
1939 * Must be paired with ring_buffer_finish.
1941 struct ring_buffer_iter
*
1942 ring_buffer_read_start(struct ring_buffer
*buffer
, int cpu
)
1944 struct ring_buffer_per_cpu
*cpu_buffer
;
1945 struct ring_buffer_iter
*iter
;
1946 unsigned long flags
;
1948 if (!cpu_isset(cpu
, buffer
->cpumask
))
1951 iter
= kmalloc(sizeof(*iter
), GFP_KERNEL
);
1955 cpu_buffer
= buffer
->buffers
[cpu
];
1957 iter
->cpu_buffer
= cpu_buffer
;
1959 atomic_inc(&cpu_buffer
->record_disabled
);
1960 synchronize_sched();
1962 spin_lock_irqsave(&cpu_buffer
->lock
, flags
);
1963 ring_buffer_iter_reset(iter
);
1964 spin_unlock_irqrestore(&cpu_buffer
->lock
, flags
);
1968 EXPORT_SYMBOL_GPL(ring_buffer_read_start
);
1971 * ring_buffer_finish - finish reading the iterator of the buffer
1972 * @iter: The iterator retrieved by ring_buffer_start
1974 * This re-enables the recording to the buffer, and frees the
1978 ring_buffer_read_finish(struct ring_buffer_iter
*iter
)
1980 struct ring_buffer_per_cpu
*cpu_buffer
= iter
->cpu_buffer
;
1982 atomic_dec(&cpu_buffer
->record_disabled
);
1985 EXPORT_SYMBOL_GPL(ring_buffer_read_finish
);
1988 * ring_buffer_read - read the next item in the ring buffer by the iterator
1989 * @iter: The ring buffer iterator
1990 * @ts: The time stamp of the event read.
1992 * This reads the next event in the ring buffer and increments the iterator.
1994 struct ring_buffer_event
*
1995 ring_buffer_read(struct ring_buffer_iter
*iter
, u64
*ts
)
1997 struct ring_buffer_event
*event
;
1999 event
= ring_buffer_iter_peek(iter
, ts
);
2003 rb_advance_iter(iter
);
2007 EXPORT_SYMBOL_GPL(ring_buffer_read
);
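
/*
 * Illustrative sketch of a non-consuming read: recording on that CPU is
 * disabled for the whole walk and the data is left in place.  The
 * handle() callback is an assumption for the example.
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_start(buffer, cpu);
 *	if (!iter)
 *		return;
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		handle(ring_buffer_event_data(event), ts);
 *	ring_buffer_read_finish(iter);
 */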
2010 * ring_buffer_size - return the size of the ring buffer (in bytes)
2011 * @buffer: The ring buffer.
2013 unsigned long ring_buffer_size(struct ring_buffer
*buffer
)
2015 return BUF_PAGE_SIZE
* buffer
->pages
;
2017 EXPORT_SYMBOL_GPL(ring_buffer_size
);
2020 rb_reset_cpu(struct ring_buffer_per_cpu
*cpu_buffer
)
2022 cpu_buffer
->head_page
2023 = list_entry(cpu_buffer
->pages
.next
, struct buffer_page
, list
);
2024 local_set(&cpu_buffer
->head_page
->write
, 0);
2025 local_set(&cpu_buffer
->head_page
->commit
, 0);
2027 cpu_buffer
->head_page
->read
= 0;
2029 cpu_buffer
->tail_page
= cpu_buffer
->head_page
;
2030 cpu_buffer
->commit_page
= cpu_buffer
->head_page
;
2032 INIT_LIST_HEAD(&cpu_buffer
->reader_page
->list
);
2033 local_set(&cpu_buffer
->reader_page
->write
, 0);
2034 local_set(&cpu_buffer
->reader_page
->commit
, 0);
2035 cpu_buffer
->reader_page
->read
= 0;
2037 cpu_buffer
->overrun
= 0;
2038 cpu_buffer
->entries
= 0;
2042 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
2043 * @buffer: The ring buffer to reset a per cpu buffer of
2044 * @cpu: The CPU buffer to be reset
2046 void ring_buffer_reset_cpu(struct ring_buffer
*buffer
, int cpu
)
2048 struct ring_buffer_per_cpu
*cpu_buffer
= buffer
->buffers
[cpu
];
2049 unsigned long flags
;
2051 if (!cpu_isset(cpu
, buffer
->cpumask
))
2054 spin_lock_irqsave(&cpu_buffer
->lock
, flags
);
2056 rb_reset_cpu(cpu_buffer
);
2058 spin_unlock_irqrestore(&cpu_buffer
->lock
, flags
);
2060 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu
);
2063 * ring_buffer_reset - reset a ring buffer
2064 * @buffer: The ring buffer to reset all cpu buffers
2066 void ring_buffer_reset(struct ring_buffer
*buffer
)
2070 for_each_buffer_cpu(buffer
, cpu
)
2071 ring_buffer_reset_cpu(buffer
, cpu
);
2073 EXPORT_SYMBOL_GPL(ring_buffer_reset
);
 * ring_buffer_empty - is the ring buffer empty?
2077 * @buffer: The ring buffer to test
2079 int ring_buffer_empty(struct ring_buffer
*buffer
)
2081 struct ring_buffer_per_cpu
*cpu_buffer
;
2084 /* yes this is racy, but if you don't like the race, lock the buffer */
2085 for_each_buffer_cpu(buffer
, cpu
) {
2086 cpu_buffer
= buffer
->buffers
[cpu
];
2087 if (!rb_per_cpu_empty(cpu_buffer
))
2092 EXPORT_SYMBOL_GPL(ring_buffer_empty
);
2095 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
2096 * @buffer: The ring buffer
2097 * @cpu: The CPU buffer to test
2099 int ring_buffer_empty_cpu(struct ring_buffer
*buffer
, int cpu
)
2101 struct ring_buffer_per_cpu
*cpu_buffer
;
2103 if (!cpu_isset(cpu
, buffer
->cpumask
))
2106 cpu_buffer
= buffer
->buffers
[cpu
];
2107 return rb_per_cpu_empty(cpu_buffer
);
2109 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu
);
2112 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
2113 * @buffer_a: One buffer to swap with
2114 * @buffer_b: The other buffer to swap with
 * This function is useful for tracers that want to take a "snapshot"
 * of a CPU buffer and have another backup buffer lying around.
 * It is expected that the tracer handles the cpu buffer not being
 * used at the moment.
2121 int ring_buffer_swap_cpu(struct ring_buffer
*buffer_a
,
2122 struct ring_buffer
*buffer_b
, int cpu
)
2124 struct ring_buffer_per_cpu
*cpu_buffer_a
;
2125 struct ring_buffer_per_cpu
*cpu_buffer_b
;
2127 if (!cpu_isset(cpu
, buffer_a
->cpumask
) ||
2128 !cpu_isset(cpu
, buffer_b
->cpumask
))
2131 /* At least make sure the two buffers are somewhat the same */
2132 if (buffer_a
->size
!= buffer_b
->size
||
2133 buffer_a
->pages
!= buffer_b
->pages
)
2136 cpu_buffer_a
= buffer_a
->buffers
[cpu
];
2137 cpu_buffer_b
= buffer_b
->buffers
[cpu
];
2140 * We can't do a synchronize_sched here because this
2141 * function can be called in atomic context.
2142 * Normally this will be called from the same CPU as cpu.
2143 * If not it's up to the caller to protect this.
2145 atomic_inc(&cpu_buffer_a
->record_disabled
);
2146 atomic_inc(&cpu_buffer_b
->record_disabled
);
2148 buffer_a
->buffers
[cpu
] = cpu_buffer_b
;
2149 buffer_b
->buffers
[cpu
] = cpu_buffer_a
;
2151 cpu_buffer_b
->buffer
= buffer_a
;
2152 cpu_buffer_a
->buffer
= buffer_b
;
2154 atomic_dec(&cpu_buffer_a
->record_disabled
);
2155 atomic_dec(&cpu_buffer_b
->record_disabled
);
2159 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu
);
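
/*
 * Illustrative sketch only: a "snapshot" tracer can park the live data by
 * swapping the per-cpu buffer with a spare one and reading the spare at
 * leisure (the buffer names are assumptions for the example):
 *
 *	ring_buffer_swap_cpu(snapshot_buffer, live_buffer, cpu);
 */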
2162 rb_simple_read(struct file
*filp
, char __user
*ubuf
,
2163 size_t cnt
, loff_t
*ppos
)
2165 int *p
= filp
->private_data
;
2169 /* !ring_buffers_off == tracing_on */
2170 r
= sprintf(buf
, "%d\n", !*p
);
2172 return simple_read_from_buffer(ubuf
, cnt
, ppos
, buf
, r
);
2176 rb_simple_write(struct file
*filp
, const char __user
*ubuf
,
2177 size_t cnt
, loff_t
*ppos
)
2179 int *p
= filp
->private_data
;
2184 if (cnt
>= sizeof(buf
))
2187 if (copy_from_user(&buf
, ubuf
, cnt
))
2192 ret
= strict_strtoul(buf
, 10, &val
);
2196 /* !ring_buffers_off == tracing_on */
2204 static struct file_operations rb_simple_fops
= {
2205 .open
= tracing_open_generic
,
2206 .read
= rb_simple_read
,
2207 .write
= rb_simple_write
,
2211 static __init
int rb_init_debugfs(void)
2213 struct dentry
*d_tracer
;
2214 struct dentry
*entry
;
2216 d_tracer
= tracing_init_dentry();
2218 entry
= debugfs_create_file("tracing_on", 0644, d_tracer
,
2219 &ring_buffers_off
, &rb_simple_fops
);
2221 pr_warning("Could not create debugfs 'tracing_on' entry\n");
2226 fs_initcall(rb_init_debugfs
);