1 package org
.lwes
.hadoop
.io
;
4 import java
.io
.DataInputStream
;
5 import java
.io
.EOFException
;
6 import java
.io
.IOException
;
7 import java
.io
.InputStream
;
8 import java
.io
.StreamCorruptedException
;
9 import java
.text
.SimpleDateFormat
;
10 import java
.util
.Date
;
12 import org
.lwes
.serializer
.DeserializerState
;
13 import org
.lwes
.serializer
.Deserializer
;
14 import org
.lwes
.Event
;
15 import org
.lwes
.EventFactory
;
16 import org
.lwes
.util
.IPAddress
;
18 public class DatagramPacketInputStream
extends DataInputStream
20 static int HEADER_LENGTH
= 22;
21 SimpleDateFormat dateFormatter
= new SimpleDateFormat();
22 private EventFactory ef
= null;
23 private boolean typeChecking
= false; /* act as it did before */
25 public void setEventFactory(EventFactory eventFactory
)
26 { this.ef
= eventFactory
; }
27 public EventFactory
getEventFactory()
30 public void setTypeChecking(boolean typeChecking
)
31 { this.typeChecking
= typeChecking
; }
32 public boolean isTypeChecking()
33 { return typeChecking
; }
35 public DatagramPacketInputStream(InputStream is
)
36 throws IOException
, StreamCorruptedException
41 public Event
readEvent()
47 ef
= new EventFactory();
50 DeserializerState myState
= new DeserializerState();
52 byte[] header
= new byte[HEADER_LENGTH
];
53 readFully(header
,0,HEADER_LENGTH
);
54 int length
= (int)(Deserializer
.deserializeUINT16(myState
,header
));
55 long time
= Deserializer
.deserializeINT64(myState
,header
);
56 IPAddress source_inet
=
57 new IPAddress(Deserializer
.deserializeIPADDR(myState
,header
));
59 (int)(Deserializer
.deserializeUINT16(myState
,header
));
62 (int)(Deserializer
.deserializeUINT16(myState
,header
));
64 byte [] bytes
= new byte[length
];
65 readFully(bytes
,0,length
);
66 if ( isTypeChecking() )
68 event
= ef
.createEvent(bytes
);
72 // create event without checks
73 event
= new Event(bytes
,false,null);
75 event
.setInt64("ReceiptTime",time
);
76 event
.setIPAddress("SenderIP",source_inet
);
77 event
.setUInt16("SenderPort",source_port
);
78 event
.setUInt16("SiteID",site_id
);
79 } catch ( EOFException e
) {
81 } catch ( Exception e
) {
82 System
.err
.println("Problem with OutputStreamFromDataGrams "+e
);
89 public byte[] readDataGramBytes()
93 DeserializerState myState
= new DeserializerState();
95 byte[] header
= new byte[HEADER_LENGTH
];
96 readFully(header
,0,HEADER_LENGTH
);
97 int length
= (int)(Deserializer
.deserializeUINT16(myState
,header
));
98 long time
= Deserializer
.deserializeINT64(myState
,header
);
99 IPAddress source_inet
=
100 new IPAddress(Deserializer
.deserializeIPADDR(myState
,header
));
102 (int)(Deserializer
.deserializeUINT16(myState
,header
));
104 (int)(Deserializer
.deserializeUINT16(myState
,header
));
106 bytes
= new byte[length
];
107 readFully(bytes
,0,length
);
108 } catch ( EOFException e
) {
109 } catch ( Exception e
) {
110 System
.err
.println("Problem with OutputStreamFromDataGrams "+e
);
115 public String
readDataGram()
117 StringBuffer sb
= new StringBuffer();
120 DeserializerState myState
= new DeserializerState();
122 byte[] header
= new byte[HEADER_LENGTH
];
123 total
= read(header
,0,HEADER_LENGTH
);
127 int length
= (int)(Deserializer
.deserializeUINT16(myState
,header
));
128 long time
= Deserializer
.deserializeINT64(myState
,header
);
129 IPAddress source_inet
=
130 new IPAddress(Deserializer
.deserializeIPADDR(myState
,header
));
132 (int)(Deserializer
.deserializeUINT16(myState
,header
));
134 (int)(Deserializer
.deserializeUINT16(myState
,header
));
136 dateFormatter
.applyPattern("MM/dd/yyyy HH:mm:ss.SSS");
137 Date date
= new Date(time
);
138 sb
.append(dateFormatter
.format(date
));
140 byte [] bytes
= new byte[length
];
141 total
= read(bytes
,0,length
);
144 } catch ( Exception e
) {
145 System
.err
.println("Problem with OutputStreamFromDataGrams "+e
);
148 if ( total
== -1 ) return null;
150 return sb
.toString();