restructuring
[lwes-java.git] / src / main / java / org / lwes / listener / ThreadedProcessor.java
blobb5ccce1b3db734f1d2c65f201a8a23ba862e1bf2
1 package org.lwes.listener;
3 import org.lwes.EventSystemException;
4 import org.lwes.util.Log;
6 import java.util.Collections;
7 import java.util.LinkedList;
8 import java.util.List;
10 /**
11 * A threaded, queueing event processor. This class requires setting a class to
12 * enqueue events (for example, a network listener) and a class to dequeue
13 * events (for example, writing to disk).
15 * @author Anthony Molinaro
16 * @author Michael P. Lum
18 public class ThreadedProcessor implements Runnable {
19 /* a flag to tell whether or not the thread is running */
20 private boolean running = false;
22 /* the number of seconds to sleep */
23 private int seconds = 30;
25 /* the thread placing events into the queue */
26 private ThreadedEnqueuer enqueuer = null;
28 /* the thread dispatching events from the queue */
29 private ThreadedDequeuer dequeuer = null;
31 /* the enqueuer thread */
32 private Thread enqueuerThread = null;
34 /* the dequeuer thread */
35 private Thread dequeuerThread = null;
37 /* a watcher thread (myself) */
38 private Thread watcherThread = null;
40 /* the queue for events */
41 private List<QueueElement> queue = null;
43 /* the priority for the enqueuing thread */
44 int enqueuerPriority = Thread.NORM_PRIORITY;
46 /* the priority for the dequeuing thread */
47 int dequeuerPriority = Thread.NORM_PRIORITY;
49 /* the priority for the watcher thread */
50 int watcherPriority = Thread.MIN_PRIORITY;
52 /**
53 * Default constructor.
55 public ThreadedProcessor() {
58 /**
59 * Gets the enqueuer being used by this event processor
60 * @return the ThreadedEnqueuer being used by this processor
62 public ThreadedEnqueuer getEnqueuer() {
63 return this.enqueuer;
66 /**
67 * Sets the enqueuer to use for this event processor.
68 * @param enqueuer the ThreadedEnqueuer to use
70 public void setEnqueuer(ThreadedEnqueuer enqueuer) {
71 this.enqueuer = enqueuer;
74 /**
75 * Gets the dequeuer being used by this event processor
76 * @return the ThreadedDequeuer being used by this processor
78 public ThreadedDequeuer getDequeuer() {
79 return this.dequeuer;
82 /**
83 * Sets the dequeuer to use for this event processor.
84 * @param dequeuer the ThreadedDequeuer to use
86 public void setDequeuer(ThreadedDequeuer dequeuer) {
87 this.dequeuer = dequeuer;
90 /**
91 * Returns the List being used as the queue
92 * @return the List object
94 public synchronized List<QueueElement> getQueue() {
95 return this.queue;
98 /**
99 * Sets the List being used as the queue.
100 * Warning: this list needs to be thread-synchronized!
101 * @param queue the List to use for this processor
103 public synchronized void setQueue(List<QueueElement> queue) {
104 this.queue = queue;
108 * Returns the thread priority of the enqueuer.
109 * @return the thread priority
111 public int getEnqueuerPriority() {
112 return this.enqueuerPriority;
116 * Sets the thread priority of the enqueuer.
117 * @param priority the thread priority to use
119 public void setEnqueuerPriority(int priority) {
120 this.enqueuerPriority = priority;
124 * Returns the thread priority of the dequeuer.
125 * @return the thread priority
127 public int getDequeuerPriority() {
128 return this.dequeuerPriority;
132 * Sets the thread priority of the dequeuer
133 * @param priority the thread priority to use
135 public void setDequeuerPriority(int priority) {
136 this.dequeuerPriority = priority;
140 * Initializes the processor to handle events. Starts the enqueuer and
141 * dequeuer threads.
143 * @throws EventSystemException
144 * if there is a problem setting up the processor
146 public void initialize() throws EventSystemException {
147 if (enqueuer == null) {
148 throw new EventSystemException(
149 "Event enqueuer is not set, call setEnqueuer() first");
152 if (dequeuer == null) {
153 throw new EventSystemException(
154 "Event dequeuer is not set call setDequeuer() first");
157 /* create a queue if it doesn't exist */
158 if (queue == null) {
159 queue = Collections
160 .synchronizedList(new LinkedList<QueueElement>());
163 /* make the queue available to the enqueuer and dequeuer */
164 dequeuer.setQueue(queue);
165 enqueuer.setQueue(queue);
167 try {
168 dequeuer.initialize();
169 dequeuerThread = new Thread(dequeuer, "Dequeueing Thread");
170 dequeuerThread.setPriority(dequeuerPriority);
171 dequeuerThread.start();
173 enqueuer.initialize();
174 enqueuerThread = new Thread(enqueuer, "Enqueueing Thread");
175 enqueuerThread.setPriority(enqueuerPriority);
176 enqueuerThread.start();
178 watcherThread = new Thread(this, "Watcher Thread");
179 watcherThread.setPriority(watcherPriority);
180 watcherThread.start();
181 } catch (Exception ie) {
182 throw new EventSystemException("Unable to start ThreadedProcessor",
183 ie);
188 * Shuts down the event listener. Stops the enqueuer and dequeuer threads.
190 public synchronized void shutdown() {
191 running = false;
192 dequeuer.shutdown();
193 enqueuer.shutdown();
197 * The thread's execution loop. Doesn't do much because the enqueue and
198 * dequeue threads do the heavy lifting.
200 public final void run() {
201 running = true;
202 while (running) {
203 try {
204 Thread.sleep(seconds * 1000L);
205 } catch (InterruptedException ie) {
206 Log.warning("ThreadedProcessor interrupted", ie);