1 #include "Tourist/App/Multicast.h"
2 #include "Poco/Logger.h"
3 #include "Tourist/Constants.h"
4 #include "Tourist/Util/Util.h"
5 #include "Poco/Instantiator.h"
7 using namespace Tourist::Util
;
9 using Poco::Instantiator
;
// Constructs a Multicast handler.
//   _app     - pointer to the owning Application (its use is not visible in
//              the lines of this extract).
//   objCount - integer appended to the logger name so that each instance logs
//              under "Tourist.App.Multicast.<objCount>".
// NOTE(review): this extract is missing lines (the body braces and possibly
// further member initializers) - confirm against the full file.
15 Multicast::Multicast(Application
*_app
, int objCount
) :
// Build the per-instance logger name from the fixed prefix plus objCount.
16 logger(Logger::get("Tourist.App.Multicast." + Util::toString(objCount
) ))
// Start in the "running" state; stop() later sets this flag to false.
19 this->shouldBeRunning
= true;
// Destructor.
// NOTE(review): the destructor body is not present in this extract, so any
// cleanup it performs cannot be described here.
22 Multicast::~Multicast()
// NOTE(review): the signature of the function these statements belong to is
// missing from this extract (presumably the thread entry point - confirm).
// Visible behavior: subscribe to multicast messages once, then loop for as
// long as shouldBeRunning is true, sleeping 500 ms via Poco::Thread::sleep.
28 //Register for incoming multicast messages
29 receiveMulticastMsg();
// Loop until stop() sets shouldBeRunning to false.
// NOTE(review): lines between the loop header and the sleep are missing here;
// any other work done per iteration is not visible.
31 while(shouldBeRunning
)
34 Poco::Thread::sleep(500);
// Requests shutdown by setting shouldBeRunning to false; the
// `while(shouldBeRunning)` loop elsewhere in this file tests that flag.
// NOTE(review): no synchronization on this flag is visible in this file - if
// stop() is called from a different thread than the polling loop, confirm
// that the member's declaration makes the write visible to that thread.
39 void Multicast::stop()
41 this -> shouldBeRunning
= false;
// Drains `queue`: for each pending notification it logs the event, extracts
// the multicast message carried by the notification, logs the initiating peer
// and the local node, and hands the initiating peer, event type and step to
// multicastRouting().
// NOTE(review): this extract is missing lines (braces and at least the
// declaration of `thisNode`) - confirm details against the full file.
44 void Multicast::processQueue()
46 //debug(logger, "Multicast::processQueue()");
48 //consume dispatched message
49 while (queue
.size() != 0)
51 info(logger
, "Processing multicast message");
52 //We Got the message.. let's process it
// NOTE(review): nothing visible in this function releases msgNotify after
// use; if `queue` is a Poco::NotificationQueue, the caller owns the pointer
// returned by dequeueNotification() - confirm whether a release() is needed.
53 MessageNotification
*msgNotify
= dynamic_cast<MessageNotification
*>
54 (queue
.dequeueNotification());
// NULL here means the dequeued pointer was null or was not a
// MessageNotification; log it and move on to the next queue entry.
55 if (msgNotify
== NULL
)
57 error(logger
, "Notification for multicast response is NULL!");
58 continue; //let's see if there is any other message in queue.
61 debug(logger
, "Multicast Message Received. Parsing message ....");
// NOTE(review): the result of this dynamic_cast is dereferenced on the next
// statement with no visible NULL check; a notification carrying a
// non-MsgMulticast message would dereference a null pointer - confirm.
63 MsgMulticast
*msgMulticast
= dynamic_cast<MsgMulticast
*> (msgNotify
-> message
);
64 debug(logger
, "Multicast Initiated by: " + msgMulticast
->getInitiatingPeer().toString());
// Fetch the local node's info into thisNode; in the visible lines it is only
// used for the debug message below.
66 app
->getLocalNodeInfo(thisNode
);
67 debug(logger
, "This node is: " + thisNode
.toString());
70 //Initiate Tree based Multicast
71 // Call localNode method findRemoteLinks with appropriate flags
72 // in a loop send multicast event to all nodes in NodeSet
73 // end condition: if findRemoteLinks returns no node then no sendEvent & multicast finished.
// Copy the initiating peer so its address can be passed as the routing source.
75 RemoteNode initNode
= msgMulticast
->getInitiatingPeer();
76 multicastRouting(&initNode
, msgMulticast
->getEventType(), msgMulticast
->getStep());
// Forwards a multicast event to further nodes.
//   source     - node the multicast is routed on behalf of; dereferenced
//                without a NULL check in the visible lines.
//   event_type - event code; values 1..4 select the prefix search below.
//   step       - routing step already reached; the loop starts at step + 1.
// NOTE(review): this extract is missing lines (braces, the branch that
// separates the two searchRemoteLinks calls, the action taken when no next hop
// exists, and the declarations of `status` and `audienceSet`) - confirm
// against the full file.
81 void Multicast::multicastRouting(Node
* source
, int event_type
, int step
)
// `type` is passed to getNextHop(); it starts as TYPE_PREFIX.
// NOTE(review): whether it is changed for suffix events is not visible here.
84 int type
= TYPE_PREFIX
;
// NOTE(review): no use of multicastCount is visible in this extract.
85 int multicastCount
= 0;
// Event codes 1..4 search the remote links with FIND_PREFIXES.
88 if (event_type
>= 1 && event_type
<= 4) {
89 status
= app
->searchRemoteLinks(*source
, audienceSet
, LOWER_LEVELS
|EQUAL_LEVELS
|FIND_PREFIXES
|FIND_ALL
);
// Same search with FIND_SUFFIXES; presumably the else-branch for the other
// event codes - the line introducing it is missing, TODO confirm.
91 status
= app
->searchRemoteLinks(*source
, audienceSet
, LOWER_LEVELS
|EQUAL_LEVELS
|FIND_SUFFIXES
|FIND_ALL
);
95 debug(logger
, status
?"Audience Set Found":"No Audience Set Found");
98 //TODO: Use variable instead of 64
// Walk the steps after `step` up to the hard-coded limit of 64, asking the
// application for the next hop at each step.
99 for (int i
=step
+1; i
<=64; i
++) {
100 Node
*nextHop
= app
->getNextHop(audienceSet
, source
, i
, type
);
// No next hop, or the next hop has the same hexstring as the source.
// NOTE(review): the statement executed in this case is missing from the
// extract (presumably it skips the send) - confirm.
101 if (nextHop
== NULL
|| nextHop
->hexstring() == source
->hexstring()) {
105 debug(logger
, "Sending event to an audience set node ..... ");
// NOTE(review): no use of the returned `ack` is visible in this extract.
106 int ack
= sendEvent(source
, nextHop
, event_type
, i
);
// Announces a level change of the local node by sending one prefix event and
// one suffix event, each with step 0, to the top node found for PREFIX and
// SUFFIX respectively.
//   newLevel  - level compared against localNode->getLevel().
//   localNode - node whose level changes; dereferenced without a NULL check
//               in the visible lines.
// NOTE(review): this extract is missing lines (braces, the `else` between the
// two event assignments, and the declaration of `pTopNode`) - confirm against
// the full file.
116 void Multicast::levelChangeEvent(int newLevel
, Node
*localNode
)
118 //Initiating prefix multicast
119 int prefixEvent
, suffixEvent
;
// When the node's current level is greater than newLevel the *_LEVEL_UP
// events are selected.
120 if (localNode
->getLevel() > newLevel
)
122 prefixEvent
= EVENT_PREFIX_LEVEL_UP
;
123 suffixEvent
= EVENT_SUFFIX_LEVEL_UP
;
// Presumably the else-branch (the `else` line is missing from the extract):
// select the *_LEVEL_DOWN events - TODO confirm.
127 prefixEvent
= EVENT_PREFIX_LEVEL_DOWN
;
128 suffixEvent
= EVENT_SUFFIX_LEVEL_DOWN
;
// Look up the top node for PREFIX and log the outcome.
// NOTE(review): sendEvent() below is called even when getTopNode() reports
// failure; whether pTopNode is initialized in that case is not visible -
// confirm that sendEvent()'s NULL check actually covers it.
132 if(! (app
-> getTopNode(*localNode
, &pTopNode
, PREFIX
)))
133 error(logger
, "Cannot find Top Node!");
134 else debug(logger
, "Top Node found");
// NOTE(review): no use of prefixAck is visible in this extract.
135 int prefixAck
= sendEvent(localNode
, pTopNode
, prefixEvent
, 0);
// Same sequence for SUFFIX, reusing pTopNode.
137 if(! (app
-> getTopNode(*localNode
, &pTopNode
, SUFFIX
)))
138 error(logger
, "Cannot find Top Node!");
139 else debug(logger
, "Top Node found");
// NOTE(review): no use of suffixAck is visible in this extract.
140 int suffixAck
= sendEvent(localNode
, pTopNode
, suffixEvent
, 0);
// Sends a multicast event message to `target`.
//   source     - node reported as the source in the log output.
//   target     - destination node; treated as a RemoteNode further down.
//   event_type - one of the EVENT_PREFIX_* / EVENT_SUFFIX_* codes.
//   step       - routing step stored in the outgoing message.
// Returns an int; the return statements are not present in this extract.
// NOTE(review): this extract is missing lines (braces, the action of the NULL
// check, the `switch` header and `break`s, the start of the log call, the
// declarations of `topNode`, `srcNode` and `str`, and every `return`) -
// confirm against the full file.
145 int Multicast::sendEvent(Node
*source
, Node
*target
, int event_type
, int step
)
// Guard against missing endpoints; the statement executed when either pointer
// is NULL is not visible here.
147 if (source
== NULL
|| target
== NULL
)
// NOTE(review): no use of seqN or method is visible in this extract.
150 unsigned int seqN
= 0;
152 const char* method
= "eventOut_sendEvent";
154 /* ------------------ LOG [Send Event]----------------------- */
// Human-readable name for event_type, used only in the log text below; stays
// "Un-Known" unless one of the case labels assigns a name.
156 string eventType
= "Un-Known";
159 case EVENT_PREFIX_JOIN
:
160 eventType
= "Prefix join";
163 case EVENT_PREFIX_DISCONNECT
:
164 eventType
= "Prefix disconnect";
167 case EVENT_PREFIX_LEVEL_UP
:
168 eventType
= "Prefix level up";
171 case EVENT_PREFIX_LEVEL_DOWN
:
172 eventType
= "Prefix level down";
175 case EVENT_SUFFIX_JOIN
:
176 eventType
= "Suffix join";
179 case EVENT_SUFFIX_DISCONNECT
:
180 eventType
= "Suffix disconnect";
183 case EVENT_SUFFIX_LEVEL_UP
:
184 eventType
= "Suffix level up";
187 case EVENT_SUFFIX_LEVEL_DOWN
:
188 eventType
= "Suffix level down";
// Arguments of a log call whose opening line is missing from this extract:
// source and target hexstrings with their levels, plus event name and step.
194 "source:source_level= " + source
->hexstring() + ":" + Util::toString(source
->getLevel()) +
195 "\ntarget:target_level= " + target
->hexstring() + ":" + Util::toString(target
->getLevel()) +
196 "\nevent:step= " + eventType
+ ":" + Util::toString(step
) );
199 /* ---------------------------------------------------------- */
201 //Get pointer to Topnode
// NOTE(review): C-style downcast from Node* to RemoteNode*; this is only
// valid if every target passed in really is a RemoteNode - confirm at the
// call sites.
205 RemoteNode
*rNodePtr
= (RemoteNode
*) target
;
// Copy the remote node by value and ask the application for a connection to
// it; the result is delivered through `topNode`.
206 RemoteNode rNode
= *rNodePtr
;
// NOTE(review): statusConn is not checked in the visible lines before topNode
// is used for sending below - confirm that a failed connection is handled.
208 int statusConn
= app
->getConnection(rNode
, &topNode
);
210 //Register Message Type
// NOTE(review): the Instantiator is allocated with `new`; ownership is
// presumably taken by registerMessageObj - confirm.
211 MessageTypeData msgType
;
212 msgType
.registerMessageObj(MessageTypeData::MULTICAST
, new Instantiator
<MsgMulticast
, AbstractMessage
>);
// The outgoing message names the local node (not `source`) as its node
// argument, together with the event type and step.
215 app
->getLocalNodeInfo(srcNode
);
217 MsgMulticast
*msgMulticast
= new MsgMulticast("Multicast","Multicast", srcNode
, event_type
, step
);
// Serialize the message into `str`; its length goes into the header below.
219 msgMulticast
->getString(str
);
221 //Instantiate MessageHeader object
222 MessageHeader
*msgHeader
= new MessageHeader(MessageHeader::HEADER_VERSION
, str
.length(), MessageTypeData::MULTICAST
);
224 //Send Multicast Request Message to Top Node
// NOTE(review): msgHeader and msgMulticast are heap-allocated and no delete
// is visible here; presumably sendMessage takes ownership - confirm, since
// they leak otherwise.
225 topNode
->sendMessage(msgHeader
, msgMulticast
);
232 //------------------------------------------------------------------------------
// Subscribes this object to "Multicast" notifications on the application's
// message bus: an NObserver bound to *this and onMulticastMsg is registered
// under the name "Multicast".
// NOTE(review): no matching unregistration is visible in this extract; the
// observer refers to *this, so confirm it is removed before this object is
// destroyed.
233 void Multicast::receiveMulticastMsg()
235 app
->getConfig()->getMessageBus()->registerNotification("Multicast",
236 NObserver
<Multicast
, MessageNotification
>(*this, &Multicast::onMulticastMsg
));
// Observer callback registered by receiveMulticastMsg(): logs the call and
// places the received notification on `queue`, which processQueue() drains.
//   notification - reference-counted pointer to the incoming notification.
239 void Multicast::onMulticastMsg(const AutoPtr
<MessageNotification
>& notification
)
241 debug(logger
, "Multicast::onMulticastMsg()");
// Obtain a non-const raw pointer from the AutoPtr so it can be enqueued.
// NOTE(review): the raw pointer is enqueued without calling duplicate(). If
// `queue` is a Poco::NotificationQueue, enqueueNotification() takes ownership
// of one reference, so the notification may be released while it is still
// queued once the caller's AutoPtr goes away - confirm the reference counting.
243 MessageNotification
*notificationObj
= const_cast<MessageNotification
*>(notification
.get());
244 queue
.enqueueNotification(notificationObj
);
248 } //namespace Tourist