1 package org
.lwes
.listener
;
3 import java
.io
.IOException
;
4 import java
.net
.DatagramPacket
;
7 import org
.lwes
.EventFactory
;
8 import org
.lwes
.util
.IPAddress
;
9 import org
.lwes
.util
.Log
;
11 public class DatagramDequeuer
extends ThreadedDequeuer
{
12 private boolean running
= false;
14 /* an event factory */
15 private EventFactory factory
= new EventFactory();
17 public DatagramDequeuer() {
20 public void initialize() throws IOException
{
24 public synchronized void shutdown() {
34 DatagramQueueElement element
= (DatagramQueueElement
) queue
.remove(0);
35 handleElement(element
);
36 } catch(UnsupportedOperationException uoe
) {
37 // not a problem, someone grabbed the event before we did
38 } catch(Exception e
) {
39 Log
.error("Error in dequeueing event for processing", e
);
45 } catch(InterruptedException e
) {}
52 * Determines whether the collection of pending tasks has any elements. Note
53 * that if this returns false at some point and then someone calls
54 * <code>addTask()</code> the state will change to true. Do not use this
55 * function to determine when there cannot be any more tasks.
57 * @return true iff there are any pending tasks.
59 protected final boolean hasPending() {
60 return queue
.size() > 0;
63 protected void handleElement(DatagramQueueElement element
) {
64 if(element
== null) return;
66 DatagramPacket packet
= element
.getPacket();
67 if(packet
== null) return;
69 /* get some metadata */
70 long timestamp
= element
.getTimestamp();
71 IPAddress address
= new IPAddress(packet
.getAddress());
72 int port
= packet
.getPort();
74 /* now try to deserialize the packet */
76 /* don't validate the event for now to save time */
77 Event event
= factory
.createEvent(packet
.getData(), false);
78 event
.setInt64(Event
.RECEIPT_TIME
, timestamp
);
79 event
.setIPAddress(Event
.SENDER_IP
, address
);
80 event
.setUInt16(Event
.SENDER_PORT
, port
);
81 Log
.trace("Dispatching event " + event
.toString());
83 } catch(Exception e
) {
84 Log
.warning("Unable to deserialize event in handleElement()", e
);