Badar's initial import for his Multicast code
[tourist.git] / App / src / Multicast.cpp
blob30e8fd56504217ca008b90f0c7054a354758f1df
1 #include "Tourist/App/Multicast.h"
2 #include "Poco/Logger.h"
3 #include "Tourist/Constants.h"
4 #include "Tourist/Util/Util.h"
5 #include "Poco/Instantiator.h"
6 #include <typeinfo> //Remove This - Only for testing
9 using namespace Tourist::Util;
11 using Poco::Instantiator;
13 namespace Tourist{
14 namespace App{
17 Multicast::Multicast(Application *_app, int objCount) :
18 logger(Logger::get("Tourist.App.Multicast." + Util::toString(objCount) ))
20 this->app = _app;
21 this->shouldBeRunning = true;
23 //this->objCount = objCount;
26 Multicast::~Multicast()
30 void Multicast::run()
32 //Register for incoming multicast messages
33 receiveMulticastMsg();
35 while(shouldBeRunning)
37 processQueue();
38 Poco::Thread::sleep(500);
43 void Multicast::stop()
45 this -> shouldBeRunning = false;
49 string Multicast::getObjCountStr()
51 if(debugFlag)
53 return "[Node " + Util::toString(
56 }*/
58 void Multicast::processQueue()
60 //debug(logger, "Multicast::processQueue()");
62 //consume dispatched message
63 while (queue.size() != 0)
65 //info(logger, (debugFlag?(Util::toString(objCount)):"") + "Processing multicast message");
66 info(logger, "Processing multicast message");
67 //We Got the message.. let's process it
68 MessageNotification *msgNotify = dynamic_cast<MessageNotification*>
69 (queue.dequeueNotification());
70 if (msgNotify == NULL)
72 //error(logger, debugFlag ? ("[Node " + Util::toString(objCount) + "]") : "" +
73 //"Notification for multicast response is NULL!");
74 error(logger, "Notification for multicast response is NULL!");
75 continue; //let's see if there is any other message in queue.
78 debug(logger, "Multicast Message Received. Parsing message ....");
80 MsgMulticast *msgMulticast = dynamic_cast<MsgMulticast*> (msgNotify -> message);
81 debug(logger, "Multicast Initiated by: " + msgMulticast->getInitiatingPeer().toString());
82 RemoteNode thisNode;
83 app->getLocalNodeInfo(thisNode);
84 debug(logger, "This node is: " + thisNode.toString());
87 //Initiate Tree based Multicast
88 // Call localNode method findRemoteLinks with approprate flags
89 // in a loop send multicast event to all nodes in NodeSet
90 // end condition: if findRemoteLinks reuturns no node then no sendEvent & multicast finished.
92 // TODO: PRoblem, get Localnode
94 RemoteNode initNode = msgMulticast->getInitiatingPeer();
95 multicastRouting(&initNode, msgMulticast->getEventType(), msgMulticast->getStep());
100 void Multicast::multicastRouting(Node* source, int event_type, int step)
102 NodeSet audienceSet;
103 int type = TYPE_PREFIX;
104 int multicastCount = 0;
105 bool status=false;
107 if (event_type >= 1 && event_type <= 4) {
108 //findCandidatesMulticast(this, source, audienceSet,LOWER_LEVELS|EQUAL_LEVELS|FIND_PREFIXES|FIND_ALL);
109 status = app->searchRemoteLinks(*source, audienceSet, LOWER_LEVELS|EQUAL_LEVELS|FIND_PREFIXES|FIND_ALL);
110 } else {
111 //findCandidatesMulticast(this, source, audienceSet,LOWER_LEVELS|EQUAL_LEVELS|FIND_SUFFIXES|FIND_ALL);
112 status = app->searchRemoteLinks(*source, audienceSet, LOWER_LEVELS|EQUAL_LEVELS|FIND_SUFFIXES|FIND_ALL);
113 type = TYPE_SUFFIX;
116 debug(logger, status?"Audience Set Found":"No Audience Set Found");
118 if(status) {
119 //TODO: Use variable instead of 64
120 for (int i=step+1; i<=64; i++) {
121 Node *nextHop = app->getNextHop(audienceSet, source, i, type);
122 if (nextHop == NULL || nextHop->hexstring() == source->hexstring()) {
123 continue;
126 debug(logger, "Sending event to an audience set node ..... ");
127 int ack = sendEvent(source, nextHop, event_type, i);
129 if (ack > 0)
130 multicastCount++;
137 // TODO: LocalNode or Node* -- ???
138 //void Multicast::levelChangeEvent(int newLevel, LocalNode *localNode)
139 void Multicast::levelChangeEvent(int newLevel, Node *localNode)
141 //Initiating prefix multicast
142 int prefixEvent, suffixEvent;
143 if (localNode->getLevel() > newLevel)
145 prefixEvent = EVENT_PREFIX_LEVEL_UP;
146 suffixEvent = EVENT_SUFFIX_LEVEL_UP;
148 else
150 prefixEvent = EVENT_PREFIX_LEVEL_DOWN;
151 suffixEvent = EVENT_SUFFIX_LEVEL_DOWN;
154 /* Node **prefixTopNode;
155 localNode -> getTopNode(*localNode, prefixTopNode, TYPE_PREFIX);
156 int prefixAck = sendEvent(localNode, *prefixTopNode, prefixEvent, 0);
158 Node *pTopNode;
159 if(! (app -> getTopNode(*localNode, &pTopNode, PREFIX)))
160 error(logger, "Cannot find Top Node!");
161 else debug(logger, "Top Node found");
162 int prefixAck = sendEvent(localNode, pTopNode, prefixEvent, 0);
164 //Node *n=*prefixTopNode;
165 //debug(logger, "Prefix TN Level: " + Util::toString( n->getLevel() ));
166 //debug(logger, "Prefix TN Level: " + Util::toString(localNode->getLevel()));
168 //pTopNode = NULL;
169 if(! (app -> getTopNode(*localNode, &pTopNode, SUFFIX)))
170 error(logger, "Cannot find Top Node!");
171 else debug(logger, "Top Node found");
172 int suffixAck = sendEvent(localNode, pTopNode, suffixEvent, 0);
176 Node **suffixTopNode;
177 localNode -> getTopNode(*localNode, suffixTopNode, TYPE_SUFFIX);
178 int suffixAck = sendEvent(localNode, *suffixTopNode, suffixEvent, 0);
184 //TODO: debug & error method's of Poco Logger lib to be used.
185 int Multicast::sendEvent(Node *source, Node *target, int event_type, int step)
189 if (source == NULL || target == NULL)
190 return -1;
192 unsigned int seqN = 0;
193 string log;
194 const char* method = "eventOut_sendEvent";
196 /* ------------------ LOG [Send Event]----------------------- */
198 string eventType = "Un-Known";
199 switch(event_type)
201 case EVENT_PREFIX_JOIN:
202 eventType = "Prefix join";
203 break;
205 case EVENT_PREFIX_DISCONNECT:
206 eventType = "Prefix disconnect";
207 break;
209 case EVENT_PREFIX_LEVEL_UP:
210 eventType = "Prefix level up";
211 break;
213 case EVENT_PREFIX_LEVEL_DOWN:
214 eventType = "Prefix level down";
215 break;
217 case EVENT_SUFFIX_JOIN:
218 eventType = "Suffix join";
219 break;
221 case EVENT_SUFFIX_DISCONNECT:
222 eventType = "Suffix disconnect";
223 break;
225 case EVENT_SUFFIX_LEVEL_UP:
226 eventType = "Suffix level up";
227 break;
229 case EVENT_SUFFIX_LEVEL_DOWN:
230 eventType = "Suffix level down";
231 break;
235 debug(logger,
236 "source:source_level= " + source->hexstring() + ":" + Util::toString(source->getLevel()) +
237 "\ntarget:target_level= " + target->hexstring() + ":" + Util::toString(target->getLevel()) +
238 "\nevent:step= " + eventType + ":" + Util::toString(step) );
241 /* ---------------------------------------------------------- */
243 //Get pointer to Topnode
245 RemoteNode *topNode;
247 RemoteNode *rNodePtr = (RemoteNode*) target;
248 RemoteNode rNode = *rNodePtr;
250 int statusConn = app->getConnection(rNode, &topNode);
252 //Register Message Type
253 MessageTypeData msgType;
254 msgType.registerMessageObj(MessageTypeData::MULTICAST, new Instantiator<MsgMulticast, AbstractMessage>);
256 RemoteNode srcNode;
257 app->getLocalNodeInfo(srcNode);
259 MsgMulticast *msgMulticast = new MsgMulticast("Multicast","Multicast", srcNode, event_type, step);
260 string str;
261 msgMulticast->getString(str);
263 //Instantiate MessageHeader object
264 MessageHeader *msgHeader = new MessageHeader(MessageHeader::HEADER_VERSION, str.length(), MessageTypeData::MULTICAST);
266 //Send Multicast Request Message to Top Node
267 topNode->sendMessage(msgHeader, msgMulticast);
269 return true;
274 //------------------------------------------------------------------------------
275 void Multicast::receiveMulticastMsg()
277 /*MessageDispatcher dispatcher;
278 dispatcher.registerNotification("onMulticastMsg",
279 NObserver<Multicast, MessageNotification>(*this, &Multicast::onMulticastMsg));*/
281 app->getConfig()->getMessageBus()->registerNotification("Multicast",
282 NObserver<Multicast, MessageNotification>(*this, &Multicast::onMulticastMsg));
286 void Multicast::onMulticastMsg(const AutoPtr<MessageNotification>& notification)
288 debug(logger, "Multicast::onMulticastMsg()");
290 MessageNotification *notificationObj = const_cast<MessageNotification*>(notification.get());
291 queue.enqueueNotification(notificationObj);
294 string component = notification -> message -> getComponentName();
296 string message;
297 notification -> message -> getString(message);
299 debug(logger, "Message Received. Parsing message ....");
301 MsgMulticast *multicast = dynamic_cast<MsgMulticast*> (notification -> message);
302 debug(logger, "Multicast Initiated by: " + multicast->getInitiatingPeer().toString());
304 RemoteNode thisNode;
305 app->getLocalNodeInfo(thisNode);
306 debug(logger, "This node is: " + thisNode.toString());
309 //Initiate Tree based Multicast
315 //------------------------------------------------------------------------------
316 // Find the next recipient in the chain to forward a message to
317 Node * findRouter(LocalNode* sender, Node *recipient) {
318 unsigned int i;
319 AvliSetEl< Node *> *result;
320 Node *retval;
322 // Test whether the sender is also the recipient
323 if (*sender == *recipient){
324 DoutLevel(2,cerr,"Recipient is Sender");
325 return recipient;
328 // Now search the prefix table
329 DebugLevel(6,
330 for (int j=0; j<MAX_NODE_LEVEL;j++){
331 DoutLevel(6,cerr," prefixTable["<<j<<"]="<<
332 (void *)(&sender->prefixTable[j])<<" len="<<
333 sender->prefixTable[j].length());
337 for (i=0; i<MAX_NODE_LEVEL;i++){
338 DoutLevel(6,cerr,"i="<<i<<" prefixTable="<<
339 (void *)&sender->prefixTable[i]);
340 result=sender->prefixTable[i].find(recipient);
341 if (result){
342 DoutLevel(6,cerr,"Recipient found in prefix table, level = "<<i);
343 return (Node *)result->key;
347 // Search the suffix table for candidates
348 NodeCompSet remoteNodes;
349 remoteNodes.setNode(recipient);
350 findCandidatesSuffix(sender, recipient, &remoteNodes);
352 // Node found, search the backup table
353 if (remoteNodes.length()==0){
354 DoutLevel(6,cerr,"NodePSet "<<
355 " has zero length, falling back to backup_table"<<endl);
356 /*DebugLevel(4,
357 outstream<<"["<<sender->level<<"] [findRouter] [backuptable]";
358 netLogger.warning(outstream.str());
359 outstream.str(string(""));
360 );*/
361 /*for (i=1;i<=sender->level;i++){
362 DoutLevel(6,cerr,i<<": ["<<i<<"] = "<<hex<<sender->BackupTable[i]);
363 if (sender->BackupTable[i]){
364 DoutLevel(6,cerr,"Forwarding to backup level "<<
365 sender->BackupTable[i]->level<<" "<<sender->BackupTable[i]);
366 DoutLevel(6,cerr,"BackupTable="<<hex<<&sender->BackupTable);
367 remoteNodes.insert(sender->BackupTable[i]);
372 if (remoteNodes.length()==0) retval=NULL;
373 else{
374 NodeCompSet::Iter iter(remoteNodes);
375 retval=(Node *)iter->key; // Simply return the closest candidate in node space
377 remoteNodes.empty();
378 return retval;
382 } //namespace Tourist
383 } //namespace App