restructuring
[lwes-java.git] / src / main / java / org / lwes / listener / DatagramDequeuer.java
blob2c34fbb33e3d31f73b924ff8fec40c6f37fd8a88
1 package org.lwes.listener;
3 import java.io.IOException;
4 import java.net.DatagramPacket;
6 import org.lwes.Event;
7 import org.lwes.EventFactory;
8 import org.lwes.util.IPAddress;
9 import org.lwes.util.Log;
11 public class DatagramDequeuer extends ThreadedDequeuer {
12 private boolean running = false;
14 /* an event factory */
15 private EventFactory factory = new EventFactory();
17 public DatagramDequeuer() {
20 public void initialize() throws IOException {
21 super.initialize();
24 public synchronized void shutdown() {
25 running = false;
28 public void run() {
29 running = true;
31 while (running) {
32 if(hasPending()) {
33 try {
34 DatagramQueueElement element = (DatagramQueueElement) queue.remove(0);
35 handleElement(element);
36 } catch(UnsupportedOperationException uoe) {
37 // not a problem, someone grabbed the event before we did
38 } catch(Exception e) {
39 Log.error("Error in dequeueing event for processing", e);
41 } else {
42 synchronized(queue) {
43 try {
44 queue.wait();
45 } catch(InterruptedException e) {}
51 /**
52 * Determines whether the collection of pending tasks has any elements. Note
53 * that if this returns false at some point and then someone calls
54 * <code>addTask()</code> the state will change to true. Do not use this
55 * function to determine when there cannot be any more tasks.
57 * @return true iff there are any pending tasks.
59 protected final boolean hasPending() {
60 return queue.size() > 0;
63 protected void handleElement(DatagramQueueElement element) {
64 if(element == null) return;
66 DatagramPacket packet = element.getPacket();
67 if(packet == null) return;
69 /* get some metadata */
70 long timestamp = element.getTimestamp();
71 IPAddress address = new IPAddress(packet.getAddress());
72 int port = packet.getPort();
74 /* now try to deserialize the packet */
75 try {
76 /* don't validate the event for now to save time */
77 Event event = factory.createEvent(packet.getData(), false);
78 event.setInt64(Event.RECEIPT_TIME, timestamp);
79 event.setIPAddress(Event.SENDER_IP, address);
80 event.setUInt16(Event.SENDER_PORT, port);
81 Log.trace("Dispatching event " + event.toString());
82 dispatchEvent(event);
83 } catch(Exception e) {
84 Log.warning("Unable to deserialize event in handleElement()", e);