1 #include "Tourist/App/Multicast.h"
2 #include "Poco/Logger.h"
3 #include "Tourist/Constants.h"
4 #include "Tourist/Util/Util.h"
5 #include "Poco/Instantiator.h"
6 #include <typeinfo> //Remove This - Only for testing
9 using namespace Tourist::Util
;
11 using Poco::Instantiator
;
// Constructor.
// _app     - pointer to the owning Application.
// objCount - integer appended to the logger name.
// The visible initializer list creates the logger under the name
// Tourist.App.Multicast.<objCount>, and the visible body sets
// shouldBeRunning to true.
// NOTE(review): no initializer or assignment that stores _app is visible in
// this view, although other methods dereference app; presumably it is on a
// line missing from this listing - confirm.
17 Multicast::Multicast(Application
*_app
, int objCount
) :
18 logger(Logger::get("Tourist.App.Multicast." + Util::toString(objCount
) ))
21 this->shouldBeRunning
= true;
23 //this->objCount = objCount;
// Destructor. Its body is not visible in this view.
26 Multicast::~Multicast()
// NOTE(review): the statements below call receiveMulticastMsg() and then
// sleep in 500 ms steps for as long as shouldBeRunning is set. The header of
// the function that contains them is not visible here (the embedded listing
// numbers jump from 26 to 32), so they presumably belong to a separate
// thread-entry method rather than to the destructor - confirm against the
// full file.
// NOTE(review): shouldBeRunning is cleared by stop(). If stop() is called
// from a different thread than this loop, the flag needs to be atomic or
// otherwise synchronized; its declaration is not visible here - verify in
// the header.
32 //Register for incoming multicast messages
33 receiveMulticastMsg();
35 while(shouldBeRunning
)
38 Poco::Thread::sleep(500);
// Requests termination by clearing shouldBeRunning, the flag polled by the
// sleep loop earlier in this file. Only the flag assignment is visible here;
// nothing in the visible lines waits for that loop to finish.
43 void Multicast::stop()
45 this -> shouldBeRunning
= false;
// Returns a string that starts with the text [Node followed by a
// Util::toString(...) result.
// NOTE(review): the return expression is truncated in this view, so the
// argument passed to Util::toString and the rest of the string are not shown.
49 string Multicast::getObjCountStr()
53 return "[Node " + Util::toString(
// Drains the notification queue. While queue.size() is non-zero it dequeues
// one notification and casts it to MessageNotification. When that cast
// yields NULL an error is logged and the loop moves on to the next entry.
// Otherwise the carried message is cast to MsgMulticast, the initiating peer
// and the local node are logged, and the event is passed to
// multicastRouting() with the initiating peer as source together with the
// event type and step read from the message.
58 void Multicast::processQueue()
60 //debug(logger, "Multicast::processQueue()");
62 //consume dispatched message
63 while (queue
.size() != 0)
65 //info(logger, (debugFlag?(Util::toString(objCount)):"") + "Processing multicast message");
66 info(logger
, "Processing multicast message");
67 //We Got the message.. let's process it
// NOTE(review): dequeueNotification() hands back a raw pointer and no
// release of it is visible in this view (also not on the NULL path below,
// where the original pointer is lost after the failed cast). Presumably
// queue is a Poco::NotificationQueue, whose caller is expected to release
// the dequeued notification - confirm and consider holding it in an AutoPtr.
68 MessageNotification
*msgNotify
= dynamic_cast<MessageNotification
*>
69 (queue
.dequeueNotification());
70 if (msgNotify
== NULL
)
72 //error(logger, debugFlag ? ("[Node " + Util::toString(objCount) + "]") : "" +
73 //"Notification for multicast response is NULL!");
74 error(logger
, "Notification for multicast response is NULL!");
75 continue; //let's see if there is any other message in queue.
78 debug(logger
, "Multicast Message Received. Parsing message ....");
// NOTE(review): the result of this dynamic_cast is dereferenced by the very
// next statement without a NULL check; a message that is not a MsgMulticast
// would be dereferenced as a null pointer here.
80 MsgMulticast
*msgMulticast
= dynamic_cast<MsgMulticast
*> (msgNotify
-> message
);
81 debug(logger
, "Multicast Initiated by: " + msgMulticast
->getInitiatingPeer().toString());
// The declaration of thisNode is on a line missing from this view.
83 app
->getLocalNodeInfo(thisNode
);
84 debug(logger
, "This node is: " + thisNode
.toString());
87 //Initiate Tree based Multicast
88 // Call localNode method findRemoteLinks with appropriate flags
89 // in a loop send multicast event to all nodes in NodeSet
90 // end condition: if findRemoteLinks returns no node then no sendEvent & multicast finished.
92 // TODO: Problem, get LocalNode
// The initiating peer is copied into a local RemoteNode so that its address
// can be passed as the source argument.
94 RemoteNode initNode
= msgMulticast
->getInitiatingPeer();
95 multicastRouting(&initNode
, msgMulticast
->getEventType(), msgMulticast
->getStep());
// Routes a multicast event onward from this node.
// source     - node the event originated from; dereferenced without a NULL
//              check in the visible lines.
// event_type - event code. Values 1 to 4 select the search with
//              FIND_PREFIXES; the second search uses FIND_SUFFIXES and is
//              presumably the else branch (the line between the two searches
//              is missing from this view - confirm).
// step       - step already handled; the loop below starts at step + 1.
// The search result (status) is only logged in the visible lines. For each
// step i from step + 1 up to 64 the next hop is requested from
// app->getNextHop() and the event is sent to it through sendEvent().
// NOTE(review): the declarations of audienceSet and status, the body of the
// branch taken when nextHop is NULL or has the same hexstring as source, and
// everything after the sendEvent() call are on lines missing from this view.
// multicastCount and ack are not used in the visible lines.
100 void Multicast::multicastRouting(Node
* source
, int event_type
, int step
)
103 int type
= TYPE_PREFIX
;
104 int multicastCount
= 0;
107 if (event_type
>= 1 && event_type
<= 4) {
108 //findCandidatesMulticast(this, source, audienceSet,LOWER_LEVELS|EQUAL_LEVELS|FIND_PREFIXES|FIND_ALL);
109 status
= app
->searchRemoteLinks(*source
, audienceSet
, LOWER_LEVELS
|EQUAL_LEVELS
|FIND_PREFIXES
|FIND_ALL
);
111 //findCandidatesMulticast(this, source, audienceSet,LOWER_LEVELS|EQUAL_LEVELS|FIND_SUFFIXES|FIND_ALL);
112 status
= app
->searchRemoteLinks(*source
, audienceSet
, LOWER_LEVELS
|EQUAL_LEVELS
|FIND_SUFFIXES
|FIND_ALL
);
116 debug(logger
, status
?"Audience Set Found":"No Audience Set Found");
119 //TODO: Use variable instead of 64
120 for (int i
=step
+1; i
<=64; i
++) {
121 Node
*nextHop
= app
->getNextHop(audienceSet
, source
, i
, type
);
122 if (nextHop
== NULL
|| nextHop
->hexstring() == source
->hexstring()) {
126 debug(logger
, "Sending event to an audience set node ..... ");
127 int ack
= sendEvent(source
, nextHop
, event_type
, i
);
137 // TODO: LocalNode or Node* -- ???
138 //void Multicast::levelChangeEvent(int newLevel, LocalNode *localNode)
// Announces a level change of localNode by sending one prefix event and one
// suffix event, each with step 0, to the top node returned by
// app->getTopNode() for PREFIX and SUFFIX respectively.
// newLevel  - level compared against localNode->getLevel().
// localNode - node whose level changes; dereferenced without a NULL check.
// When the current level is greater than newLevel the EVENT_PREFIX_LEVEL_UP
// and EVENT_SUFFIX_LEVEL_UP codes are assigned; the LEVEL_DOWN assignments
// follow and are presumably the else branch (that line is missing from this
// view - confirm).
// NOTE(review): when app->getTopNode() fails only an error is logged and
// sendEvent() is still called with pTopNode. sendEvent() does test target
// against NULL, but the declaration and initial value of pTopNode are not
// visible here - confirm that it starts out as NULL.
// NOTE(review): prefixAck and suffixAck are not used in the visible lines.
// NOTE(review): the block comment opened at listing number 154 has no
// visible terminator, and the lines at listing numbers 176 to 178 look like
// a second disabled variant; the delimiters are presumably on lines missing
// from this view.
139 void Multicast::levelChangeEvent(int newLevel
, Node
*localNode
)
141 //Initiating prefix multicast
142 int prefixEvent
, suffixEvent
;
143 if (localNode
->getLevel() > newLevel
)
145 prefixEvent
= EVENT_PREFIX_LEVEL_UP
;
146 suffixEvent
= EVENT_SUFFIX_LEVEL_UP
;
150 prefixEvent
= EVENT_PREFIX_LEVEL_DOWN
;
151 suffixEvent
= EVENT_SUFFIX_LEVEL_DOWN
;
154 /* Node **prefixTopNode;
155 localNode -> getTopNode(*localNode, prefixTopNode, TYPE_PREFIX);
156 int prefixAck = sendEvent(localNode, *prefixTopNode, prefixEvent, 0);
159 if(! (app
-> getTopNode(*localNode
, &pTopNode
, PREFIX
)))
160 error(logger
, "Cannot find Top Node!");
161 else debug(logger
, "Top Node found");
162 int prefixAck
= sendEvent(localNode
, pTopNode
, prefixEvent
, 0);
164 //Node *n=*prefixTopNode;
165 //debug(logger, "Prefix TN Level: " + Util::toString( n->getLevel() ));
166 //debug(logger, "Prefix TN Level: " + Util::toString(localNode->getLevel()));
169 if(! (app
-> getTopNode(*localNode
, &pTopNode
, SUFFIX
)))
170 error(logger
, "Cannot find Top Node!");
171 else debug(logger
, "Top Node found");
172 int suffixAck
= sendEvent(localNode
, pTopNode
, suffixEvent
, 0);
176 Node **suffixTopNode;
177 localNode -> getTopNode(*localNode, suffixTopNode, TYPE_SUFFIX);
178 int suffixAck = sendEvent(localNode, *suffixTopNode, suffixEvent, 0);
184 //TODO: debug & error methods of Poco Logger lib to be used.
// Sends one multicast event message to target.
// source     - node tested against NULL and reported in the log entry.
// target     - destination node; tested against NULL, then cast to
//              RemoteNode* and used to obtain a connection.
// event_type - one of the EVENT_PREFIX_* or EVENT_SUFFIX_* codes; mapped to
//              a readable name for the log entry and stored in the message.
// step       - step number logged and stored in the message.
// Returns an int; the return statements are on lines missing from this view.
// Visible sequence: NULL test on source and target, build the log text,
// app->getConnection() for the target, register the MULTICAST message type,
// build a MsgMulticast from the local node info, build a MessageHeader
// sized by the length of the string obtained from the message, and finally
// topNode->sendMessage().
// NOTE(review): the message is built from srcNode, which is filled by
// app->getLocalNodeInfo(); the source parameter is only NULL-tested and
// logged in the visible lines - confirm that this is intended.
// NOTE(review): seqN and method are not used in the visible lines. The
// switch header, the break statements, the logging call that consumes the
// text at listing numbers 236 to 238, and the declarations of topNode,
// srcNode and str are on lines missing from this view.
185 int Multicast::sendEvent(Node
*source
, Node
*target
, int event_type
, int step
)
189 if (source
== NULL
|| target
== NULL
)
192 unsigned int seqN
= 0;
194 const char* method
= "eventOut_sendEvent";
196 /* ------------------ LOG [Send Event]----------------------- */
198 string eventType
= "Un-Known";
201 case EVENT_PREFIX_JOIN
:
202 eventType
= "Prefix join";
205 case EVENT_PREFIX_DISCONNECT
:
206 eventType
= "Prefix disconnect";
209 case EVENT_PREFIX_LEVEL_UP
:
210 eventType
= "Prefix level up";
213 case EVENT_PREFIX_LEVEL_DOWN
:
214 eventType
= "Prefix level down";
217 case EVENT_SUFFIX_JOIN
:
218 eventType
= "Suffix join";
221 case EVENT_SUFFIX_DISCONNECT
:
222 eventType
= "Suffix disconnect";
225 case EVENT_SUFFIX_LEVEL_UP
:
226 eventType
= "Suffix level up";
229 case EVENT_SUFFIX_LEVEL_DOWN
:
230 eventType
= "Suffix level down";
236 "source:source_level= " + source
->hexstring() + ":" + Util::toString(source
->getLevel()) +
237 "\ntarget:target_level= " + target
->hexstring() + ":" + Util::toString(target
->getLevel()) +
238 "\nevent:step= " + eventType
+ ":" + Util::toString(step
) );
241 /* ---------------------------------------------------------- */
243 //Get pointer to Topnode
// NOTE(review): C-style downcast from Node* to RemoteNode*. Nothing in the
// visible lines verifies that target really refers to a RemoteNode; a
// dynamic_cast with a NULL check would make a wrong type detectable.
247 RemoteNode
*rNodePtr
= (RemoteNode
*) target
;
248 RemoteNode rNode
= *rNodePtr
;
// NOTE(review): statusConn is not examined in the visible lines before
// topNode is dereferenced further down.
250 int statusConn
= app
->getConnection(rNode
, &topNode
);
252 //Register Message Type
253 MessageTypeData msgType
;
254 msgType
.registerMessageObj(MessageTypeData::MULTICAST
, new Instantiator
<MsgMulticast
, AbstractMessage
>);
257 app
->getLocalNodeInfo(srcNode
);
// NOTE(review): the Instantiator above, msgMulticast and msgHeader are all
// allocated with new and no delete is visible in this view - confirm that
// registerMessageObj() and sendMessage() take ownership.
259 MsgMulticast
*msgMulticast
= new MsgMulticast("Multicast","Multicast", srcNode
, event_type
, step
);
261 msgMulticast
->getString(str
);
263 //Instantiate MessageHeader object
264 MessageHeader
*msgHeader
= new MessageHeader(MessageHeader::HEADER_VERSION
, str
.length(), MessageTypeData::MULTICAST
);
266 //Send Multicast Request Message to Top Node
267 topNode
->sendMessage(msgHeader
, msgMulticast
);
274 //------------------------------------------------------------------------------
// Subscribes this object to notifications registered under the name
// Multicast on the message bus obtained through
// app->getConfig()->getMessageBus(). The NObserver passed in binds this
// object to onMulticastMsg() as the handler for MessageNotification.
// The commented-out lines show an earlier variant that used a local
// MessageDispatcher instead of the message bus.
275 void Multicast::receiveMulticastMsg()
277 /*MessageDispatcher dispatcher;
278 dispatcher.registerNotification("onMulticastMsg",
279 NObserver<Multicast, MessageNotification>(*this, &Multicast::onMulticastMsg));*/
281 app
->getConfig()->getMessageBus()->registerNotification("Multicast",
282 NObserver
<Multicast
, MessageNotification
>(*this, &Multicast::onMulticastMsg
));
// Handler registered in receiveMulticastMsg(). Logs the call and appends
// the received notification to queue, which processQueue() drains.
// notification - the delivered MessageNotification, held in an AutoPtr.
// NOTE(review): the raw pointer from notification.get() is enqueued after a
// const_cast, and no duplicate() call is visible. Presumably queue is a
// Poco::NotificationQueue, which takes ownership of what is enqueued; in
// that case the reference count ends up one short once the AutoPtr of the
// caller is released - confirm against the queue type in the header.
// NOTE(review): the lines from listing number 294 onward use names with no
// visible declaration (message, thisNode) and repeat logic that is also in
// processQueue(). They look like a disabled older implementation whose
// block-comment delimiters are on lines missing from this view - confirm.
286 void Multicast::onMulticastMsg(const AutoPtr
<MessageNotification
>& notification
)
288 debug(logger
, "Multicast::onMulticastMsg()");
290 MessageNotification
*notificationObj
= const_cast<MessageNotification
*>(notification
.get());
291 queue
.enqueueNotification(notificationObj
);
294 string component = notification -> message -> getComponentName();
297 notification -> message -> getString(message);
299 debug(logger, "Message Received. Parsing message ....");
301 MsgMulticast *multicast = dynamic_cast<MsgMulticast*> (notification -> message);
302 debug(logger, "Multicast Initiated by: " + multicast->getInitiatingPeer().toString());
305 app->getLocalNodeInfo(thisNode);
306 debug(logger, "This node is: " + thisNode.toString());
309 //Initiate Tree based Multicast
315 //------------------------------------------------------------------------------
316 // Find the next recipient in the chain to forward a message to
// Free function, not a member of Multicast.
// sender    - local node whose prefixTable entries are searched.
// recipient - node that the message should eventually reach.
// Returns a Node*. Visible steps:
//   1. compare *sender with *recipient (the body of that branch is missing
//      from this view);
//   2. dump every prefixTable entry through DoutLevel;
//   3. look recipient up in each prefixTable level and return the key of the
//      hit (the test on result is on a line missing from this view);
//   4. otherwise collect suffix candidates with findCandidatesSuffix() into
//      a NodeCompSet;
//   5. when that set is empty, log a warning about falling back to the
//      backup table (the backup-table loop itself is commented out);
//   6. set retval to NULL when the set is still empty, else to the key of
//      the first iterator position, which the original comment describes as
//      the closest candidate in node space.
// NOTE(review): i, retval, outstream and netLogger have no visible
// declaration, the block comment opened at listing number 361 has no visible
// terminator, and no return of retval is visible. The function may be
// disabled legacy code whose comment delimiters were lost from this view -
// confirm before relying on it.
317 Node * findRouter(LocalNode* sender, Node *recipient) {
319 AvliSetEl< Node *> *result;
322 // Test whether the sender is also the recipient
323 if (*sender == *recipient){
324 DoutLevel(2,cerr,"Recipient is Sender");
328 // Now search the prefix table
330 for (int j=0; j<MAX_NODE_LEVEL;j++){
331 DoutLevel(6,cerr," prefixTable["<<j<<"]="<<
332 (void *)(&sender->prefixTable[j])<<" len="<<
333 sender->prefixTable[j].length());
337 for (i=0; i<MAX_NODE_LEVEL;i++){
338 DoutLevel(6,cerr,"i="<<i<<" prefixTable="<<
339 (void *)&sender->prefixTable[i]);
340 result=sender->prefixTable[i].find(recipient);
342 DoutLevel(6,cerr,"Recipient found in prefix table, level = "<<i);
343 return (Node *)result->key;
347 // Search the suffix table for candidates
348 NodeCompSet remoteNodes;
349 remoteNodes.setNode(recipient);
350 findCandidatesSuffix(sender, recipient, &remoteNodes);
352 // Node found, search the backup table
353 if (remoteNodes.length()==0){
354 DoutLevel(6,cerr,"NodePSet "<<
355 " has zero length, falling back to backup_table"<<endl);
357 outstream<<"["<<sender->level<<"] [findRouter] [backuptable]";
358 netLogger.warning(outstream.str());
359 outstream.str(string(""));
361 /*for (i=1;i<=sender->level;i++){
362 DoutLevel(6,cerr,i<<": ["<<i<<"] = "<<hex<<sender->BackupTable[i]);
363 if (sender->BackupTable[i]){
364 DoutLevel(6,cerr,"Forwarding to backup level "<<
365 sender->BackupTable[i]->level<<" "<<sender->BackupTable[i]);
366 DoutLevel(6,cerr,"BackupTable="<<hex<<&sender->BackupTable);
367 remoteNodes.insert(sender->BackupTable[i]);
372 if (remoteNodes.length()==0) retval=NULL;
374 NodeCompSet::Iter iter(remoteNodes);
375 retval=(Node *)iter->key; // Simply return the closest candidate in node space
382 } //namespace Tourist