/*
 *    jnettop, network online traffic visualiser
 *    Copyright (C) 2002-2005 Jakub Skopal
 *
 *    This program is free software; you can redistribute it and/or modify
 *    it under the terms of the GNU General Public License as published by
 *    the Free Software Foundation; either version 2 of the License, or
 *    (at your option) any later version.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU General Public License for more details.
 *
 *    You should have received a copy of the GNU General Public License
 *    along with this program; if not, write to the Free Software
 *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
25 #include "jprocessor.h"
28 #include "jresolver.h"
30 GThread
*processorThread
;
31 GThread
*heartbeatThread
;
32 GHashTable
*streamTable
;
33 GMutex
*streamTableMutex
;
34 GPtrArray
*streamArray
;
35 GMutex
*streamArrayMutex
;
37 jprocessor_stats jprocessor_Stats
;
38 guint jprocessor_LocalAggregation
;
39 guint jprocessor_RemoteAggregation
;
40 gboolean jprocessor_ContentFiltering
;
41 gboolean jprocessor_Sorting
;
42 GCompareFunc jprocessor_SortingFunction
;
43 gint jprocessor_MaxDeadTime
;
44 ProcessStreamsFunc jprocessor_ProcessStreamsFunc
;
/*
 * Helper that takes a jbase_stream as an untyped pointer and, when the
 * stream has a filterDataFreeFunc callback installed, invokes that callback
 * with the stream itself.
 *
 * NOTE(review): the embedded listing numbers jump from 49 to 53, so whatever
 * follows the callback (presumably releasing the stream itself) and the
 * closing brace are not visible here -- confirm against the full file.
 */
