restructuring
[lwes-java.git] / src / main / java / org / lwes / listener / ThreadedDequeuer.java
blobfd5439f34617995107dcf5dcf90aee82cdf101b0
1 package org.lwes.listener;
3 import java.io.IOException;
4 import java.util.Collections;
5 import java.util.Iterator;
6 import java.util.LinkedList;
7 import java.util.List;
8 import java.util.HashMap;
10 import org.lwes.Event;
12 /**
13 * An abstract consumer of events.
15 * @author Michael P. Lum
18 public abstract class ThreadedDequeuer implements Runnable {
19 /* the minimum and maximum threads to allow */
20 private static final int MIN_THREADS = 1;
21 private static final int MAX_THREADS = 64;
23 protected List<QueueElement> queue = null;
24 private HashMap<String, EventHandler> handlers = null;
26 /* the maximum number of threads allowed */
27 private int maxThreads = 20;
29 /* the event dispatchers */
30 private List<ThreadedEventDispatcher> idleProcessors = null;
32 /**
33 * Default constructor.
35 public ThreadedDequeuer() {
38 /**
39 * Returns the queue to use for this dequeuer
41 * @return the List queue
43 public synchronized List<QueueElement> getQueue() {
44 return this.queue;
47 /**
48 * Sets the queue to use for this dequeuer. Warning: this List must be
49 * thread-synchronized!
51 * @param queue
52 * the thread-synchronized List element
54 public synchronized void setQueue(List<QueueElement> queue) {
55 this.queue = queue;
58 /**
59 * Gets the maximum number of threads allowed in the system
60 * @return the number of threads
62 public int getMaxThreads() {
63 return this.maxThreads;
66 /**
67 * Sets the maximum number of threads allowed in the system, up to 64. The default is 20.
68 * @param threads the number of threads to allow.
70 public synchronized void setMaxThreads(int threads) {
71 if(threads >= MIN_THREADS && threads <= MAX_THREADS) {
72 this.maxThreads = threads;
76 /**
77 * Get an event handler by name. Returns null if the handler does not exist.
78 * @param name the name of the event handler to fetch
79 * @return the EventHandler
81 public EventHandler getHandler(String name) {
82 if(handlers == null || name == null) return null;
83 return (EventHandler) handlers.get(name);
86 /**
87 * Add an event handler to this dequeuer. Events coming into the system will
88 * call all handlers via their callback.
90 * @param handler
91 * the actual handler
93 public void addHandler(EventHandler handler) {
94 if (handler == null)
95 return;
96 addHandler("handler" + handler.hashCode(), handler);
99 /**
100 * Adds a handler to this dequeuer with a specified name. Events coming into
101 * the system will call all handlers via their callback.
103 * @param name
104 * the name of this handler
105 * @param handler
106 * the actual handler
108 public void addHandler(String name, EventHandler handler) {
109 if (name == null || handler == null)
110 return;
111 if (handlers == null)
112 handlers = new HashMap<String, EventHandler>();
113 if (!handlers.containsKey(name)) {
114 handlers.put(name, handler);
119 * Removes a handler so it no longer is processing events
121 * @param handler
122 * the handler to remove
124 public void removeHandler(EventHandler handler) {
125 if (handlers == null || handler == null)
126 return;
127 handlers.remove("handler" + handler.hashCode());
131 * Removes a handler by name so it no longer is processing events
133 * @param name
134 * the name of the handler to remove
136 public void removeHandler(String name) {
137 if (handlers == null || name == null)
138 return;
139 handlers.remove(name);
143 * Default initialize() method. Should be overridden by classes extending
144 * ThreadedEnqueuer.
146 public void initialize() throws IOException {
147 idleProcessors = Collections.synchronizedList(new LinkedList<ThreadedEventDispatcher>());
148 while(idleProcessors.size() < maxThreads) {
149 makeAvailable(new ThreadedEventDispatcher(this));
154 * Default shutdown() method. Should be overridden by classes extending
155 * ThreadedDequeuer.
157 public void shutdown() {
161 * Default run loop. Should be overridden by classes extending ThreadedDequeuer
163 public void run() {
167 * Handles events and calls EventHandler handlers. Typically called from the dequeuer implementation
168 * to invoke the handler callbacks.
169 * @param event the Event to dispatch to the EventHandlers
171 protected void dispatchEvent(Event event) {
172 if(handlers == null) return;
174 Iterator<String> iterator = handlers.keySet().iterator();
175 while(iterator.hasNext()) {
176 EventHandler handler = (EventHandler) handlers.get(iterator.next());
177 ThreadedEventDispatcher d = getIdleProcessor();
178 d.setTask(handler, event);
183 * Gets an idle processor from the list
184 * @return a ThreadedEventDispatcher
186 protected ThreadedEventDispatcher getIdleProcessor() {
187 synchronized(idleProcessors) {
188 while(idleProcessors.isEmpty()) {
189 try {
190 idleProcessors.wait();
191 } catch(InterruptedException ie) {}
194 return (ThreadedEventDispatcher) idleProcessors.remove(0);
199 * Makes a process available for dispatch
200 * @param dispatcher the dispatcher to make available
202 public synchronized void makeAvailable(ThreadedEventDispatcher dispatcher) {
203 if(dispatcher == null) return;
205 if(dispatcher.isIdle()) {
206 idleProcessors.add(dispatcher);
207 synchronized(idleProcessors) {
208 idleProcessors.notifyAll();
210 } else {
211 throw new RuntimeException("Active processor made available");