/*
 * padata.c - generic interface to process data streams in parallel
 *
 * Copyright (C) 2008, 2009 secunet Security Networks AG
 * Copyright (C) 2008, 2009 Steffen Klassert <steffen.klassert@secunet.com>
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms and conditions of the GNU General Public License,
 * version 2, as published by the Free Software Foundation.
 *
 * This program is distributed in the hope it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin St - Fifth Floor, Boston, MA 02110-1301 USA.
 */

#include <linux/module.h>
#include <linux/cpumask.h>
#include <linux/err.h>
#include <linux/cpu.h>
#include <linux/padata.h>
#include <linux/mutex.h>
#include <linux/sched.h>
#include <linux/slab.h>
#include <linux/rcupdate.h>

#define MAX_SEQ_NR (INT_MAX - NR_CPUS)
#define MAX_OBJ_NUM 1000

static int padata_index_to_cpu(struct parallel_data *pd, int cpu_index)
{
        int cpu, target_cpu;

        target_cpu = cpumask_first(pd->cpumask);
        for (cpu = 0; cpu < cpu_index; cpu++)
                target_cpu = cpumask_next(target_cpu, pd->cpumask);

        return target_cpu;
}

static int padata_cpu_hash(struct padata_priv *padata)
{
        int cpu_index;
        struct parallel_data *pd;

        pd = padata->pd;

        /*
         * Hash the sequence numbers to the cpus by taking
         * seq_nr mod. number of cpus in use.
         */
        cpu_index = padata->seq_nr % cpumask_weight(pd->cpumask);

        return padata_index_to_cpu(pd, cpu_index);
}

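/*
 * Worked example (illustrative, not part of the original file): with a
 * cpumask containing cpus {1, 3, 4, 7} (weight 4), an object carrying
 * seq_nr 10 hashes to cpu_index 10 % 4 = 2, and padata_index_to_cpu()
 * walks the mask to the third set cpu, so the object is queued on cpu 4.
 */
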
static void padata_parallel_worker(struct work_struct *work)
{
        struct padata_queue *queue;
        struct parallel_data *pd;
        struct padata_instance *pinst;
        LIST_HEAD(local_list);

        local_bh_disable();
        queue = container_of(work, struct padata_queue, pwork);
        pd = queue->pd;
        pinst = pd->pinst;

        spin_lock(&queue->parallel.lock);
        list_replace_init(&queue->parallel.list, &local_list);
        spin_unlock(&queue->parallel.lock);

        while (!list_empty(&local_list)) {
                struct padata_priv *padata;

                padata = list_entry(local_list.next,
                                    struct padata_priv, list);

                list_del_init(&padata->list);

                padata->parallel(padata);
        }

        local_bh_enable();
}

/**
 * padata_do_parallel - padata parallelization function
 *
 * @pinst: padata instance
 * @padata: object to be parallelized
 * @cb_cpu: cpu the serialization callback function will run on,
 *          must be in the cpumask of padata.
 *
 * The parallelization callback function will run with BHs off.
 * Note: Every object which is parallelized by padata_do_parallel
 * must be seen by padata_do_serial.
 */
int padata_do_parallel(struct padata_instance *pinst,
                       struct padata_priv *padata, int cb_cpu)
{
        int target_cpu, err;
        struct padata_queue *queue;
        struct parallel_data *pd;

        rcu_read_lock_bh();

        pd = rcu_dereference(pinst->pd);

        err = 0;
        if (!(pinst->flags & PADATA_INIT))
                goto out;

        err = -EBUSY;
        if ((pinst->flags & PADATA_RESET))
                goto out;

        if (atomic_read(&pd->refcnt) >= MAX_OBJ_NUM)
                goto out;

        err = -EINVAL;
        if (!cpumask_test_cpu(cb_cpu, pd->cpumask))
                goto out;

        err = -EINPROGRESS;
        atomic_inc(&pd->refcnt);

        padata->pd = pd;
        padata->cb_cpu = cb_cpu;

        if (unlikely(atomic_read(&pd->seq_nr) == pd->max_seq_nr))
                atomic_set(&pd->seq_nr, -1);

        padata->seq_nr = atomic_inc_return(&pd->seq_nr);

        target_cpu = padata_cpu_hash(padata);
        queue = per_cpu_ptr(pd->queue, target_cpu);

        spin_lock(&queue->parallel.lock);
        list_add_tail(&padata->list, &queue->parallel.list);
        spin_unlock(&queue->parallel.lock);

        queue_work_on(target_cpu, pinst->wq, &queue->pwork);

out:
        rcu_read_unlock_bh();

        return err;
}
EXPORT_SYMBOL(padata_do_parallel);

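/*
 * Illustrative usage sketch, not part of this file: a caller embeds
 * struct padata_priv in its own request structure and submits it with
 * padata_do_parallel(). All my_* names are hypothetical; the callbacks
 * are sketched after padata_do_serial() below and the instance setup at
 * the end of the file. The sketch assumes process context (GFP_KERNEL)
 * and maps the "instance not started" case to -EAGAIN as a caller-side
 * policy, not as padata behaviour.
 */
#if 0
struct my_request {
        struct padata_priv padata;      /* must live until my_serial() ran */
        void *data;
};

static void my_parallel(struct padata_priv *padata);
static void my_serial(struct padata_priv *padata);

/* cb_cpu must be part of the cpumask the instance was allocated with. */
static int my_submit(struct padata_instance *pinst, void *data, int cb_cpu)
{
        struct my_request *req;
        int err;

        req = kzalloc(sizeof(*req), GFP_KERNEL);
        if (!req)
                return -ENOMEM;

        req->data = data;
        req->padata.parallel = my_parallel;
        req->padata.serial = my_serial;

        err = padata_do_parallel(pinst, &req->padata, cb_cpu);
        if (err != -EINPROGRESS) {
                /*
                 * Not queued: the instance is not started (err == 0),
                 * too many objects are in flight (-EBUSY) or cb_cpu is
                 * not in the instance's cpumask (-EINVAL).
                 */
                kfree(req);
                return err ? err : -EAGAIN;
        }

        return 0;
}
#endif
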
/*
 * padata_get_next - Get the next object that needs serialization.
 *
 * Return values are:
 *
 * A pointer to the control struct of the next object that needs
 * serialization, if present in one of the percpu reorder queues.
 *
 * NULL, if all percpu reorder queues are empty.
 *
 * -EINPROGRESS, if the next object that needs serialization will
 *  be parallel processed by another cpu and is not yet present in
 *  the cpu's reorder queue.
 *
 * -ENODATA, if this cpu has to do the parallel processing for
 *  the next object.
 */
static struct padata_priv *padata_get_next(struct parallel_data *pd)
{
        int cpu, num_cpus, empty, calc_seq_nr;
        int seq_nr, next_nr, overrun, next_overrun;
        struct padata_queue *queue, *next_queue;
        struct padata_priv *padata;
        struct padata_list *reorder;

        empty = 0;
        next_nr = -1;
        next_overrun = 0;
        next_queue = NULL;

        num_cpus = cpumask_weight(pd->cpumask);

        for_each_cpu(cpu, pd->cpumask) {
                queue = per_cpu_ptr(pd->queue, cpu);
                reorder = &queue->reorder;

                /*
                 * Calculate the seq_nr of the object that should be
                 * next in this reorder queue.
                 */
                overrun = 0;
                calc_seq_nr = (atomic_read(&queue->num_obj) * num_cpus)
                              + queue->cpu_index;

                if (unlikely(calc_seq_nr > pd->max_seq_nr)) {
                        calc_seq_nr = calc_seq_nr - pd->max_seq_nr - 1;
                        overrun = 1;
                }

                if (!list_empty(&reorder->list)) {
                        padata = list_entry(reorder->list.next,
                                            struct padata_priv, list);

                        seq_nr = padata->seq_nr;
                        BUG_ON(calc_seq_nr != seq_nr);
                } else {
                        seq_nr = calc_seq_nr;
                        empty++;
                }

                if (next_nr < 0 || seq_nr < next_nr
                    || (next_overrun && !overrun)) {
                        next_nr = seq_nr;
                        next_overrun = overrun;
                        next_queue = queue;
                }
        }

        padata = NULL;

        if (empty == num_cpus)
                goto out;

        reorder = &next_queue->reorder;

        if (!list_empty(&reorder->list)) {
                padata = list_entry(reorder->list.next,
                                    struct padata_priv, list);

                if (unlikely(next_overrun)) {
                        for_each_cpu(cpu, pd->cpumask) {
                                queue = per_cpu_ptr(pd->queue, cpu);
                                atomic_set(&queue->num_obj, 0);
                        }
                }

                spin_lock(&reorder->lock);
                list_del_init(&padata->list);
                atomic_dec(&pd->reorder_objects);
                spin_unlock(&reorder->lock);

                atomic_inc(&next_queue->num_obj);

                goto out;
        }

        queue = per_cpu_ptr(pd->queue, smp_processor_id());
        if (queue->cpu_index == next_queue->cpu_index) {
                padata = ERR_PTR(-ENODATA);
                goto out;
        }

        padata = ERR_PTR(-EINPROGRESS);
out:
        return padata;
}

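/*
 * Worked example (illustrative, not part of the original file):
 * queue->num_obj counts how many objects this reorder queue has already
 * delivered for serialization. With num_cpus = 4 and cpu_index = 2, the
 * queue receives the objects with seq_nr 2, 6, 10, 14, ...; after three
 * of them have been delivered (num_obj = 3), the next one expected from
 * this queue must carry seq_nr 3 * 4 + 2 = 14, which is exactly what
 * calc_seq_nr evaluates to above.
 */
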
static void padata_reorder(struct parallel_data *pd)
{
        struct padata_priv *padata;
        struct padata_queue *queue;
        struct padata_instance *pinst = pd->pinst;

        /*
         * We need to ensure that only one cpu can work on dequeueing of
         * the reorder queue at a time. Calculating in which percpu reorder
         * queue the next object will arrive takes some time. A spinlock
         * would be highly contended. Also it is not clear in which order
         * the objects arrive to the reorder queues. So a cpu could wait to
         * get the lock just to notice that there is nothing to do at the
         * moment. Therefore we use a trylock and let the holder of the lock
         * care for all the objects enqueued during the holdtime of the lock.
         */
        if (!spin_trylock_bh(&pd->lock))
                return;

        while (1) {
                padata = padata_get_next(pd);

                /*
                 * All reorder queues are empty, or the next object that needs
                 * serialization is parallel processed by another cpu and is
                 * still on its way to the cpu's reorder queue, nothing to
                 * do for now.
                 */
                if (!padata || PTR_ERR(padata) == -EINPROGRESS)
                        break;

                /*
                 * This cpu has to do the parallel processing of the next
                 * object. It's waiting in the cpu's parallelization queue,
                 * so exit immediately.
                 */
                if (PTR_ERR(padata) == -ENODATA) {
                        del_timer(&pd->timer);
                        spin_unlock_bh(&pd->lock);
                        return;
                }

                queue = per_cpu_ptr(pd->queue, padata->cb_cpu);

                spin_lock(&queue->serial.lock);
                list_add_tail(&padata->list, &queue->serial.list);
                spin_unlock(&queue->serial.lock);

                queue_work_on(padata->cb_cpu, pinst->wq, &queue->swork);
        }

        spin_unlock_bh(&pd->lock);

        /*
         * The next object that needs serialization might have arrived to
         * the reorder queues in the meantime, we will be called again
         * from the timer function if no one else cares for it.
         */
        if (atomic_read(&pd->reorder_objects)
                        && !(pinst->flags & PADATA_RESET))
                mod_timer(&pd->timer, jiffies + HZ);
        else
                del_timer(&pd->timer);

        return;
}

static void padata_reorder_timer(unsigned long arg)
{
        struct parallel_data *pd = (struct parallel_data *)arg;

        padata_reorder(pd);
}

static void padata_serial_worker(struct work_struct *work)
{
        struct padata_queue *queue;
        struct parallel_data *pd;
        LIST_HEAD(local_list);

        local_bh_disable();
        queue = container_of(work, struct padata_queue, swork);
        pd = queue->pd;

        spin_lock(&queue->serial.lock);
        list_replace_init(&queue->serial.list, &local_list);
        spin_unlock(&queue->serial.lock);

        while (!list_empty(&local_list)) {
                struct padata_priv *padata;

                padata = list_entry(local_list.next,
                                    struct padata_priv, list);

                list_del_init(&padata->list);

                padata->serial(padata);
                atomic_dec(&pd->refcnt);
        }
        local_bh_enable();
}

/**
 * padata_do_serial - padata serialization function
 *
 * @padata: object to be serialized.
 *
 * padata_do_serial must be called for every parallelized object.
 * The serialization callback function will run with BHs off.
 */
void padata_do_serial(struct padata_priv *padata)
{
        int cpu;
        struct padata_queue *queue;
        struct parallel_data *pd;

        pd = padata->pd;

        cpu = get_cpu();
        queue = per_cpu_ptr(pd->queue, cpu);

        spin_lock(&queue->reorder.lock);
        atomic_inc(&pd->reorder_objects);
        list_add_tail(&padata->list, &queue->reorder.list);
        spin_unlock(&queue->reorder.lock);

        put_cpu();

        padata_reorder(pd);
}
EXPORT_SYMBOL(padata_do_serial);

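/*
 * Illustrative callback sketch, not part of this file, continuing the
 * hypothetical my_request example from above: the parallel callback does
 * the heavy per-object work and must hand every object back via
 * padata_do_serial(); the serial callback then runs in the original
 * submission order. my_process() and my_complete() are placeholders for
 * the caller's own processing and completion code.
 */
#if 0
static void my_process(void *data);
static void my_complete(void *data);

static void my_parallel(struct padata_priv *padata)
{
        struct my_request *req = container_of(padata, struct my_request,
                                              padata);

        /* Heavy per-object work runs here, spread across the cpumask. */
        my_process(req->data);

        /* Mandatory: queue the object for in-order serialization. */
        padata_do_serial(padata);
}

static void my_serial(struct padata_priv *padata)
{
        struct my_request *req = container_of(padata, struct my_request,
                                              padata);

        /* Objects arrive here in submission order; complete and free. */
        my_complete(req->data);
        kfree(req);
}
#endif
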
/* Allocate and initialize the internal cpumask dependent resources. */
static struct parallel_data *padata_alloc_pd(struct padata_instance *pinst,
                                             const struct cpumask *cpumask)
{
        int cpu, cpu_index, num_cpus;
        struct padata_queue *queue;
        struct parallel_data *pd;

        cpu_index = 0;

        pd = kzalloc(sizeof(struct parallel_data), GFP_KERNEL);
        if (!pd)
                goto err;

        pd->queue = alloc_percpu(struct padata_queue);
        if (!pd->queue)
                goto err_free_pd;

        if (!alloc_cpumask_var(&pd->cpumask, GFP_KERNEL))
                goto err_free_queue;

        cpumask_and(pd->cpumask, cpumask, cpu_active_mask);

        for_each_cpu(cpu, pd->cpumask) {
                queue = per_cpu_ptr(pd->queue, cpu);

                queue->pd = pd;

                queue->cpu_index = cpu_index;
                cpu_index++;

                INIT_LIST_HEAD(&queue->reorder.list);
                INIT_LIST_HEAD(&queue->parallel.list);
                INIT_LIST_HEAD(&queue->serial.list);
                spin_lock_init(&queue->reorder.lock);
                spin_lock_init(&queue->parallel.lock);
                spin_lock_init(&queue->serial.lock);

                INIT_WORK(&queue->pwork, padata_parallel_worker);
                INIT_WORK(&queue->swork, padata_serial_worker);
                atomic_set(&queue->num_obj, 0);
        }

        num_cpus = cpumask_weight(pd->cpumask);
        pd->max_seq_nr = (MAX_SEQ_NR / num_cpus) * num_cpus - 1;

        setup_timer(&pd->timer, padata_reorder_timer, (unsigned long)pd);
        atomic_set(&pd->seq_nr, -1);
        atomic_set(&pd->reorder_objects, 0);
        atomic_set(&pd->refcnt, 0);
        pd->pinst = pinst;
        spin_lock_init(&pd->lock);

        return pd;

err_free_queue:
        free_percpu(pd->queue);
err_free_pd:
        kfree(pd);
err:
        return NULL;
}

static void padata_free_pd(struct parallel_data *pd)
{
        free_cpumask_var(pd->cpumask);
        free_percpu(pd->queue);
        kfree(pd);
}

/* Flush all objects out of the padata queues. */
static void padata_flush_queues(struct parallel_data *pd)
{
        int cpu;
        struct padata_queue *queue;

        for_each_cpu(cpu, pd->cpumask) {
                queue = per_cpu_ptr(pd->queue, cpu);
                flush_work(&queue->pwork);
        }

        del_timer_sync(&pd->timer);

        if (atomic_read(&pd->reorder_objects))
                padata_reorder(pd);

        for_each_cpu(cpu, pd->cpumask) {
                queue = per_cpu_ptr(pd->queue, cpu);
                flush_work(&queue->swork);
        }

        BUG_ON(atomic_read(&pd->refcnt) != 0);
}

/* Replace the internal control structure with a new one. */
static void padata_replace(struct padata_instance *pinst,
                           struct parallel_data *pd_new)
{
        struct parallel_data *pd_old = pinst->pd;

        pinst->flags |= PADATA_RESET;

        rcu_assign_pointer(pinst->pd, pd_new);

        synchronize_rcu();

        padata_flush_queues(pd_old);
        padata_free_pd(pd_old);

        pinst->flags &= ~PADATA_RESET;
}

/**
 * padata_set_cpumask - set the cpumask that padata should use
 *
 * @pinst: padata instance
 * @cpumask: the cpumask to use
 */
int padata_set_cpumask(struct padata_instance *pinst,
                       cpumask_var_t cpumask)
{
        struct parallel_data *pd;
        int err = 0;

        mutex_lock(&pinst->lock);

        get_online_cpus();

        pd = padata_alloc_pd(pinst, cpumask);
        if (!pd) {
                err = -ENOMEM;
                goto out;
        }

        cpumask_copy(pinst->cpumask, cpumask);

        padata_replace(pinst, pd);

out:
        put_online_cpus();

        mutex_unlock(&pinst->lock);

        return err;
}
EXPORT_SYMBOL(padata_set_cpumask);

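/*
 * Illustrative sketch, not part of this file: restricting a hypothetical
 * instance my_pinst to the currently online cpus at runtime.
 */
#if 0
static int my_use_online_cpus(struct padata_instance *my_pinst)
{
        cpumask_var_t mask;
        int err;

        if (!alloc_cpumask_var(&mask, GFP_KERNEL))
                return -ENOMEM;

        cpumask_copy(mask, cpu_online_mask);
        err = padata_set_cpumask(my_pinst, mask);

        free_cpumask_var(mask);
        return err;
}
#endif
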
static int __padata_add_cpu(struct padata_instance *pinst, int cpu)
{
        struct parallel_data *pd;

        if (cpumask_test_cpu(cpu, cpu_active_mask)) {
                pd = padata_alloc_pd(pinst, pinst->cpumask);
                if (!pd)
                        return -ENOMEM;

                padata_replace(pinst, pd);
        }

        return 0;
}

/**
 * padata_add_cpu - add a cpu to the padata cpumask
 *
 * @pinst: padata instance
 * @cpu: cpu to add
 */
int padata_add_cpu(struct padata_instance *pinst, int cpu)
{
        int err;

        mutex_lock(&pinst->lock);

        get_online_cpus();
        cpumask_set_cpu(cpu, pinst->cpumask);
        err = __padata_add_cpu(pinst, cpu);
        put_online_cpus();

        mutex_unlock(&pinst->lock);

        return err;
}
EXPORT_SYMBOL(padata_add_cpu);

static int __padata_remove_cpu(struct padata_instance *pinst, int cpu)
{
        struct parallel_data *pd;

        if (cpumask_test_cpu(cpu, cpu_online_mask)) {
                pd = padata_alloc_pd(pinst, pinst->cpumask);
                if (!pd)
                        return -ENOMEM;

                padata_replace(pinst, pd);
        }

        return 0;
}

/**
 * padata_remove_cpu - remove a cpu from the padata cpumask
 *
 * @pinst: padata instance
 * @cpu: cpu to remove
 */
int padata_remove_cpu(struct padata_instance *pinst, int cpu)
{
        int err;

        mutex_lock(&pinst->lock);

        get_online_cpus();
        cpumask_clear_cpu(cpu, pinst->cpumask);
        err = __padata_remove_cpu(pinst, cpu);
        put_online_cpus();

        mutex_unlock(&pinst->lock);

        return err;
}
EXPORT_SYMBOL(padata_remove_cpu);

/**
 * padata_start - start the parallel processing
 *
 * @pinst: padata instance to start
 */
void padata_start(struct padata_instance *pinst)
{
        mutex_lock(&pinst->lock);
        pinst->flags |= PADATA_INIT;
        mutex_unlock(&pinst->lock);
}
EXPORT_SYMBOL(padata_start);

/**
 * padata_stop - stop the parallel processing
 *
 * @pinst: padata instance to stop
 */
void padata_stop(struct padata_instance *pinst)
{
        mutex_lock(&pinst->lock);
        pinst->flags &= ~PADATA_INIT;
        mutex_unlock(&pinst->lock);
}
EXPORT_SYMBOL(padata_stop);

#ifdef CONFIG_HOTPLUG_CPU
static int padata_cpu_callback(struct notifier_block *nfb,
                               unsigned long action, void *hcpu)
{
        int err;
        struct padata_instance *pinst;
        int cpu = (unsigned long)hcpu;

        pinst = container_of(nfb, struct padata_instance, cpu_notifier);

        switch (action) {
        case CPU_ONLINE:
        case CPU_ONLINE_FROZEN:
                if (!cpumask_test_cpu(cpu, pinst->cpumask))
                        break;
                mutex_lock(&pinst->lock);
                err = __padata_add_cpu(pinst, cpu);
                mutex_unlock(&pinst->lock);
                if (err)
                        return notifier_from_errno(err);
                break;

        case CPU_DOWN_PREPARE:
        case CPU_DOWN_PREPARE_FROZEN:
                if (!cpumask_test_cpu(cpu, pinst->cpumask))
                        break;
                mutex_lock(&pinst->lock);
                err = __padata_remove_cpu(pinst, cpu);
                mutex_unlock(&pinst->lock);
                if (err)
                        return notifier_from_errno(err);
                break;

        case CPU_UP_CANCELED:
        case CPU_UP_CANCELED_FROZEN:
                if (!cpumask_test_cpu(cpu, pinst->cpumask))
                        break;
                mutex_lock(&pinst->lock);
                __padata_remove_cpu(pinst, cpu);
                mutex_unlock(&pinst->lock);

        case CPU_DOWN_FAILED:
        case CPU_DOWN_FAILED_FROZEN:
                if (!cpumask_test_cpu(cpu, pinst->cpumask))
                        break;
                mutex_lock(&pinst->lock);
                __padata_add_cpu(pinst, cpu);
                mutex_unlock(&pinst->lock);
        }

        return NOTIFY_OK;
}
#endif

/**
 * padata_alloc - allocate and initialize a padata instance
 *
 * @cpumask: cpumask that padata uses for parallelization
 * @wq: workqueue to use for the allocated padata instance
 */
struct padata_instance *padata_alloc(const struct cpumask *cpumask,
                                     struct workqueue_struct *wq)
{
        struct padata_instance *pinst;
        struct parallel_data *pd;

        pinst = kzalloc(sizeof(struct padata_instance), GFP_KERNEL);
        if (!pinst)
                goto err;

        get_online_cpus();

        pd = padata_alloc_pd(pinst, cpumask);
        if (!pd)
                goto err_free_inst;

        if (!alloc_cpumask_var(&pinst->cpumask, GFP_KERNEL))
                goto err_free_pd;

        rcu_assign_pointer(pinst->pd, pd);

        pinst->wq = wq;

        cpumask_copy(pinst->cpumask, cpumask);

        pinst->flags = 0;

#ifdef CONFIG_HOTPLUG_CPU
        pinst->cpu_notifier.notifier_call = padata_cpu_callback;
        pinst->cpu_notifier.priority = 0;
        register_hotcpu_notifier(&pinst->cpu_notifier);
#endif

        put_online_cpus();

        mutex_init(&pinst->lock);

        return pinst;

err_free_pd:
        padata_free_pd(pd);
err_free_inst:
        kfree(pinst);
        put_online_cpus();
err:
        return NULL;
}
EXPORT_SYMBOL(padata_alloc);

/**
 * padata_free - free a padata instance
 *
 * @pinst: padata instance to free
 */
void padata_free(struct padata_instance *pinst)
{
        padata_stop(pinst);

        synchronize_rcu();

#ifdef CONFIG_HOTPLUG_CPU
        unregister_hotcpu_notifier(&pinst->cpu_notifier);
#endif
        get_online_cpus();
        padata_flush_queues(pinst->pd);
        put_online_cpus();

        padata_free_pd(pinst->pd);
        free_cpumask_var(pinst->cpumask);
        kfree(pinst);
}
EXPORT_SYMBOL(padata_free);

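/*
 * Illustrative instance lifecycle sketch, not part of this file: a module
 * sets up a dedicated workqueue and a padata instance at init time and
 * tears both down on exit. All my_* names are hypothetical.
 */
#if 0
static struct workqueue_struct *my_wq;
static struct padata_instance *my_pinst;

static int __init my_init(void)
{
        my_wq = create_workqueue("my_padata");
        if (!my_wq)
                return -ENOMEM;

        my_pinst = padata_alloc(cpu_possible_mask, my_wq);
        if (!my_pinst) {
                destroy_workqueue(my_wq);
                return -ENOMEM;
        }

        /* No object is accepted before padata_start() sets PADATA_INIT. */
        padata_start(my_pinst);
        return 0;
}

static void __exit my_exit(void)
{
        padata_stop(my_pinst);
        padata_free(my_pinst);
        destroy_workqueue(my_wq);
}

module_init(my_init);
module_exit(my_exit);
#endif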