46 static void freeStream(gpointer ptr
) {
47 jbase_stream
*s
= (jbase_stream
*)ptr
;
/* The callback is optional; only call it when it is set. */
48 if (s
->filterDataFreeFunc
)
49 s
->filterDataFreeFunc(s
);
/*
 * Walks every jbase_stream in streamArray while holding streamArrayMutex.
 *
 * NOTE(review): the statement executed for each stream (listing line 58)
 * and the declaration of `i` are missing from this extract; judging by the
 * function name it marks each stream dead -- confirm against the full file.
 */
53 static void markAllAsDead() {
55 g_mutex_lock(streamArrayMutex
);
56 for (i
=0; i
<streamArray
->len
; i
++) {
57 jbase_stream
*s
= (jbase_stream
*)g_ptr_array_index(streamArray
, i
);
60 g_mutex_unlock(streamArrayMutex
);
/*
 * Stores a new value in jprocessor_LocalAggregation.
 *
 * The new value is first compared with the current one.
 * NOTE(review): listing lines 65-66 (the body of that `if` and anything
 * done before the assignment) are missing from this extract; presumably an
 * unchanged value returns early -- confirm against the full file.
 */
63 void jprocessor_SetLocalAggregation(guint localAggregation
) {
64 if (localAggregation
== jprocessor_LocalAggregation
)
67 jprocessor_LocalAggregation
= localAggregation
;
/*
 * Stores a new value in jprocessor_RemoteAggregation.
 *
 * The new value is first compared with the current one.
 * NOTE(review): listing lines 72-73 (the body of that `if` and anything
 * done before the assignment) are missing from this extract; presumably an
 * unchanged value returns early -- confirm against the full file.
 */
70 void jprocessor_SetRemoteAggregation(guint remoteAggregation
) {
71 if (remoteAggregation
== jprocessor_RemoteAggregation
)
74 jprocessor_RemoteAggregation
= remoteAggregation
;
77 void jprocessor_SetContentFiltering(gboolean value
) {
78 jprocessor_ContentFiltering
= value
;
81 void jprocessor_SetProcessStreamsFunc(ProcessStreamsFunc function
) {
82 g_mutex_lock(streamArrayMutex
);
83 jprocessor_ProcessStreamsFunc
= function
;
84 g_mutex_unlock(streamArrayMutex
);
87 static guint
hashStream(gconstpointer key
) {
88 const jbase_stream
*stream
= (const jbase_stream
*)key
;
90 hash
= stream
->src
.addr6
.ntop_s6_addr32
[0];
91 hash
^= stream
->src
.addr6
.ntop_s6_addr32
[1];
92 hash
^= stream
->src
.addr6
.ntop_s6_addr32
[2];
93 hash
^= stream
->src
.addr6
.ntop_s6_addr32
[3];
94 hash
^= stream
->dst
.addr6
.ntop_s6_addr32
[0];
95 hash
^= stream
->dst
.addr6
.ntop_s6_addr32
[1];
96 hash
^= stream
->dst
.addr6
.ntop_s6_addr32
[2];
97 hash
^= stream
->dst
.addr6
.ntop_s6_addr32
[3];
98 hash
^= (((guint
)stream
->srcport
) << 16) + (guint
)stream
->dstport
;
102 static gboolean
compareStream(gconstpointer a
, gconstpointer b
) {
103 const jbase_stream
*astr
= (const jbase_stream
*)a
;
104 const jbase_stream
*bstr
= (const jbase_stream
*)b
;
105 if (astr
->proto
== bstr
->proto
&&
106 astr
->srcport
== bstr
->srcport
&&
107 astr
->dstport
== bstr
->dstport
&&
108 IN6_ARE_ADDR_EQUAL(&astr
->src
.addr6
, &bstr
->src
.addr6
) &&
109 IN6_ARE_ADDR_EQUAL(&astr
->dst
.addr6
, &bstr
->dst
.addr6
)
/*
 * One-time initialisation of the processor's shared state: creates the
 * stream hash table (hashed/compared with hashStream/compareStream), the
 * stream pointer array and their two mutexes, resets the statistics, and
 * sets the defaults -- sorting enabled, ordered by
 * jprocessor_compare_ByBytesStat, and a jprocessor_MaxDeadTime of 7.
 *
 * NOTE(review): the return statement and closing brace (listing lines
 * 124-125) are missing from this extract.
 */
115 gboolean
jprocessor_Setup() {
/* The table is created without key/value destroy notifiers. */
116 streamTable
= g_hash_table_new((GHashFunc
)hashStream
, (GEqualFunc
)compareStream
);
117 streamTableMutex
= g_mutex_new();
118 streamArray
= g_ptr_array_new();
119 streamArrayMutex
= g_mutex_new();
/* Called only after both containers exist, since it touches them. */
120 jprocessor_ResetStats();
121 jprocessor_Sorting
= TRUE
;
122 jprocessor_SortingFunction
= (GCompareFunc
) jprocessor_compare_ByBytesStat
;
123 jprocessor_MaxDeadTime
= 7;
/*
 * GHRFunc passed to g_hash_table_foreach_remove() by jprocessor_ResetStats().
 *
 * NOTE(review): the statements of the body (listing lines 128 and 130-131)
 * are missing from this extract; only the original remark below survives.
 * Presumably the entry is released and TRUE is returned so that the table
 * drops it -- confirm against the full file.
 */
127 static gboolean
removeStreamTableEntry(gpointer key
, gpointer value
, gpointer user_data
) {
129 // value is the same pointer as key
/*
 * Resets the global statistics and forgets all known streams:
 * jprocessor_Stats is zeroed and its startTime set to the current time,
 * every entry is removed from streamArray (from the last index down to 0),
 * and every entry of streamTable is passed to removeStreamTableEntry via
 * g_hash_table_foreach_remove().
 *
 * NOTE(review): the declaration of `i` is missing from this extract. The
 * loop condition `i>=0` only terminates if `i` is a signed type -- confirm.
 * NOTE(review): no mutex lock/unlock is visible in the surviving lines;
 * verify whether the missing lines (134-135, 138, 141, 143) take the
 * stream mutexes.
 */
133 void jprocessor_ResetStats() {
136 memset(&jprocessor_Stats
, 0, sizeof(jprocessor_Stats
));
137 g_get_current_time(&jprocessor_Stats
.startTime
);
/* Remove from the back so no element has to be moved. */
139 for (i
=streamArray
->len
-1; i
>=0; i
--) {
140 g_ptr_array_remove_index_fast(streamArray
, i
);
142 g_hash_table_foreach_remove(streamTable
, (GHRFunc
)removeStreamTableEntry
, NULL
);
/*
 * Recomputes the per-stream and global byte and packet rates.
 *
 * For every stream in streamArray the averaging window is the number of
 * seconds since the stream was first seen (measured against "now + 1 s"),
 * capped at HISTORY_LENGTH. For each of the four histories (src/dst bytes,
 * src/dst packets) the rate is the running sum divided by the window; the
 * oldest history slot is then subtracted from the running sum and the
 * history is shifted by one slot with memmove(). Per-stream totals are the
 * sum of the src and dst rates. The sums over all streams are stored in
 * jprocessor_Stats.
 *
 * The function itself takes no lock in the visible lines; the heartbeat
 * thread calls it while holding streamArrayMutex.
 *
 * NOTE(review): local declarations (listing lines 147-152) and several
 * statements (165, 169, 183-186) are missing from this extract.
 */
145 void jprocessor_UpdateBPS() {
146 GTimeVal currentDateTime
;
/* NOTE(review): "¤tDateTime" looks like a mis-decoded "&currentDateTime"
 * (the "&curren" prefix rendered as an HTML entity) -- restore from the
 * original file. */
153 g_get_current_time(¤tDateTime
);
/* Measure against the next second. */
154 currentDateTime
.tv_sec
++;
156 for (i
=0; i
<streamArray
->len
; i
++) {
157 jbase_stream
*s
= (jbase_stream
*)g_ptr_array_index(streamArray
, i
);
/* NOTE(review): byteswindow is used as a divisor below and no guard
 * against a value <= 0 (e.g. firstSeen ahead of the current clock) is
 * visible in this extract. */
158 int byteswindow
= ( currentDateTime
.tv_sec
- s
->firstSeen
.tv_sec
);
159 if (byteswindow
> HISTORY_LENGTH
)
160 byteswindow
= HISTORY_LENGTH
;
/* Source byte rate: average, drop oldest slot from the sum, shift history. */
162 srcbps
+= (s
->srcbps
= s
->hsrcbytessum
/ byteswindow
);
163 s
->hsrcbytessum
-= s
->hsrcbytes
[HISTORY_LENGTH
-1];
164 memmove(s
->hsrcbytes
+1, s
->hsrcbytes
, sizeof(guint
)*(HISTORY_LENGTH
-1));
/* NOTE(review): a reset of hsrcbytes[0] (as done for the packet histories
 * below) is not visible; listing line 165 is missing. */
/* Destination byte rate, same procedure. */
166 dstbps
+= (s
->dstbps
= s
->hdstbytessum
/ byteswindow
);
167 s
->hdstbytessum
-= s
->hdstbytes
[HISTORY_LENGTH
-1];
168 memmove(s
->hdstbytes
+1, s
->hdstbytes
, sizeof(guint
)*(HISTORY_LENGTH
-1));
170 s
->totalbps
= s
->srcbps
+ s
->dstbps
;
/* Source packet rate; slot 0 is cleared for the new interval. */
172 srcpps
+= (s
->srcpps
= s
->hsrcpacketssum
/ byteswindow
);
173 s
->hsrcpacketssum
-= s
->hsrcpackets
[HISTORY_LENGTH
-1];
174 memmove(s
->hsrcpackets
+1, s
->hsrcpackets
, sizeof(guint
)*(HISTORY_LENGTH
-1));
175 s
->hsrcpackets
[0] = 0;
/* Destination packet rate; slot 0 is cleared for the new interval. */
176 dstpps
+= (s
->dstpps
= s
->hdstpacketssum
/ byteswindow
);
177 s
->hdstpacketssum
-= s
->hdstpackets
[HISTORY_LENGTH
-1];
178 memmove(s
->hdstpackets
+1, s
->hdstpackets
, sizeof(guint
)*(HISTORY_LENGTH
-1));
179 s
->hdstpackets
[0] = 0;
180 s
->totalpps
= s
->srcpps
+ s
->dstpps
;
/* A stream not yet flagged dead and unseen for more than 10 seconds is
 * handled here; the body of this branch is missing from the extract. */
182 if (!s
->dead
&& currentDateTime
.tv_sec
- s
->lastSeen
.tv_sec
> 10) {
/* Publish the totals accumulated over all streams. */
187 jprocessor_Stats
.totalSrcBPS
= srcbps
;
188 jprocessor_Stats
.totalDstBPS
= dstbps
;
189 jprocessor_Stats
.totalBPS
= srcbps
+ dstbps
;
190 jprocessor_Stats
.totalSrcPPS
= srcpps
;
191 jprocessor_Stats
.totalDstPPS
= dstpps
;
192 jprocessor_Stats
.totalPPS
= srcpps
+ dstpps
;
/*
 * Overwrites `addr` with a fixed placeholder address used when addresses
 * are aggregated: for the IPv4 member the value htonl(0x01000000), for the
 * IPv6 member three zero words followed by htonl(0x01000000).
 *
 * NOTE(review): the code that selects between the two assignments
 * (presumably a switch on `af`; listing lines 196-197, 199-200, 205-208)
 * is missing from this extract.
 */
195 static void setToHostAggregation(int af
, jbase_mutableaddress
*addr
) {
198 addr
->addr4
.s_addr
= htonl(0x01000000);
201 addr
->addr6
.ntop_s6_addr32
[0] = 0x0;
202 addr
->addr6
.ntop_s6_addr32
[1] = 0x0;
203 addr
->addr6
.ntop_s6_addr32
[2] = 0x0;
204 addr
->addr6
.ntop_s6_addr32
[3] = htonl(0x01000000);
/*
 * Applies the configured aggregation to a freshly resolved stream, in place.
 *
 * Depending on jprocessor_LocalAggregation the source address is replaced
 * by the placeholder from setToHostAggregation() and the source port is
 * set to -1; jprocessor_RemoteAggregation does the same for the destination
 * address and port.
 *
 * NOTE(review): the case labels (and any break/fall-through between the
 * address and port assignments) are missing from this extract, so which
 * aggregation value triggers which assignment cannot be told from here.
 */
210 static void aggregateStream(jbase_stream
*stream
) {
211 switch (jprocessor_LocalAggregation
) {
213 setToHostAggregation(JBASE_AF(stream
->proto
), &stream
->src
);
215 stream
->srcport
= -1;
217 switch (jprocessor_RemoteAggregation
) {
219 setToHostAggregation(JBASE_AF(stream
->proto
), &stream
->dst
);
221 stream
->dstport
= -1;
/*
 * Accounts one captured packet to its stream.
 *
 * Steps visible in this extract:
 *  - add the packet to the global byte and packet totals;
 *  - resolve the packet into a temporary jbase_stream (plus per-protocol
 *    payload info) and apply the configured aggregation;
 *  - look the stream up in streamTable under streamTableMutex;
 *  - for a new stream: allocate it, copy the resolved data, stamp
 *    firstSeen, assign a uid from a function-local counter, insert it into
 *    the table, optionally assign a data filter, start name resolution for
 *    both addresses and append it to streamArray (under streamArrayMutex);
 *  - add the packet length and count to the dst or src counters, chosen by
 *    packetStream.direction, and to the stream totals; stamp lastSeen;
 *  - when content filtering is on and the stream has a filterDataFunc,
 *    pass the packet to it.
 *
 * NOTE(review): the declaration of `stat` and the if/else lines that
 * separate the "new stream" and "existing stream" paths (listing lines
 * 227, 229, 238, 245, 248, 251, 255, 257, 260, 267, 269, ...) are missing
 * from this extract. The two g_mutex_unlock(streamTableMutex) calls
 * presumably sit on the two different branches -- confirm.
 */
225 static void sortPacket(const jbase_packet
*packet
) {
/* Source of the per-stream uid values assigned below. */
226 static volatile guint64 packetUidCounter
= 0;
228 jbase_stream packetStream
;
230 jbase_payload_info payloadInfo
[JBASE_PROTO_MAX
];
231 jprocessor_Stats
.totalBytes
+= packet
->header
.len
;
232 jprocessor_Stats
.totalPackets
++;
/* Start from an all-zero stream so unused fields compare/hash equal. */
233 memset(&packetStream
, 0, sizeof(jbase_stream
));
234 jresolv_ResolveStream(packet
, &packetStream
, payloadInfo
);
235 aggregateStream(&packetStream
);
236 g_mutex_lock(streamTableMutex
);
237 stat
= (jbase_stream
*)g_hash_table_lookup(streamTable
, &packetStream
);
/* New stream: create the entry; it is its own key in the table. */
239 stat
= g_new0(jbase_stream
, 1);
240 memcpy(stat
, &packetStream
, sizeof(jbase_stream
));
241 g_get_current_time(&stat
->firstSeen
);
242 stat
->uid
= packetUidCounter
++;
243 g_hash_table_insert(streamTable
, stat
, stat
);
244 g_mutex_unlock(streamTableMutex
);
246 if (jprocessor_ContentFiltering
)
247 jfilter_AssignDataFilter(stat
);
249 stat
->srcresolv
= jresolver_Lookup(JBASE_AF(packetStream
.proto
), &packetStream
.src
);
250 stat
->dstresolv
= jresolver_Lookup(JBASE_AF(packetStream
.proto
), &packetStream
.dst
);
252 g_mutex_lock(streamArrayMutex
);
253 g_ptr_array_add(streamArray
, stat
);
254 g_mutex_unlock(streamArrayMutex
);
256 g_mutex_unlock(streamTableMutex
);
/* Non-zero direction: account the packet to the dst counters. */
258 if (packetStream
.direction
) {
259 stat
->dstbytes
+= packet
->header
.len
;
/* *stat->hdstbytes is history slot 0, i.e. the current interval. */
261 *stat
->hdstbytes
+= packet
->header
.len
;
262 stat
->hdstpackets
[0]++;
263 stat
->hdstbytessum
+= packet
->header
.len
;
264 stat
->hdstpacketssum
++;
265 jprocessor_Stats
.totalDstBytes
+= packet
->header
.len
;
266 jprocessor_Stats
.totalDstPackets
++;
/* Otherwise: account the packet to the src counters. */
268 stat
->srcbytes
+= packet
->header
.len
;
270 *stat
->hsrcbytes
+= packet
->header
.len
;
271 stat
->hsrcpackets
[0]++;
272 stat
->hsrcbytessum
+= packet
->header
.len
;
273 stat
->hsrcpacketssum
++;
274 jprocessor_Stats
.totalSrcBytes
+= packet
->header
.len
;
275 jprocessor_Stats
.totalSrcPackets
++;
277 stat
->totalbytes
+= packet
->header
.len
;
278 stat
->totalpackets
++;
279 g_get_current_time(&stat
->lastSeen
);
281 if (jprocessor_ContentFiltering
&& stat
->filterDataFunc
) {
282 stat
->filterDataFunc(stat
, packet
, packetStream
.direction
, payloadInfo
);
/*
 * Body of the packet-processing thread.
 *
 * While jcapture_IsRunning is set, packets are popped from the tail of
 * jcapture_PacketQueue with jcapture_PacketQueueMutex held. When the queue
 * is empty the thread waits on jcapture_PacketQueueCond. For a dequeued
 * packet the mutex is released, the packet is freed with
 * jcapture_packet_Free() and the mutex is re-acquired for the next
 * iteration. The mutex is released once more after the loop.
 *
 * NOTE(review): the lines between unlocking and freeing the packet
 * (listing lines 298-299, presumably the call that processes the packet),
 * the handling after g_cond_wait (294-296) and the return statement are
 * missing from this extract.
 */
286 static gpointer
processorThreadFunc(gpointer data
) {
288 g_mutex_lock(jcapture_PacketQueueMutex
);
289 while (jcapture_IsRunning
) {
290 jbase_packet
*packet
;
291 packet
= (jbase_packet
*)g_queue_pop_tail(jcapture_PacketQueue
);
292 if (packet
== NULL
) {
/* g_cond_wait releases the mutex while blocked and re-locks on wake-up. */
293 g_cond_wait(jcapture_PacketQueueCond
, jcapture_PacketQueueMutex
);
/* Drop the queue lock while this packet is being handled. */
297 g_mutex_unlock(jcapture_PacketQueueMutex
);
300 jcapture_packet_Free(packet
);
302 g_mutex_lock(jcapture_PacketQueueMutex
);
304 g_mutex_unlock(jcapture_PacketQueueMutex
);
/*
 * Body of the heartbeat thread.
 *
 * While jcapture_IsRunning is set, each iteration holds streamArrayMutex
 * and, when there are streams:
 *  - refreshes the rates via jprocessor_UpdateBPS();
 *  - sorts streamArray with jprocessor_SortingFunction if sorting is on;
 *  - with streamTableMutex held, ages dead streams: a stream whose `dead`
 *    counter is non-zero has it incremented, and once it exceeds
 *    jprocessor_MaxDeadTime while the stream is not displayed, the stream
 *    is removed from both streamArray and streamTable;
 *  - calls jprocessor_ProcessStreamsFunc(streamArray) when it is set.
 * After releasing the mutex the thread sleeps for the remainder of the
 * current wall-clock second.
 *
 * NOTE(review): listing lines 332-336 (the rest of the removal branch) are
 * missing from this extract. g_ptr_array_remove_index_fast() moves the last
 * element into slot i, so check that the original re-examines index i, and
 * where the removed stream is released (the table was created without
 * destroy notifiers).
 */
310 static gpointer
heartbeatThreadFunc(gpointer data
) {
313 while (jcapture_IsRunning
) {
317 g_mutex_lock(streamArrayMutex
);
319 if (streamArray
->len
> 0) {
320 jprocessor_UpdateBPS();
321 if (jprocessor_Sorting
)
322 g_ptr_array_sort(streamArray
, jprocessor_SortingFunction
);
325 g_mutex_lock(streamTableMutex
);
327 for (i
=0; i
<streamArray
->len
; i
++) {
328 jbase_stream
*s
= (jbase_stream
*)g_ptr_array_index(streamArray
, i
);
/* `++s->dead` is only evaluated when `dead` is already non-zero. */
329 if (s
->dead
&& ++s
->dead
> jprocessor_MaxDeadTime
&& !s
->displayed
) {
330 g_ptr_array_remove_index_fast ( streamArray
, i
);
331 g_hash_table_remove ( streamTable
, s
);
337 g_mutex_unlock(streamTableMutex
);
/* The callback runs with streamArrayMutex still held. */
339 if (jprocessor_ProcessStreamsFunc
!= NULL
) {
340 jprocessor_ProcessStreamsFunc(streamArray
);
343 g_mutex_unlock(streamArrayMutex
);
/* Sleep the microseconds left until the next full second. */
345 g_get_current_time(&t
);
346 g_usleep(1000000 - t
.tv_usec
);
353 gboolean
jprocessor_Start() {
354 processorThread
= g_thread_create((GThreadFunc
)processorThreadFunc
, NULL
, FALSE
, NULL
);
355 heartbeatThread
= g_thread_create((GThreadFunc
)heartbeatThreadFunc
, NULL
, FALSE
, NULL
);
/*
 * Three-way comparison: evaluates to -1, 0 or 1 when x is less than, equal
 * to or greater than y. Arguments may be evaluated more than once, so they
 * must be free of side effects.
 */
#define COMPAREINT(x,y) ( ((x)<(y)) ? -1 : (((x)==(y)) ? 0 : 1) )

/*
 * Complete body of a stream comparator: x and y are pointers to array
 * elements (i.e. pointers to jbase_stream pointers). The streams are
 * compared on member `prop` with the operands swapped (b against a), which
 * yields a descending order. The macro contains a `return`, so it must be
 * the last thing in the enclosing function.
 */
#define COMPARESTREAMBY(x,y,prop) \
	const jbase_stream *astr = *(const jbase_stream **)(x); \
	const jbase_stream *bstr = *(const jbase_stream **)(y); \
	return COMPAREINT(bstr->prop, astr->prop);
365 gint jprocessor_compare_ByPacketsStat(gconstpointer a, gconstpointer b) {
366 COMPARESTREAMBY(a
, b
, totalpps
);
369 gint
jprocessor_compare_ByBytesStat(gconstpointer a
, gconstpointer b
) {
370 COMPARESTREAMBY(a
, b
, totalbps
);
373 gint
jprocessor_compare_ByTxBytesStat(gconstpointer a
, gconstpointer b
) {
374 COMPARESTREAMBY(a
, b
, srcbps
);
377 gint
jprocessor_compare_ByRxBytesStat(gconstpointer a
, gconstpointer b
) {
378 COMPARESTREAMBY(a
, b
, dstbps
);
381 gint
jprocessor_compare_ByTxPacketsStat(gconstpointer a
, gconstpointer b
) {
382 COMPARESTREAMBY(a
, b
, srcpps
);
385 gint
jprocessor_compare_ByRxPacketsStat(gconstpointer a
, gconstpointer b
) {
386 COMPARESTREAMBY(a
, b
, dstpps
);
/*
 * Switches sorting of the stream array on or off and optionally replaces
 * the comparator: jprocessor_Sorting is set to `onoff`, and
 * jprocessor_SortingFunction is replaced only when `compareFunction` is
 * not NULL (so NULL keeps the current comparator).
 *
 * NOTE(review): listing line 390 and everything after line 393 (including
 * the closing brace) are missing from this extract; whether the update is
 * done under streamArrayMutex cannot be told from here.
 */
389 void jprocessor_SetSorting(gboolean onoff
, GCompareFunc compareFunction
) {
391 jprocessor_Sorting
= onoff
;
392 if (compareFunction
!= NULL
)
393 jprocessor_SortingFunction
= compareFunction
;