Initial commit
[minnow.git] / src / aquarium / aquarium.cpp
blob505b56c88a605814c690cfd02a0df773f39be8ed
1 #include <deque>
2 #include <ext/hash_map>
3 #include <iostream>
4 #include <map>
5 #include <vector>
7 #include <boost/bind.hpp>
8 #include <boost/function.hpp>
9 #include <boost/thread.hpp>
10 #include <boost/thread/xtime.hpp>
12 #include <ctime>
14 #include "aquarium.hpp"
16 Actor *Thread::ActorIfLocal(actorId_t actorId, Actor *local) {
17 Actor *foundActor = NULL;
19 for (int i = 0; i < RECEIVER_CACHE_SIZE; ++i) {
20 if (local->receiverCache[i].containedId == actorId) {
21 if (local->receiverCache[i].actorPoolRevId == this->actorPoolRevId) {
22 return local->receiverCache[i].actor;
27 __gnu_cxx::hash_map<actorId_t, Actor*>::iterator finder = actorIds.find(actorId);
28 if (finder != actorIds.end()) {
29 foundActor = finder->second;
30 int index = local->nextCacheIndex + 1;
31 if (index >= RECEIVER_CACHE_SIZE) {
32 index = 0;
34 local->receiverCache[index].actorPoolRevId = this->actorPoolRevId;
35 local->receiverCache[index].actor = foundActor;
36 local->receiverCache[index].containedId = actorId;
37 local->nextCacheIndex = index;
39 return foundActor;
41 /**
42 Thread scheduler uses this to send messages to itself
45 void Thread::SendMessage(const Message &message)
47 __gnu_cxx::hash_map<actorId_t, Actor*>::iterator finder = actorIds.find(message.recipient);
48 if (finder != actorIds.end()) {
49 Actor *foundActor = finder->second;
51 //Receiving actors can be in one of two states to get a solution immediately. This allows quick turn around time
52 //in some cases even before the maintenance task has run.
53 if (message.messageType == MessageType::ACTION_MESSAGE) {
55 if (foundActor->actorState == ActorState::WAITING_FOR_ACTION) {
56 foundActor->task = message.task;
57 for (int i=0; i < message.numArgs; ++i) {
58 foundActor->heapStack.push_back(message.arg[i]);
61 foundActor->actorState = ActorState::ACTIVE;
62 this->hotActor = foundActor;
64 if (foundActor->runQueueRevId != this->runQueueRevId) {
65 runningActors.push_back(foundActor);
66 foundActor->runQueueRevId = this->runQueueRevId;
69 else {
70 foundActor->actionMessages.push_back(message);
74 else if ((foundActor->actorState == ActorState::WAITING_FOR_DATA) && (message.messageType == MessageType::DATA_MESSAGE)) {
75 __gnu_cxx::hash_map<int, int>::iterator findHandler = foundActor->dataHandlers.find(message.dataTaskTypeId);
77 if (findHandler != foundActor->dataHandlers.end()) {
78 //Set up our CPS to resume into a data receive
79 TypeUnion tu1;
80 tu1.UInt32 = findHandler->second;
81 for (int i=0; i < message.numArgs; ++i) {
82 foundActor->heapStack.push_back(message.arg[i]);
84 foundActor->heapStack.push_back(tu1);
85 foundActor->actorState = ActorState::ACTIVE;
86 foundActor->isResuming = true;
88 this->hotActor = foundActor;
90 //if (foundActor->runQueueRevId != this->runQueueRevId) {
91 runningActors.push_back(foundActor);
92 //foundActor->runQueueRevId = this->runQueueRevId;
93 //}
95 else {
96 localMail.push_back(message);
101 else {
102 localMail.push_back(message);
103 //foundActor->actionMessages.push_back(message);
106 else {
107 //we don't have this person, so send the message out to a mailman
108 //printf("<%i", message.recipient ); fflush(stdout);
109 outgoingChannel->sendMessage(message);
111 //printf("[%i]", message.recipient ); fflush(stdout);
115 Thread schedule uses this to check queued messages to see if any are ready to be consumed.
118 void Thread::ReceiveMessages() {
120 int pos = 0;
121 int end = localMail.size();
123 while (pos < end) {
124 Message &message = localMail[pos];
126 if (message.messageType == MessageType::ACTION_MESSAGE) {
127 __gnu_cxx::hash_map<actorId_t, Actor*>::iterator finder = actorIds.find(message.recipient);
128 //Check to see if the recipient is local
129 if (finder != actorIds.end()) {
130 Actor *foundActor = finder->second;
132 if (foundActor->actorState == ActorState::WAITING_FOR_ACTION) {
133 foundActor->task = message.task;
134 //BOOST_ASSERT(message.numArgs < 5);
135 if (message.numArgs > 4) {
136 std::vector<TypeUnion> *vtu = (std::vector<TypeUnion> *)(message.arg[0].VoidPtr);
137 for (std::vector<TypeUnion>::iterator vtu_iter = vtu->begin(), vtu_end = vtu->end(); vtu_iter != vtu_end; ++vtu_iter) {
138 foundActor->heapStack.push_back(*vtu_iter);
140 delete vtu;
142 else {
143 for (int i=0; i < message.numArgs; ++i) {
144 foundActor->heapStack.push_back(message.arg[i]);
147 foundActor->actorState = ActorState::ACTIVE;
148 if (foundActor->runQueueRevId != this->runQueueRevId) {
149 runningActors.push_back(foundActor);
150 foundActor->runQueueRevId = this->runQueueRevId;
153 else {
154 //If the actor isn't waiting for action, re-add them to the queue (the mail queue works similarly to
155 //the running actor process queue. Only the current batch is live at any moment, each cycle clears
156 //the old messages in one go (which is more efficient than picking them out one by one)
157 //localMail.push_back(message);
158 foundActor->actionMessages.push_back(message);
162 else {
163 //we don't have this person, so send the message out to a mailman
164 //printf(">%i", message.recipient ); fflush(stdout);
165 outgoingChannel->sendMessage(message);
169 else if (message.messageType == MessageType::DATA_MESSAGE) {
170 __gnu_cxx::hash_map<actorId_t, Actor*>::iterator finder = actorIds.find(message.recipient);
171 if (finder != actorIds.end()) {
172 Actor *foundActor = finder->second;
173 __gnu_cxx::hash_map<int, int>::iterator findHandler = foundActor->dataHandlers.find(message.dataTaskTypeId);
175 if (findHandler != foundActor->dataHandlers.end()) {
177 if (foundActor->actorState == ActorState::WAITING_FOR_DATA) {
178 //Set up our CPS to resume into a data receive
179 TypeUnion tu1;
180 tu1.UInt32 = findHandler->second;
181 if (message.numArgs > 4) {
182 std::vector<TypeUnion> *vtu = (std::vector<TypeUnion> *)message.arg[0].VoidPtr;
183 for (std::vector<TypeUnion>::iterator vtu_iter = vtu->begin(), vtu_end = vtu->end(); vtu_iter != vtu_end; ++vtu_iter) {
184 foundActor->heapStack.push_back(*vtu_iter);
186 delete vtu;
188 else {
189 for (int i=0; i < message.numArgs; ++i) {
190 foundActor->heapStack.push_back(message.arg[i]);
193 foundActor->heapStack.push_back(tu1);
194 foundActor->actorState = ActorState::ACTIVE;
195 foundActor->isResuming = true;
197 //if this activates the recipient, put it on the active queue
198 //if (foundActor->runQueueRevId != this->runQueueRevId) {
199 runningActors.push_back(foundActor);
200 //foundActor->runQueueRevId = this->runQueueRevId;
203 else {
204 localMail.push_back(message);
207 else {
208 localMail.push_back(message);
211 else {
212 //we don't have this person, so send the message out to a mailman
213 outgoingChannel->sendMessage(message);
217 else {
218 //The only type of mail that should get into localMail are mail messages themselves. Messages request
219 //a service are handled by the maintenance actor
220 std::cout << "ERROR: Unknown message type" << std::endl;
221 localMail.push_back(message);
223 ++pos;
226 //Clean out all messages we've looked through this pass (ones that were rescheduled live further in)
227 if (end != 0) {
228 localMail.erase(localMail.begin(), localMail.begin() + end);
233 Schedules an already existing actor onto the current thread;
235 void Thread::ScheduleExistingActor(Actor *actor) {
236 actor->parentThread = this;
238 //This is our phonebook for the actors on this thread
239 actorIds[actor->actorId] = actor;
241 if (actor->actorState == ActorState::ACTIVE)
243 runningActors.push_back(actor);
246 //clear out this actor's receiver cache, as it's no longer valid
247 for (int i = 0; i < RECEIVER_CACHE_SIZE; ++i) {
248 actor->receiverCache[i].containedId = -1;
250 actor->nextCacheIndex = 0;
252 actor->runQueueRevId = 0;
253 //actor->timesActive = 0;
255 Message message;
256 message.messageType = MessageType::ADD_ACTOR_ID;
257 message.arg[0].UInt32 = threadId;
258 message.arg[1].UInt32 = actor->actorId;
259 message.numArgs = 2;
260 outgoingChannel->sendMessage(message);
264 Schedules a new actor on the current thread
266 void Thread::ScheduleNewActor(Actor *actor) {
267 actor->actorId = nextActorId;
268 actor->actorState = ActorState::WAITING_FOR_ACTION;
269 actor->actorStateBeforeMove = ActorState::WAITING_FOR_ACTION;
271 ScheduleExistingActor(actor);
273 ++nextActorId;
274 if ((nextActorId & 0xffffff) == 0xffffff) {
275 //we have rolled over, just quit with an error for now
276 std::cout << "Exceeded allowed number of actors on scheduler " << this->threadId << std::endl;
277 std::cout << "This is currently a fatal error, and we must exit." << std::endl;
278 exit(1);
282 void Thread::ScheduleNewIsolatedActor(Actor *actor) {
283 //std::cout << "New Isolated: " << nextActorId << std::endl;
284 actor->actorId = nextActorId;
285 actor->actorState = ActorState::WAITING_FOR_ACTION;
286 actor->actorStateBeforeMove = ActorState::WAITING_FOR_ACTION;
288 ++nextActorId;
289 if ((nextActorId & 0xffffff) == 0xffffff) {
290 //we have rolled over, just quit with an error for now
291 std::cout << "Exceeded allowed number of actors on scheduler " << this->threadId << std::endl;
292 std::cout << "This is currently a fatal error, and we must exit." << std::endl;
293 exit(1);
296 Message message;
297 message.messageType = MessageType::CREATE_ISOLATED_ACTOR;
298 message.arg[0].VoidPtr = (void *)actor;
299 message.numArgs = 1;
300 this->outgoingChannel->sendMessage(message);
304 Removes an actor from the scheduler and passes a message up the chain to remove him from the phonebooks.
305 This version does not free the memory the actor was using, for that use 'DeleteActor' instead
307 void Thread::RemoveActor(Actor *actor) {
308 RemoveActor(actor, true);
311 void Thread::RemoveActors(std::vector<Actor*> *actors) {
312 std::vector<actorId_t> *toBeDeleted = new std::vector<actorId_t>();
314 for (std::vector<Actor*>::iterator iter=actors->begin(), end=actors->end(); iter!=end; ++iter) {
315 RemoveActor(*iter, false);
316 toBeDeleted->push_back((*iter)->actorId);
319 Message message;
320 message.messageType = MessageType::DELETE_ACTOR_IDS;
321 message.arg[0].UInt32 = threadId;
322 message.arg[1].VoidPtr = toBeDeleted;
323 message.numArgs = 2;
324 outgoingChannel->sendMessage(message);
327 void Thread::RemoveActor(Actor *actor, bool sendDeleteMsg) {
328 __gnu_cxx::hash_map<actorId_t, Actor*>::iterator iter = actorIds.find(actor->actorId);
329 std::vector<Actor*>::iterator iterActor;
331 ++this->actorPoolRevId;
332 if (this->actorPoolRevId == 0) {
333 //if we have gone so long that we roll over, then quickly invalidate our local actors' receiver caches so they don't get false positives
334 for (__gnu_cxx::hash_map<actorId_t, Actor*>::iterator resetIter = actorIds.begin(), resetEnd = actorIds.end(); resetIter != resetEnd; ++resetIter) {
335 for (int i = 0; i < RECEIVER_CACHE_SIZE; ++i) {
336 resetIter->second->receiverCache[i].containedId = -1;
341 if (iter != actorIds.end()) {
342 actorIds.erase(iter);
344 if (sendDeleteMsg) {
345 Message message;
346 message.messageType = MessageType::DELETE_ACTOR_ID;
347 message.arg[0].UInt32 = threadId;
348 message.arg[1].UInt32 = actor->actorId;
349 message.numArgs = 2;
350 outgoingChannel->sendMessage(message);
352 unsigned int j = 0;
353 while (j < runningActors.size()) {
354 if (runningActors[j]->actorId == actor->actorId)
355 runningActors.erase(runningActors.begin() + j);
356 else
357 ++j;
360 else {
361 std::cout << "Trying to remove " << actor->actorId << " from " << this->threadId << " but can't find it." << std::endl;
366 Remove the actor and completely delete it (for actors that were killed)
368 void Thread::DeleteActor(Actor *actor) {
370 RemoveActor(actor);
371 delete actor;
375 Schedules an already existing actor onto the current thread;
377 void Thread::SendStatus() {
378 Message message;
379 message.messageType = MessageType::LOAD_STATUS;
380 message.arg[0].UInt32 = this->threadId;
381 message.arg[1].Int32 = this->numberActiveActors;
383 message.numArgs = 2;
385 outgoingChannel->sendMessage(message);
389 Moves the actor which has been running the longest to another scheduler.
390 Note: Very likely better solutions exist, ones that could reschedule clumps of actors that have been working together,
391 but for now this will do.
394 //JDT: REFACTOR - breaking rebalance until we put in locality calculator
395 void Thread::MoveHeaviestActor(threadId_t destination, uint32_t count) {
396 //std::vector<Actor*>::iterator iter, chosen;
397 //bool foundOne;
398 std::vector<Actor*> *group;
399 if (count > 1) {
400 group = new std::vector<Actor*>();
403 //std::cout << "Moving " << count << std::endl;
404 //for (int k = 0; k < count; ++k) {
405 //for (iter = runningActors.begin(); iter != runningActors.end(); ++iter) {
406 unsigned int i = 0;
407 while (i < runningActors.size()) {
408 if ( (runningActors[i]->actorState == ActorState::ACTIVE) && (runningActors[i]->actorId != 0xffffffff)) {
410 //std::cout << "Found one, moving it" << std::endl;
411 Actor *chosenActor = runningActors[i];
412 chosenActor->actorStateBeforeMove = chosenActor->actorState;
414 chosenActor->actorState = ActorState::MOVED;
415 chosenActor->parentThread = NULL;
418 if (count == 1) {
419 //std::cout << "$$ " << count << " $$" << std::endl;
420 RemoveActor(chosenActor);
422 Message message;
423 message.messageType = MessageType::PLEASE_RECV_ACTOR;
424 message.arg[0].UInt32 = destination;
425 message.arg[1].VoidPtr = (void *)chosenActor;
427 message.numArgs = 2;
429 outgoingChannel->sendMessage(message);
430 break;
433 else {
434 group->push_back(chosenActor);
435 if (group->size() == count) {
436 break;
441 else {
442 ++i;
446 if (count > 1) {
447 RemoveActors(group);
449 //std::cout << "@@ " << count << " " << group->size() << " @@" << std::endl;
450 Message message;
451 message.messageType = MessageType::PLEASE_RECV_ACTORS;
452 message.arg[0].UInt32 = destination;
453 message.arg[1].VoidPtr = (void *)group;
455 message.numArgs = 2;
457 outgoingChannel->sendMessage(message);
463 The rebalancing algorithm the kernel uses to make sure the actors are equitably distributed across scheduler threads
465 void Thread::TaskRebalance() {
466 __gnu_cxx::hash_map<threadId_t, int>::const_iterator iter;
467 threadId_t lightest, heaviest;
468 int lightest_weight, heaviest_weight;
470 lightest = 0;
471 heaviest = 0;
472 lightest_weight = 99999999;
473 heaviest_weight = 0;
475 //The rebalance works by looking through the list of schedulers and their number of active actors, and if there is a
476 //disparatity greater than a threshhold (currently 2), it asks the more active scheduler to pass one of its actors
477 //to the lesser active scheduler.
479 iter = scheduleWeights->begin();
480 while (iter != scheduleWeights->end()) {
481 if (iter->second < lightest_weight) {
482 lightest = iter->first;
483 lightest_weight = iter->second;
485 if (iter->second > heaviest_weight) {
486 heaviest = iter->first;
487 heaviest_weight = iter->second;
489 ++iter;
491 if ((lightest != heaviest) && (lightest != 0) && (heaviest != 0)) {
492 if ((heaviest_weight - lightest_weight) > 1 + (heaviest_weight / 10)) {
493 //uint32_t amount = (heaviest_weight - lightest_weight) / 2; //(50 + heaviest_weight - lightest_weight) / 50;
494 uint32_t amount = (50 + heaviest_weight - lightest_weight) / 50;
495 //std::cout << "Moving: " << amount << " from " << heaviest << " to " << lightest << std::endl;
497 //if (amount > 500)
498 // amount = 500;
500 Message message;
501 message.messageType = MessageType::PLEASE_MOVE_ACTOR;
502 message.arg[0].UInt32 = lightest;
503 message.arg[1].UInt32 = amount;
504 message.numArgs = 2;
505 (*mailChannelOutgoing)[heaviest]->sendMessage(message);
506 iter = scheduleWeights->begin();
508 while (iter != scheduleWeights->end()) {
509 //std::cout << iter->first << ":" << iter->second << " ";
510 ++iter;
513 //std::cout << std::endl;
519 The mail check loop that mailmen actors use.
521 void Thread::MailCheck() {
522 int pos = 0;
523 int end = mailListIncoming->size();
525 //FIXME: There has to be a nicer way to keep track of lost messages
526 std::vector<Message> lostMessages;
528 while (pos < end) {
529 MailChannel *mChannel = (*mailListIncoming)[pos];
531 //For each channel, check mail.
532 if (mChannel->empty() == false) {
534 std::vector<Message> *incomingRemoteMail = mChannel->recvMessages();
535 std::vector<Message>::iterator inMailIter = incomingRemoteMail->begin();
537 while (inMailIter != incomingRemoteMail->end()) {
538 Message message = *inMailIter;
540 if ((message.messageType == MessageType::ACTION_MESSAGE)||(message.messageType == MessageType::DATA_MESSAGE)) {
541 //find recipient
542 __gnu_cxx::hash_map<actorId_t, threadId_t>::iterator finder = mailAddresses->find(message.recipient);
543 //we found who it goes to
544 if (finder != mailAddresses->end()) {
546 //if the actor isn't in transition/limbo, we deliver
547 if (finder->second != 0) {
548 __gnu_cxx::hash_map<threadId_t, MailChannel*> *channelMap = mailChannelOutgoing;
549 MailChannel *outgoing = (*channelMap)[finder->second];
550 outgoing->sendMessage(message);
552 else {
553 std::cout << "Actor is in LIMBO" << std::endl;
557 else {
558 //can't find anyone, so send the message to myself
559 lostMessages.push_back(message);
563 else if (message.messageType == MessageType::ADD_ACTOR_ID) {
564 threadId_t threadId = message.arg[0].UInt32;
565 actorId_t actorId = message.arg[1].UInt32;
566 __gnu_cxx::hash_map<actorId_t, threadId_t> *mailMap = mailAddresses;
567 (*mailMap)[actorId] = threadId;
569 else if (message.messageType == MessageType::CREATE_ISOLATED_ACTOR) {
570 //FIXME: This assumes receiver is a KERNEL but doesn't pass it along if it's a MAILMAN
572 //actorId_t actorId = message.arg[0].UInt32;
573 if (threadType == ThreadType::KERNEL) {
574 bool found = false;
576 Message messageIso;
577 messageIso.messageType = MessageType::PLEASE_RECV_ACTOR;
578 //message.arg[0] gets filled in below
579 messageIso.arg[1] = message.arg[0]; //Pass the isolated actor over to the receive actor message
581 std::vector<ThreadPoolThread>::iterator poolIter = threadPoolThreads->begin();
582 while ( (found == false) && (poolIter != threadPoolThreads->end()) ) {
583 //std::cout << "Inside search loop..." << std::endl;
584 if (poolIter->available == true) {
585 poolIter->available = false;
586 found = true;
587 messageIso.arg[0].UInt32 = poolIter->threadId;
588 //std::cout << "Found: " << poolIter->threadId << std::endl;
590 ++poolIter;
593 if (found == false) {
594 ThreadPoolThread tpt;
595 //std::cout << "Not found, creating one... " << actor->mailChannelOutgoing->size() + 1 << std::endl;
597 tpt.threadId = mailChannelOutgoing->size() + 1;// + actor->threadPoolThreads->size();
599 Thread *newThread = new Thread(tpt.threadId, (tpt.threadId-1) * 0x1000000);
601 //By not sending status, we'll take ourselves out of the rebalancer, which we need to do if we're in the thread pool for isolated actors.
602 newThread->sendStatus = false;
603 //std::cout << "Thread ID..." << newThread->threadId << " and " << tpt.threadId << std::endl;
605 tpt.thread = new boost::thread(boost::bind(&Thread::SchedulerLoop, newThread));
606 tpt.available = false; //we're about to schedule something, so it's not available for new actors yet
608 messageIso.arg[0].UInt32 = newThread->threadId;
609 mailListIncoming->push_back(newThread->outgoingChannel);
611 (*mailChannelOutgoing)[newThread->threadId] = newThread->incomingChannel;
613 threadPoolThreads->push_back(tpt);
616 (*mailChannelOutgoing)[messageIso.arg[0].UInt32]->sendMessage(messageIso);
620 else if (message.messageType == MessageType::DELETE_ACTOR_ID) {
621 //FIXME: When we move to a mailmen+kernel model instead of just a kernel model, we need to pass up the deleted actor
622 //threadId_t threadId = message.arg[0].UInt32;
624 actorId_t actorId = message.arg[1].UInt32;
626 __gnu_cxx::hash_map<actorId_t, threadId_t>::iterator finder = mailAddresses->find(actorId);
627 if (finder != mailAddresses->end()) {
628 threadId_t threadId = finder->second;
629 __gnu_cxx::hash_map<actorId_t, threadId_t> *mailMap = mailAddresses;
630 mailMap->erase(actorId);
631 for (unsigned int k = 0; k < threadPoolThreads->size(); ++k) {
632 if ((*threadPoolThreads)[k].threadId == threadId) {
633 (*threadPoolThreads)[k].available = true;
638 else if (message.messageType == MessageType::DELETE_ACTOR_IDS) {
639 //FIXME: When we move to a mailmen+kernel model instead of just a kernel model, we need to pass up the deleted actor
640 //threadId_t threadId = message.arg[0].UInt32;
641 std::vector<actorId_t> *actorIds = (std::vector<actorId_t> *)(message.arg[1].VoidPtr);
642 //std::cout << "Kernel: deleting " << actorIds->size() << " actors" << std::endl;
643 for (std::vector<actorId_t>::iterator actorIter = actorIds->begin(), actorEnd = actorIds->end(); actorIter != actorEnd; ++actorIter) {
644 __gnu_cxx::hash_map<actorId_t, threadId_t>::iterator finder = mailAddresses->find(*actorIter);
645 if (finder != mailAddresses->end()) {
646 threadId_t threadId = finder->second;
647 __gnu_cxx::hash_map<actorId_t, threadId_t> *mailMap = mailAddresses;
648 mailMap->erase(*actorIter);
649 for (unsigned int k = 0; k < threadPoolThreads->size(); ++k) {
650 if ((*threadPoolThreads)[k].threadId == threadId) {
651 (*threadPoolThreads)[k].available = true;
656 delete actorIds;
658 else if (message.messageType == MessageType::LOAD_STATUS) {
659 threadId_t threadId = message.arg[0].UInt32;
660 int32_t activeActors = message.arg[1].Int32;
662 std::cout << "Status: " << threadId << ": " << activeActors << std::endl;
663 (*(scheduleWeights))[threadId] = activeActors;
664 TaskRebalance();
666 else {
667 threadId_t threadId = message.arg[0].UInt32;
668 (*(mailChannelOutgoing))[threadId]->sendMessage(message);
670 ++inMailIter;
673 incomingRemoteMail->clear();
675 //Return to sender the messages that I don't have an address for
676 std::vector<Message>::iterator it = lostMessages.begin();
677 while (it != lostMessages.end()) {
678 mChannel->sendMessage(*it);
679 ++it;
681 lostMessages.clear();
683 ++pos;
689 The main scheduler loop for the thread. Runs through each actor and runs the ready ones.
690 Also checks for queued messages, activating them as necessary.
693 void Thread::SchedulerLoop() {
694 int pos;
695 const bool sendStatusUpdate = this->sendStatus;
696 std::vector<TypeUnion> *vtu;
698 std::vector<Actor *> deletedActors;
699 int slicePos;
701 this->previousActiveActors = -1;
702 this->numberActiveActors = 0;
704 pos = 0;
705 Actor *thisActor = NULL;
706 int sliceMultiplier = 1;
708 while (this->isRunning) {
709 if (this->runningActors[pos]->actorState == ActorState::ACTIVE) {
711 thisActor = this->runningActors[pos];
713 switch (thisActor->actorId) {
714 case (0xffffffff) :
715 //MAINTENANCE LOOP
716 if (this->threadType == ThreadType::KERNEL) {
717 MailCheck();
718 ++this->numberActiveActors;
720 if (this->numberActiveActors == 0) {
721 boost::xtime xt;
722 boost::xtime_get(&xt, boost::TIME_UTC);
723 xt.nsec += 1000000;
724 boost::thread::sleep(xt);
728 ++this->runQueueRevId;
729 if (this->runQueueRevId == 0) {
730 //if 0, we've rolled over, so reset everyone
731 for (__gnu_cxx::hash_map<actorId_t, Actor*>::iterator resetIter = this->actorIds.begin(), resetEnd = this->actorIds.end(); resetIter != resetEnd; ++resetIter)
733 (resetIter->second)->runQueueRevId = 0;
735 this->runQueueRevId = 1;
739 //Clean out old slots, using ourselves (the maintenance actor) as the marker. Actors that follow us were
740 //rescheduled (pushed) in this cycle so they are our active queue for the next cycle.
741 this->runningActors.erase(this->runningActors.begin(), this->runningActors.begin() + (1 + pos));
743 //Pull out any actors that were deleted during the last cycle
744 if (deletedActors.size() > 0) {
745 for (std::vector<Actor *>::iterator deletedIter = deletedActors.begin(), deletedEnd = deletedActors.end(); deletedIter != deletedEnd; ++deletedIter)
747 DeleteActor(*deletedIter);
749 deletedActors.clear();
752 //Sort through incoming system messages (which may also have mail)
753 if (this->incomingChannel->empty() == false) {
754 //std::cout << "Thread checking mail" << std::endl;
755 std::vector<Message> *incomingRemoteMail = this->incomingChannel->recvMessages();
756 std::vector<Message>::iterator inMailIter = incomingRemoteMail->begin();
758 while (inMailIter != incomingRemoteMail->end()) {
759 Message message = *inMailIter;
761 if ((message.messageType == MessageType::ACTION_MESSAGE)||(message.messageType == MessageType::DATA_MESSAGE)) {
762 this->localMail.push_back(message);
764 else if (message.messageType == MessageType::PLEASE_MOVE_ACTOR) {
765 threadId_t threadId = message.arg[0].UInt32;
766 uint32_t amount = message.arg[1].UInt32;
767 MoveHeaviestActor(threadId, amount);
769 else if (message.messageType == MessageType::PLEASE_RECV_ACTOR) {
770 Actor *newActor = (Actor *)(message.arg[1].VoidPtr);
771 newActor->actorState = newActor->actorStateBeforeMove;
772 ScheduleExistingActor(newActor);
774 else if (message.messageType == MessageType::PLEASE_RECV_ACTORS) {
775 std::vector<Actor*> *newActors = (std::vector<Actor*> *)(message.arg[1].VoidPtr);
776 //std::cout << "New message: " << newActors->size() << std::endl;
777 for(std::vector<Actor*>::iterator newIter = newActors->begin(), newEnd = newActors->end(); newIter != newEnd; ++newIter) {
778 //std::cout << "***MOVED***" << std::endl;
779 (*newIter)->actorState = (*newIter)->actorStateBeforeMove;
780 ScheduleExistingActor(*newIter);
782 delete newActors;
784 ++inMailIter;
786 incomingRemoteMail->clear();
788 pos = 0;
789 ReceiveMessages();
791 //Send a status update to the kernel if we've changed number of active actors this cycle.
792 if (sendStatusUpdate) {
793 if (this->numberActiveActors != this->previousActiveActors) {
794 SendStatus();
795 this->previousActiveActors = this->numberActiveActors;
799 this->numberActiveActors = 0;
801 break;
802 default:
803 this->timeSliceEndTime = TIMESLICE_QUANTUM;
804 if (thisActor->actionMessages.size() > TIMESLICE_QUANTUM) {
805 sliceMultiplier = thisActor->actionMessages.size() / TIMESLICE_QUANTUM;
806 this->timeSliceEndTime *= sliceMultiplier;
809 this->hotActor = NULL;
811 //Running the actor if it's active, as long as it is active and has slice remaining
813 slicePos = -1;
814 while ((thisActor->actorState == ActorState::ACTIVE) && (this->timeSliceEndTime > 0)) {
815 //Run the actor's current task
816 thisActor->task(thisActor);
818 switch(thisActor->actorState) {
819 case (ActorState::ACTIVE) :
820 thisActor->isResuming = true;
821 ++this->numberActiveActors;
822 //std::cout << "NAA: " << this->numberActiveActors << std::endl;
823 break;
824 case (ActorState::WAITING_FOR_ACTION) :
825 if (thisActor->actionMessages.size() > slicePos+1) {
826 ++slicePos;
827 Message message = thisActor->actionMessages[slicePos];
828 thisActor->task = message.task;
830 if (message.numArgs > 4) {
831 vtu = (std::vector<TypeUnion> *)(message.arg[0].VoidPtr);
832 for (std::vector<TypeUnion>::iterator vtu_iter = vtu->begin(), vtu_end = vtu->end(); vtu_iter != vtu_end; ++vtu_iter) {
833 thisActor->heapStack.push_back(*vtu_iter);
835 delete vtu;
837 else {
838 for (int i=0; i < message.numArgs; ++i) {
839 thisActor->heapStack.push_back(message.arg[i]);
843 thisActor->actorState = ActorState::ACTIVE;
845 if (this->timeSliceEndTime == 0) {
846 if (thisActor->runQueueRevId != this->runQueueRevId) {
847 thisActor->runQueueRevId = this->runQueueRevId;
848 this->runningActors.push_back(thisActor);
852 break;
853 case (ActorState::DELETED) :
854 deletedActors.push_back(thisActor);
855 break;
857 if ((thisActor->actorState != ActorState::ACTIVE) && (this->hotActor != NULL)) {
858 if (slicePos >= 0) {
859 thisActor->actionMessages.erase(thisActor->actionMessages.begin(), thisActor->actionMessages.begin() + slicePos + 1);
860 --slicePos;
862 thisActor = this->hotActor;
863 slicePos = -1;
868 if (slicePos >= 0) {
869 thisActor->actionMessages.erase(thisActor->actionMessages.begin(), thisActor->actionMessages.begin() + slicePos + 1);
870 --slicePos;
874 //In the case the actor is no longer active, it may have just sent a message. This will let another
875 //actor immediately go active (called here a 'hot actor'), allowing them to use the rest of the slice.
876 //This idea comes from "Improving IPC by Kernel Design" (l4ka.org/publications/1993/improving-ipc.pdf)
878 if (thisActor->actorState != ActorState::ACTIVE) {
880 if (this->hotActor != NULL) {
881 Actor *hotActor = this->hotActor;
882 while ((this->timeSliceEndTime > 0) && (this->hotActor != NULL)) {
883 hotActor = this->hotActor;
885 this->hotActor = NULL;
886 hotActor->task(hotActor);
888 //If it's still active afterward, it's an active actor, so count it
889 if (hotActor->actorState == ActorState::ACTIVE) {
890 hotActor->isResuming = true;
891 ++this->numberActiveActors;
892 //std::cout << "NAA: " << this->numberActiveActors << std::endl;
894 //If instead it's waiting for an action message, see if we have one to give it
895 else if (hotActor->actorState == ActorState::WAITING_FOR_ACTION) {
896 if (hotActor->actionMessages.size() > slicePos+1) {
897 ++slicePos;
898 Message message = hotActor->actionMessages[slicePos];
899 hotActor->task = message.task;
900 //BOOST_ASSERT(message.numArgs <= 4);
902 for (int i=0; i < message.numArgs; ++i) {
903 hotActor->heapStack.push_back(message.arg[i]);
905 hotActor->actorState = ActorState::ACTIVE;
907 if (this->timeSliceEndTime == 0) {
908 if (hotActor->runQueueRevId != this->runQueueRevId) {
909 hotActor->runQueueRevId = this->runQueueRevId;
910 this->runningActors.push_back(hotActor);
915 else if (hotActor->actorState == ActorState::DELETED) {
916 deletedActors.push_back(hotActor);
922 //if (thisActor->actorState == ActorState::DELETED) {
923 // deletedActors.push_back(thisActor);
929 ++pos;
930 break;
933 //If we're active, keep us in the running list
934 if ((thisActor->actorState == ActorState::ACTIVE) && (thisActor->runQueueRevId != this->runQueueRevId)) {
935 thisActor->runQueueRevId = this->runQueueRevId;
936 this->runningActors.push_back(thisActor);
939 else {
940 ++pos; //something that was active now isn't, so let's just skip over it.
945 int VM_Main(int argc, char *argv[], void(*task)(Actor *), bool passCmdLine) {
946 //boost::xtime xt;
947 int NUM_SCHED_THREADS = boost::thread::hardware_concurrency();
949 Thread *thread[NUM_SCHED_THREADS];
951 //FIXME!!!!!! This assumes only a certain number of actors will every be created by any one scheduler, and this might be a bad assumption.
952 //Also, actor ids are not currently being recycled, but likely should be.
953 for (int i = 0; i < NUM_SCHED_THREADS; ++i) {
954 thread[i] = new Thread(i+1, i * 0x1000000);
957 //Make the kernel thread the first scheduler thread
958 //It will do all normal scheduler duties and include kernel duties into them
959 thread[0]->threadType = ThreadType::KERNEL;
960 thread[0]->mailListIncoming = new std::vector<MailChannel*>();
961 thread[0]->mailAddresses = new __gnu_cxx::hash_map<actorId_t, threadId_t>();
962 thread[0]->mailChannelOutgoing = new __gnu_cxx::hash_map<threadId_t, MailChannel*>();
963 thread[0]->scheduleWeights = new __gnu_cxx::hash_map<threadId_t, int>();
964 thread[0]->threadPoolThreads = new std::vector<ThreadPoolThread>();
966 //connect up channels
967 for (int i = 0; i < NUM_SCHED_THREADS; ++i) {
968 thread[0]->mailListIncoming->push_back(thread[i]->outgoingChannel);
969 (*(thread[0])->mailChannelOutgoing)[thread[i]->threadId] = thread[i]->incomingChannel;
972 std::vector<std::string> *commandLineArgs = new std::vector<std::string>();
973 //Grab commandline args
974 for (int i = 1; i < argc; ++i) {
975 commandLineArgs->push_back(argv[i]);
978 //Schedule the main actor (like the main function in the program)
979 Actor *mainActor = new Actor();
980 thread[0]->ScheduleNewActor(mainActor);
982 if (passCmdLine) {
983 //And send it the first message
984 Message message;
985 message.recipient = 0;
986 message.numArgs = 1;
987 message.messageType = MessageType::ACTION_MESSAGE;
988 message.task = task;
989 message.arg[0].VoidPtr = commandLineArgs;
990 thread[0]->SendMessage(message);
992 else {
993 Message message;
994 message.recipient = 0;
995 message.numArgs = 0;
996 message.messageType = MessageType::ACTION_MESSAGE;
997 message.task = task;
998 thread[0]->SendMessage(message);
1001 boost::thread *bThread[NUM_SCHED_THREADS];
1002 for (int i = 1; i < NUM_SCHED_THREADS; ++i) {
1003 bThread[i] = new boost::thread(boost::bind(&Thread::SchedulerLoop, thread[i]));
1006 thread[0]->SchedulerLoop();
1008 //join the threads here.
1009 for (int i = 1; i < NUM_SCHED_THREADS; ++i) {
1010 bThread[i]->join();
1012 return 0;