1 package org
.lwes
.listener
;
3 import org
.lwes
.util
.Log
;
5 import java
.io
.IOException
;
6 import java
.net
.DatagramPacket
;
7 import java
.net
.DatagramSocket
;
8 import java
.net
.InetAddress
;
9 import java
.net
.MulticastSocket
;
/**
 * This class listens to packets sent via UDP, and enqueues them for processing.
 * It detects multicast addresses and listens to multicast groups if one is detected, otherwise
 * it listens for unicast datagrams.
 *
 * @author Anthony Molinaro
 * @author Michael P. Lum
 */
20 public class DatagramEnqueuer
extends ThreadedEnqueuer
{
21 /* max datagram size in bytes */
22 private static final int MAX_DATAGRAM_SIZE
= 65535;
23 private String DEFAULT_ADDRESS
= "224.0.0.69";
25 /* the default network settings */
26 private InetAddress address
= null;
27 private int port
= 9191;
28 private InetAddress iface
= null;
31 /* the network socket */
32 private DatagramSocket socket
= null;
34 /* a running buffer */
35 private byte[] buffer
= null;
38 private boolean running
= false;
40 public DatagramEnqueuer() {
42 buffer
= new byte[MAX_DATAGRAM_SIZE
];
/**
 * Gets the network address being used for this listener.
 */
49 public InetAddress
getAddress() {
/**
 * Sets the address being used for this listener.
 * @param address the address to use
 */
57 public void setAddress(InetAddress address
) {
58 this.address
= address
;
/**
 * Gets the port being used for this listener.
 * @return the port number
 */
65 public int getPort() {
/**
 * Sets the port being used for this listener.
 * @param port the port number
 */
73 public void setPort(int port
) {
/**
 * Gets the network interface being used for this listener.
 * @return the interface
 */
81 public InetAddress
getInterface() {
/**
 * Sets the network interface to use for this listener.
 * @param iface the interface
 */
89 public void setInterface(InetAddress iface
) {
/**
 * Returns the multicast TTL (if applicable).
 * Applies to multicast listeners only.
 * @return the TTL value
 */
98 public int getTimeToLive() {
/**
 * Sets the multicast TTL. This typically does not need to be modified.
 * Applies to multicast listeners only.
 * @param ttl the multicast TTL value.
 */
107 public void setTimeToLive(int ttl
) {
111 public void initialize() throws IOException
{
112 if (address
== null) {
113 address
= InetAddress
.getByName(DEFAULT_ADDRESS
);
116 if (address
.isMulticastAddress()) {
117 socket
= new MulticastSocket(port
);
118 ((MulticastSocket
) socket
).setTimeToLive(ttl
);
120 ((MulticastSocket
) socket
).setInterface(iface
);
122 ((MulticastSocket
) socket
).joinGroup(address
);
126 socket
= new DatagramSocket(port
, iface
);
129 socket
= new DatagramSocket(port
, address
);
134 public synchronized void shutdown() {
/**
 * While running, repeatedly read datagrams and insert them into the queue along with the
 * receipt time and other metadata.
 */
147 DatagramPacket datagram
= new DatagramPacket(buffer
, buffer
.length
);
148 socket
.receive(datagram
);
150 /* we record the time *after* the receive because it blocks */
151 long receiptTime
= System
.currentTimeMillis();
153 /* copy the data into a tight buffer so we can release the loose buffer */
154 final byte[] tightBuffer
= new byte[datagram
.getLength()];
155 System
.arraycopy(datagram
.getData(), 0, tightBuffer
, 0, tightBuffer
.length
);
156 datagram
.setData(tightBuffer
);
158 /* create an element for the queue */
159 DatagramQueueElement element
= new DatagramQueueElement();
160 element
.setPacket(datagram
);
161 element
.setTimestamp(receiptTime
);
163 /* add the element to the queue and notify everyone there's work to do */
164 queue
.add(0, element
);
165 synchronized(queue
) {
168 } catch(Exception e
) {
169 Log
.warning("Unable to read datagram", e
);