setting svn:exports
[jnettop.git] / jnettop / jprocessor.c
blob2bac85cd51959b116dd741879d387fff8d532333
1 /*
2 * jnettop, network online traffic visualiser
3 * Copyright (C) 2002-2005 Jakub Skopal
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 * $Header$
23 #include "jbase.h"
24 #include "jcapture.h"
25 #include "jprocessor.h"
26 #include "jresolv.h"
27 #include "jfilter.h"
28 #include "jresolver.h"
/* Thread handles stored by jprocessor_Start() for the packet-processing
 * thread and the once-per-second heartbeat thread. */
30 GThread *processorThread;
31 GThread *heartbeatThread;
/* Stream lookup table; each stream record is inserted as both key and value.
 * Accessed under streamTableMutex by the processor and heartbeat threads. */
32 GHashTable *streamTable;
33 GMutex *streamTableMutex;
/* Array of the same stream records, sorted and handed to the
 * process-streams callback by the heartbeat thread under streamArrayMutex. */
34 GPtrArray *streamArray;
35 GMutex *streamArrayMutex;
/* Global counters and rates; zeroed by jprocessor_ResetStats(). */
37 jprocessor_stats jprocessor_Stats;
/* Aggregation modes compared against AGG_HOST / AGG_PORT in aggregateStream(). */
38 guint jprocessor_LocalAggregation;
39 guint jprocessor_RemoteAggregation;
/* When TRUE, new streams get a data filter and packets are passed to it. */
40 gboolean jprocessor_ContentFiltering;
/* Whether the heartbeat sorts streamArray, and with which comparator. */
41 gboolean jprocessor_Sorting;
42 GCompareFunc jprocessor_SortingFunction;
/* Threshold the per-stream `dead` counter must exceed before the stream is reaped. */
43 gint jprocessor_MaxDeadTime;
/* Optional callback invoked with streamArray on every heartbeat; may be NULL. */
44 ProcessStreamsFunc jprocessor_ProcessStreamsFunc;
46 static void freeStream(gpointer ptr) {
47 jbase_stream *s = (jbase_stream *)ptr;
48 if (s->filterDataFreeFunc)
49 s->filterDataFreeFunc(s);
50 g_free(s);
53 static void markAllAsDead() {
54 int i;
55 g_mutex_lock(streamArrayMutex);
56 for (i=0; i<streamArray->len; i++) {
57 jbase_stream *s = (jbase_stream *)g_ptr_array_index(streamArray, i);
58 s->dead=6;
60 g_mutex_unlock(streamArrayMutex);
63 void jprocessor_SetLocalAggregation(guint localAggregation) {
64 if (localAggregation == jprocessor_LocalAggregation)
65 return;
66 markAllAsDead();
67 jprocessor_LocalAggregation = localAggregation;
70 void jprocessor_SetRemoteAggregation(guint remoteAggregation) {
71 if (remoteAggregation == jprocessor_RemoteAggregation)
72 return;
73 markAllAsDead();
74 jprocessor_RemoteAggregation = remoteAggregation;
77 void jprocessor_SetContentFiltering(gboolean value) {
78 jprocessor_ContentFiltering = value;
81 void jprocessor_SetProcessStreamsFunc(ProcessStreamsFunc function) {
82 g_mutex_lock(streamArrayMutex);
83 jprocessor_ProcessStreamsFunc = function;
84 g_mutex_unlock(streamArrayMutex);
87 static guint hashStream(gconstpointer key) {
88 const jbase_stream *stream = (const jbase_stream *)key;
89 guint hash = 0;
90 hash = stream->src.addr6.ntop_s6_addr32[0];
91 hash ^= stream->src.addr6.ntop_s6_addr32[1];
92 hash ^= stream->src.addr6.ntop_s6_addr32[2];
93 hash ^= stream->src.addr6.ntop_s6_addr32[3];
94 hash ^= stream->dst.addr6.ntop_s6_addr32[0];
95 hash ^= stream->dst.addr6.ntop_s6_addr32[1];
96 hash ^= stream->dst.addr6.ntop_s6_addr32[2];
97 hash ^= stream->dst.addr6.ntop_s6_addr32[3];
98 hash ^= (((guint)stream->srcport) << 16) + (guint)stream->dstport;
99 return hash;
102 static gboolean compareStream(gconstpointer a, gconstpointer b) {
103 const jbase_stream *astr = (const jbase_stream *)a;
104 const jbase_stream *bstr = (const jbase_stream *)b;
105 if (astr->proto == bstr->proto &&
106 astr->srcport == bstr->srcport &&
107 astr->dstport == bstr->dstport &&
108 IN6_ARE_ADDR_EQUAL(&astr->src.addr6, &bstr->src.addr6) &&
109 IN6_ARE_ADDR_EQUAL(&astr->dst.addr6, &bstr->dst.addr6)
111 return TRUE;
112 return FALSE;
115 gboolean jprocessor_Setup() {
116 streamTable = g_hash_table_new((GHashFunc)hashStream, (GEqualFunc)compareStream);
117 streamTableMutex = g_mutex_new();
118 streamArray = g_ptr_array_new();
119 streamArrayMutex = g_mutex_new();
120 jprocessor_ResetStats();
121 jprocessor_Sorting = TRUE;
122 jprocessor_SortingFunction = (GCompareFunc) jprocessor_compare_ByBytesStat;
123 jprocessor_MaxDeadTime = 7;
124 return TRUE;
127 static gboolean removeStreamTableEntry(gpointer key, gpointer value, gpointer user_data) {
128 freeStream(key);
129 // value is the same pointer as key
130 return TRUE;
133 void jprocessor_ResetStats() {
134 int i;
136 memset(&jprocessor_Stats, 0, sizeof(jprocessor_Stats));
137 g_get_current_time(&jprocessor_Stats.startTime);
139 for (i=streamArray->len-1; i>=0; i--) {
140 g_ptr_array_remove_index_fast(streamArray, i);
142 g_hash_table_foreach_remove(streamTable, (GHRFunc)removeStreamTableEntry, NULL);
145 void jprocessor_UpdateBPS() {
146 GTimeVal currentDateTime;
147 uint i;
148 guint32 srcbps = 0;
149 guint32 dstbps = 0;
150 guint32 srcpps = 0;
151 guint32 dstpps = 0;
153 g_get_current_time(&currentDateTime);
154 currentDateTime.tv_sec ++;
156 for (i=0; i<streamArray->len; i++) {
157 jbase_stream *s = (jbase_stream *)g_ptr_array_index(streamArray, i);
158 int byteswindow = ( currentDateTime.tv_sec - s->firstSeen.tv_sec );
159 if (byteswindow > HISTORY_LENGTH)
160 byteswindow = HISTORY_LENGTH;
162 srcbps += (s->srcbps = s->hsrcbytessum / byteswindow);
163 s->hsrcbytessum -= s->hsrcbytes[HISTORY_LENGTH-1];
164 memmove(s->hsrcbytes+1, s->hsrcbytes, sizeof(guint)*(HISTORY_LENGTH-1));
165 s->hsrcbytes[0] = 0;
166 dstbps += (s->dstbps = s->hdstbytessum / byteswindow);
167 s->hdstbytessum -= s->hdstbytes[HISTORY_LENGTH-1];
168 memmove(s->hdstbytes+1, s->hdstbytes, sizeof(guint)*(HISTORY_LENGTH-1));
169 s->hdstbytes[0] = 0;
170 s->totalbps = s->srcbps + s->dstbps;
172 srcpps += (s->srcpps = s->hsrcpacketssum / byteswindow);
173 s->hsrcpacketssum -= s->hsrcpackets[HISTORY_LENGTH-1];
174 memmove(s->hsrcpackets+1, s->hsrcpackets, sizeof(guint)*(HISTORY_LENGTH-1));
175 s->hsrcpackets[0] = 0;
176 dstpps += (s->dstpps = s->hdstpacketssum / byteswindow);
177 s->hdstpacketssum -= s->hdstpackets[HISTORY_LENGTH-1];
178 memmove(s->hdstpackets+1, s->hdstpackets, sizeof(guint)*(HISTORY_LENGTH-1));
179 s->hdstpackets[0] = 0;
180 s->totalpps = s->srcpps + s->dstpps;
182 if (!s->dead && currentDateTime.tv_sec - s->lastSeen.tv_sec > 10) {
183 s->dead ++;
187 jprocessor_Stats.totalSrcBPS = srcbps;
188 jprocessor_Stats.totalDstBPS = dstbps;
189 jprocessor_Stats.totalBPS = srcbps + dstbps;
190 jprocessor_Stats.totalSrcPPS = srcpps;
191 jprocessor_Stats.totalDstPPS = dstpps;
192 jprocessor_Stats.totalPPS = srcpps + dstpps;
195 static void setToHostAggregation(int af, jbase_mutableaddress *addr) {
196 switch (af) {
197 case AF_INET:
198 addr->addr4.s_addr = htonl(0x01000000);
199 break;
200 case AF_INET6:
201 addr->addr6.ntop_s6_addr32[0] = 0x0;
202 addr->addr6.ntop_s6_addr32[1] = 0x0;
203 addr->addr6.ntop_s6_addr32[2] = 0x0;
204 addr->addr6.ntop_s6_addr32[3] = htonl(0x01000000);
205 break;
210 static void aggregateStream(jbase_stream *stream) {
211 switch (jprocessor_LocalAggregation) {
212 case AGG_HOST:
213 setToHostAggregation(JBASE_AF(stream->proto), &stream->src);
214 case AGG_PORT:
215 stream->srcport = -1;
217 switch (jprocessor_RemoteAggregation) {
218 case AGG_HOST:
219 setToHostAggregation(JBASE_AF(stream->proto), &stream->dst);
220 case AGG_PORT:
221 stream->dstport = -1;
225 static void sortPacket(const jbase_packet *packet) {
226 static volatile guint64 packetUidCounter = 0;
228 jbase_stream packetStream;
229 jbase_stream *stat;
230 jbase_payload_info payloadInfo[JBASE_PROTO_MAX];
231 jprocessor_Stats.totalBytes += packet->header.len;
232 jprocessor_Stats.totalPackets ++;
233 memset(&packetStream, 0, sizeof(jbase_stream));
234 jresolv_ResolveStream(packet, &packetStream, payloadInfo);
235 aggregateStream(&packetStream);
236 g_mutex_lock(streamTableMutex);
237 stat = (jbase_stream *)g_hash_table_lookup(streamTable, &packetStream);
238 if (stat == NULL) {
239 stat = g_new0(jbase_stream, 1);
240 memcpy(stat, &packetStream, sizeof(jbase_stream));
241 g_get_current_time(&stat->firstSeen);
242 stat->uid = packetUidCounter++;
243 g_hash_table_insert(streamTable, stat, stat);
244 g_mutex_unlock(streamTableMutex);
246 if (jprocessor_ContentFiltering)
247 jfilter_AssignDataFilter(stat);
249 stat->srcresolv = jresolver_Lookup(JBASE_AF(packetStream.proto), &packetStream.src);
250 stat->dstresolv = jresolver_Lookup(JBASE_AF(packetStream.proto), &packetStream.dst);
252 g_mutex_lock(streamArrayMutex);
253 g_ptr_array_add(streamArray, stat);
254 g_mutex_unlock(streamArrayMutex);
255 } else {
256 g_mutex_unlock(streamTableMutex);
258 if (packetStream.direction) {
259 stat->dstbytes += packet->header.len;
260 stat->dstpackets ++;
261 *stat->hdstbytes += packet->header.len;
262 stat->hdstpackets[0]++;
263 stat->hdstbytessum += packet->header.len;
264 stat->hdstpacketssum++;
265 jprocessor_Stats.totalDstBytes += packet->header.len;
266 jprocessor_Stats.totalDstPackets++;
267 } else {
268 stat->srcbytes += packet->header.len;
269 stat->srcpackets ++;
270 *stat->hsrcbytes += packet->header.len;
271 stat->hsrcpackets[0]++;
272 stat->hsrcbytessum += packet->header.len;
273 stat->hsrcpacketssum++;
274 jprocessor_Stats.totalSrcBytes += packet->header.len;
275 jprocessor_Stats.totalSrcPackets++;
277 stat->totalbytes += packet->header.len;
278 stat->totalpackets ++;
279 g_get_current_time(&stat->lastSeen);
281 if (jprocessor_ContentFiltering && stat->filterDataFunc) {
282 stat->filterDataFunc(stat, packet, packetStream.direction, payloadInfo);
286 static gpointer processorThreadFunc(gpointer data) {
287 threadCount ++;
288 g_mutex_lock(jcapture_PacketQueueMutex);
289 while (jcapture_IsRunning) {
290 jbase_packet *packet;
291 packet = (jbase_packet *)g_queue_pop_tail(jcapture_PacketQueue);
292 if (packet == NULL) {
293 g_cond_wait(jcapture_PacketQueueCond, jcapture_PacketQueueMutex);
294 continue;
297 g_mutex_unlock(jcapture_PacketQueueMutex);
299 sortPacket(packet);
300 jcapture_packet_Free(packet);
302 g_mutex_lock(jcapture_PacketQueueMutex);
304 g_mutex_unlock(jcapture_PacketQueueMutex);
305 threadCount --;
307 return NULL;
310 static gpointer heartbeatThreadFunc(gpointer data) {
311 threadCount ++;
313 while (jcapture_IsRunning) {
314 guint i;
315 GTimeVal t;
317 g_mutex_lock(streamArrayMutex);
319 if (streamArray->len > 0) {
320 jprocessor_UpdateBPS();
321 if (jprocessor_Sorting)
322 g_ptr_array_sort(streamArray, jprocessor_SortingFunction);
325 g_mutex_lock(streamTableMutex);
327 for (i=0; i<streamArray->len; i++) {
328 jbase_stream *s = (jbase_stream *)g_ptr_array_index(streamArray, i);
329 if (s->dead && ++s->dead > jprocessor_MaxDeadTime && !s->displayed) {
330 g_ptr_array_remove_index_fast ( streamArray, i );
331 g_hash_table_remove ( streamTable, s );
332 freeStream(s);
333 i--;
337 g_mutex_unlock(streamTableMutex);
339 if (jprocessor_ProcessStreamsFunc != NULL) {
340 jprocessor_ProcessStreamsFunc(streamArray);
343 g_mutex_unlock(streamArrayMutex);
345 g_get_current_time(&t);
346 g_usleep(1000000 - t.tv_usec);
349 threadCount --;
350 return NULL;
353 gboolean jprocessor_Start() {
354 processorThread = g_thread_create((GThreadFunc)processorThreadFunc, NULL, FALSE, NULL);
355 heartbeatThread = g_thread_create((GThreadFunc)heartbeatThreadFunc, NULL, FALSE, NULL);
356 return TRUE;
/* Three-way comparison: -1 if x < y, 0 if x == y, 1 otherwise.
 * Note that both arguments may be evaluated twice. */
#define COMPAREINT(x,y) ( ((x)<(y)) ? -1 : (((x)==(y)) ? 0 : 1) )

/*
 * Body of a stream comparison callback: x and y point at array slots holding
 * `jbase_stream *`; the streams are compared on member `prop` with the
 * operands swapped, so that larger values sort first. Expands to two
 * declarations and a return statement, so it must be the whole function body.
 */
#define COMPARESTREAMBY(x,y,prop) \
	const jbase_stream *astr = *(const jbase_stream * const *)(x); \
	const jbase_stream *bstr = *(const jbase_stream * const *)(y); \
	return COMPAREINT(bstr->prop, astr->prop);
365 gint jprocessor_compare_ByPacketsStat(gconstpointer a, gconstpointer b) {
366 COMPARESTREAMBY(a, b, totalpps);
369 gint jprocessor_compare_ByBytesStat(gconstpointer a, gconstpointer b) {
370 COMPARESTREAMBY(a, b, totalbps);
373 gint jprocessor_compare_ByTxBytesStat(gconstpointer a, gconstpointer b) {
374 COMPARESTREAMBY(a, b, srcbps);
377 gint jprocessor_compare_ByRxBytesStat(gconstpointer a, gconstpointer b) {
378 COMPARESTREAMBY(a, b, dstbps);
381 gint jprocessor_compare_ByTxPacketsStat(gconstpointer a, gconstpointer b) {
382 COMPARESTREAMBY(a, b, srcpps);
385 gint jprocessor_compare_ByRxPacketsStat(gconstpointer a, gconstpointer b) {
386 COMPARESTREAMBY(a, b, dstpps);
389 void jprocessor_SetSorting(gboolean onoff, GCompareFunc compareFunction) {
390 if (onoff != -1)
391 jprocessor_Sorting = onoff;
392 if (compareFunction != NULL)
393 jprocessor_SortingFunction = compareFunction;