1 package org
.lwes
.listener
;
3 import org
.lwes
.EventSystemException
;
4 import org
.lwes
.util
.Log
;
6 import java
.util
.Collections
;
7 import java
.util
.LinkedList
;
11 * A threaded, queueing event processor. This class requires setting a class to
12 * enqueue events (for example, a network listener) and a class to dequeue
13 * events (for example, writing to disk).
15 * @author Anthony Molinaro
16 * @author Michael P. Lum
18 public class ThreadedProcessor
implements Runnable
{
19 /* a flag to tell whether or not the thread is running */
20 private boolean running
= false;
22 /* the number of seconds to sleep */
23 private int seconds
= 30;
25 /* the thread placing events into the queue */
26 private ThreadedEnqueuer enqueuer
= null;
28 /* the thread dispatching events from the queue */
29 private ThreadedDequeuer dequeuer
= null;
31 /* the enqueuer thread */
32 private Thread enqueuerThread
= null;
34 /* the dequeuer thread */
35 private Thread dequeuerThread
= null;
37 /* a watcher thread (myself) */
38 private Thread watcherThread
= null;
40 /* the queue for events */
41 private List
<QueueElement
> queue
= null;
43 /* the priority for the enqueuing thread */
44 int enqueuerPriority
= Thread
.NORM_PRIORITY
;
46 /* the priority for the dequeuing thread */
47 int dequeuerPriority
= Thread
.NORM_PRIORITY
;
49 /* the priority for the watcher thread */
50 int watcherPriority
= Thread
.MIN_PRIORITY
;
53 * Default constructor.
55 public ThreadedProcessor() {
59 * Gets the enqueuer being used by this event processor
60 * @return the ThreadedEnqueuer being used by this processor
62 public ThreadedEnqueuer
getEnqueuer() {
67 * Sets the enqueuer to use for this event processor.
68 * @param enqueuer the ThreadedEnqueuer to use
70 public void setEnqueuer(ThreadedEnqueuer enqueuer
) {
71 this.enqueuer
= enqueuer
;
75 * Gets the dequeuer being used by this event processor
76 * @return the ThreadedDequeuer being used by this processor
78 public ThreadedDequeuer
getDequeuer() {
83 * Sets the dequeuer to use for this event processor.
84 * @param dequeuer the ThreadedDequeuer to use
86 public void setDequeuer(ThreadedDequeuer dequeuer
) {
87 this.dequeuer
= dequeuer
;
91 * Returns the List being used as the queue
92 * @return the List object
94 public synchronized List
<QueueElement
> getQueue() {
99 * Sets the List being used as the queue.
100 * Warning: this list needs to be thread-synchronized!
101 * @param queue the List to use for this processor
103 public synchronized void setQueue(List
<QueueElement
> queue
) {
108 * Returns the thread priority of the enqueuer.
109 * @return the thread priority
111 public int getEnqueuerPriority() {
112 return this.enqueuerPriority
;
116 * Sets the thread priority of the enqueuer.
117 * @param priority the thread priority to use
119 public void setEnqueuerPriority(int priority
) {
120 this.enqueuerPriority
= priority
;
124 * Returns the thread priority of the dequeuer.
125 * @return the thread priority
127 public int getDequeuerPriority() {
128 return this.dequeuerPriority
;
132 * Sets the thread priority of the dequeuer
133 * @param priority the thread priority to use
135 public void setDequeuerPriority(int priority
) {
136 this.dequeuerPriority
= priority
;
140 * Initializes the processor to handle events. Starts the enqueuer and
143 * @throws EventSystemException
144 * if there is a problem setting up the processor
146 public void initialize() throws EventSystemException
{
147 if (enqueuer
== null) {
148 throw new EventSystemException(
149 "Event enqueuer is not set, call setEnqueuer() first");
152 if (dequeuer
== null) {
153 throw new EventSystemException(
154 "Event dequeuer is not set call setDequeuer() first");
157 /* create a queue if it doesn't exist */
160 .synchronizedList(new LinkedList
<QueueElement
>());
163 /* make the queue available to the enqueuer and dequeuer */
164 dequeuer
.setQueue(queue
);
165 enqueuer
.setQueue(queue
);
168 dequeuer
.initialize();
169 dequeuerThread
= new Thread(dequeuer
, "Dequeueing Thread");
170 dequeuerThread
.setPriority(dequeuerPriority
);
171 dequeuerThread
.start();
173 enqueuer
.initialize();
174 enqueuerThread
= new Thread(enqueuer
, "Enqueueing Thread");
175 enqueuerThread
.setPriority(enqueuerPriority
);
176 enqueuerThread
.start();
178 watcherThread
= new Thread(this, "Watcher Thread");
179 watcherThread
.setPriority(watcherPriority
);
180 watcherThread
.start();
181 } catch (Exception ie
) {
182 throw new EventSystemException("Unable to start ThreadedProcessor",
188 * Shuts down the event listener. Stops the enqueuer and dequeuer threads.
190 public synchronized void shutdown() {
197 * The thread's execution loop. Doesn't do much because the enqueue and
198 * dequeue threads do the heavy lifting.
200 public final void run() {
204 Thread
.sleep(seconds
* 1000L);
205 } catch (InterruptedException ie
) {
206 Log
.warning("ThreadedProcessor interrupted", ie
);