Some code fixes done to multicast service classes.
[tourist.git] / App / src / Multicast.cpp
blob9900c7bb1e9f2bb2a1f3ba498e8feff9ae1d833b
1 #include "Tourist/App/Multicast.h"
2 #include "Poco/Logger.h"
3 #include "Tourist/Constants.h"
4 #include "Tourist/Util/Util.h"
5 #include "Poco/Instantiator.h"
7 using namespace Tourist::Util;
9 using Poco::Instantiator;
11 namespace Tourist{
12 namespace App{
15 Multicast::Multicast(Application *_app, int objCount) :
16 logger(Logger::get("Tourist.App.Multicast." + Util::toString(objCount) ))
18 this->app = _app;
19 this->shouldBeRunning = true;
22 Multicast::~Multicast()
26 void Multicast::run()
28 //Register for incoming multicast messages
29 receiveMulticastMsg();
31 while(shouldBeRunning)
33 processQueue();
34 Poco::Thread::sleep(500);
39 void Multicast::stop()
41 this -> shouldBeRunning = false;
44 void Multicast::processQueue()
46 //debug(logger, "Multicast::processQueue()");
48 //consume dispatched message
49 while (queue.size() != 0)
51 info(logger, "Processing multicast message");
52 //We Got the message.. let's process it
53 MessageNotification *msgNotify = dynamic_cast<MessageNotification*>
54 (queue.dequeueNotification());
55 if (msgNotify == NULL)
57 error(logger, "Notification for multicast response is NULL!");
58 continue; //let's see if there is any other message in queue.
61 debug(logger, "Multicast Message Received. Parsing message ....");
63 MsgMulticast *msgMulticast = dynamic_cast<MsgMulticast*> (msgNotify -> message);
64 debug(logger, "Multicast Initiated by: " + msgMulticast->getInitiatingPeer().toString());
65 RemoteNode thisNode;
66 app->getLocalNodeInfo(thisNode);
67 debug(logger, "This node is: " + thisNode.toString());
70 //Initiate Tree based Multicast
71 // Call localNode method findRemoteLinks with approprate flags
72 // in a loop send multicast event to all nodes in NodeSet
73 // end condition: if findRemoteLinks reuturns no node then no sendEvent & multicast finished.
75 RemoteNode initNode = msgMulticast->getInitiatingPeer();
76 multicastRouting(&initNode, msgMulticast->getEventType(), msgMulticast->getStep());
81 void Multicast::multicastRouting(Node* source, int event_type, int step)
83 NodeSet audienceSet;
84 int type = TYPE_PREFIX;
85 int multicastCount = 0;
86 bool status=false;
88 if (event_type >= 1 && event_type <= 4) {
89 status = app->searchRemoteLinks(*source, audienceSet, LOWER_LEVELS|EQUAL_LEVELS|FIND_PREFIXES|FIND_ALL);
90 } else {
91 status = app->searchRemoteLinks(*source, audienceSet, LOWER_LEVELS|EQUAL_LEVELS|FIND_SUFFIXES|FIND_ALL);
92 type = TYPE_SUFFIX;
95 debug(logger, status?"Audience Set Found":"No Audience Set Found");
97 if(status) {
98 //TODO: Use variable instead of 64
99 for (int i=step+1; i<=64; i++) {
100 Node *nextHop = app->getNextHop(audienceSet, source, i, type);
101 if (nextHop == NULL || nextHop->hexstring() == source->hexstring()) {
102 continue;
105 debug(logger, "Sending event to an audience set node ..... ");
106 int ack = sendEvent(source, nextHop, event_type, i);
108 if (ack > 0)
109 multicastCount++;
116 void Multicast::levelChangeEvent(int newLevel, Node *localNode)
118 //Initiating prefix multicast
119 int prefixEvent, suffixEvent;
120 if (localNode->getLevel() > newLevel)
122 prefixEvent = EVENT_PREFIX_LEVEL_UP;
123 suffixEvent = EVENT_SUFFIX_LEVEL_UP;
125 else
127 prefixEvent = EVENT_PREFIX_LEVEL_DOWN;
128 suffixEvent = EVENT_SUFFIX_LEVEL_DOWN;
131 Node *pTopNode;
132 if(! (app -> getTopNode(*localNode, &pTopNode, PREFIX)))
133 error(logger, "Cannot find Top Node!");
134 else debug(logger, "Top Node found");
135 int prefixAck = sendEvent(localNode, pTopNode, prefixEvent, 0);
137 if(! (app -> getTopNode(*localNode, &pTopNode, SUFFIX)))
138 error(logger, "Cannot find Top Node!");
139 else debug(logger, "Top Node found");
140 int suffixAck = sendEvent(localNode, pTopNode, suffixEvent, 0);
145 int Multicast::sendEvent(Node *source, Node *target, int event_type, int step)
147 if (source == NULL || target == NULL)
148 return -1;
150 unsigned int seqN = 0;
151 string log;
152 const char* method = "eventOut_sendEvent";
154 /* ------------------ LOG [Send Event]----------------------- */
156 string eventType = "Un-Known";
157 switch(event_type)
159 case EVENT_PREFIX_JOIN:
160 eventType = "Prefix join";
161 break;
163 case EVENT_PREFIX_DISCONNECT:
164 eventType = "Prefix disconnect";
165 break;
167 case EVENT_PREFIX_LEVEL_UP:
168 eventType = "Prefix level up";
169 break;
171 case EVENT_PREFIX_LEVEL_DOWN:
172 eventType = "Prefix level down";
173 break;
175 case EVENT_SUFFIX_JOIN:
176 eventType = "Suffix join";
177 break;
179 case EVENT_SUFFIX_DISCONNECT:
180 eventType = "Suffix disconnect";
181 break;
183 case EVENT_SUFFIX_LEVEL_UP:
184 eventType = "Suffix level up";
185 break;
187 case EVENT_SUFFIX_LEVEL_DOWN:
188 eventType = "Suffix level down";
189 break;
193 debug(logger,
194 "source:source_level= " + source->hexstring() + ":" + Util::toString(source->getLevel()) +
195 "\ntarget:target_level= " + target->hexstring() + ":" + Util::toString(target->getLevel()) +
196 "\nevent:step= " + eventType + ":" + Util::toString(step) );
199 /* ---------------------------------------------------------- */
201 //Get pointer to Topnode
203 RemoteNode *topNode;
205 RemoteNode *rNodePtr = (RemoteNode*) target;
206 RemoteNode rNode = *rNodePtr;
208 int statusConn = app->getConnection(rNode, &topNode);
210 //Register Message Type
211 MessageTypeData msgType;
212 msgType.registerMessageObj(MessageTypeData::MULTICAST, new Instantiator<MsgMulticast, AbstractMessage>);
214 RemoteNode srcNode;
215 app->getLocalNodeInfo(srcNode);
217 MsgMulticast *msgMulticast = new MsgMulticast("Multicast","Multicast", srcNode, event_type, step);
218 string str;
219 msgMulticast->getString(str);
221 //Instantiate MessageHeader object
222 MessageHeader *msgHeader = new MessageHeader(MessageHeader::HEADER_VERSION, str.length(), MessageTypeData::MULTICAST);
224 //Send Multicast Request Message to Top Node
225 topNode->sendMessage(msgHeader, msgMulticast);
227 return true;
232 //------------------------------------------------------------------------------
233 void Multicast::receiveMulticastMsg()
235 app->getConfig()->getMessageBus()->registerNotification("Multicast",
236 NObserver<Multicast, MessageNotification>(*this, &Multicast::onMulticastMsg));
239 void Multicast::onMulticastMsg(const AutoPtr<MessageNotification>& notification)
241 debug(logger, "Multicast::onMulticastMsg()");
243 MessageNotification *notificationObj = const_cast<MessageNotification*>(notification.get());
244 queue.enqueueNotification(notificationObj);
248 } //namespace App
249 } //namespace Tourist