initial sourceforge import
[lwes-contrib-hive-serde.git] / src / main / java / org / lwes / hadoop / io / DatagramPacketInputStream.java
blobccdea5fcda8c92a03e71df66cbf07e45fbec8867
1 package org.lwes.hadoop.io;
4 import java.io.DataInputStream;
5 import java.io.EOFException;
6 import java.io.IOException;
7 import java.io.InputStream;
8 import java.io.StreamCorruptedException;
9 import java.text.SimpleDateFormat;
10 import java.util.Date;
12 import org.lwes.serializer.DeserializerState;
13 import org.lwes.serializer.Deserializer;
14 import org.lwes.Event;
15 import org.lwes.EventFactory;
16 import org.lwes.util.IPAddress;
18 public class DatagramPacketInputStream extends DataInputStream
20 static int HEADER_LENGTH = 22;
21 SimpleDateFormat dateFormatter = new SimpleDateFormat();
22 private EventFactory ef = null;
23 private boolean typeChecking = false; /* act as it did before */
25 public void setEventFactory(EventFactory eventFactory)
26 { this.ef = eventFactory; }
27 public EventFactory getEventFactory()
28 { return ef; }
30 public void setTypeChecking(boolean typeChecking)
31 { this.typeChecking = typeChecking; }
32 public boolean isTypeChecking()
33 { return typeChecking; }
35 public DatagramPacketInputStream(InputStream is)
36 throws IOException, StreamCorruptedException
38 super(is);
41 public Event readEvent()
43 Event event = null;
45 if ( ef == null )
47 ef = new EventFactory();
49 try {
50 DeserializerState myState = new DeserializerState();
52 byte[] header = new byte[HEADER_LENGTH];
53 readFully(header,0,HEADER_LENGTH);
54 int length = (int)(Deserializer.deserializeUINT16(myState,header));
55 long time = Deserializer.deserializeINT64(myState,header);
56 IPAddress source_inet =
57 new IPAddress(Deserializer.deserializeIPADDR(myState,header));
58 int source_port =
59 (int)(Deserializer.deserializeUINT16(myState,header));
61 int site_id =
62 (int)(Deserializer.deserializeUINT16(myState,header));
64 byte [] bytes = new byte[length];
65 readFully(bytes,0,length);
66 if ( isTypeChecking() )
68 event = ef.createEvent(bytes);
70 else
72 // create event without checks
73 event = new Event(bytes,false,null);
75 event.setInt64("ReceiptTime",time);
76 event.setIPAddress("SenderIP",source_inet);
77 event.setUInt16("SenderPort",source_port);
78 event.setUInt16("SiteID",site_id);
79 } catch ( EOFException e ) {
80 event=null;
81 } catch ( Exception e ) {
82 System.err.println("Problem with OutputStreamFromDataGrams "+e);
83 e.printStackTrace();
84 event=null;
86 return event;
89 public byte[] readDataGramBytes()
91 byte [] bytes = null;
92 try {
93 DeserializerState myState = new DeserializerState();
95 byte[] header = new byte[HEADER_LENGTH];
96 readFully(header,0,HEADER_LENGTH);
97 int length = (int)(Deserializer.deserializeUINT16(myState,header));
98 long time = Deserializer.deserializeINT64(myState,header);
99 IPAddress source_inet =
100 new IPAddress(Deserializer.deserializeIPADDR(myState,header));
101 int source_port =
102 (int)(Deserializer.deserializeUINT16(myState,header));
103 int site_id =
104 (int)(Deserializer.deserializeUINT16(myState,header));
106 bytes = new byte[length];
107 readFully(bytes,0,length);
108 } catch ( EOFException e ) {
109 } catch ( Exception e ) {
110 System.err.println("Problem with OutputStreamFromDataGrams "+e);
112 return bytes;
115 public String readDataGram()
117 StringBuffer sb = new StringBuffer();
118 int total = -1;
119 try {
120 DeserializerState myState = new DeserializerState();
122 byte[] header = new byte[HEADER_LENGTH];
123 total = read(header,0,HEADER_LENGTH);
125 if ( total != -1 )
127 int length = (int)(Deserializer.deserializeUINT16(myState,header));
128 long time = Deserializer.deserializeINT64(myState,header);
129 IPAddress source_inet =
130 new IPAddress(Deserializer.deserializeIPADDR(myState,header));
131 int source_port =
132 (int)(Deserializer.deserializeUINT16(myState,header));
133 int site_id =
134 (int)(Deserializer.deserializeUINT16(myState,header));
136 dateFormatter.applyPattern("MM/dd/yyyy HH:mm:ss.SSS");
137 Date date = new Date(time);
138 sb.append(dateFormatter.format(date));
140 byte [] bytes = new byte[length];
141 total = read(bytes,0,length);
144 } catch ( Exception e ) {
145 System.err.println("Problem with OutputStreamFromDataGrams "+e);
146 e.printStackTrace();
148 if ( total == -1 ) return null;
150 return sb.toString();