2 #include <ext/hash_map>
7 #include <boost/bind.hpp>
8 #include <boost/function.hpp>
9 #include <boost/thread.hpp>
10 #include <boost/thread/xtime.hpp>
14 #include "aquarium.hpp"
//Resolves actorId to an Actor* when that actor lives on this thread.
//The sender's per-actor receiver cache is consulted first; an entry is
//trusted only when its actorPoolRevId matches this thread's current
//revision. On a cache miss the thread's actorIds map is searched and,
//on a hit, the cache is refreshed at the next slot.
//NOTE(review): this chunk elides some original lines (closing braces,
//the cache-index wrap-around at ~32-33, and the final return), so the
//miss/NULL-return behavior is inferred, not directly visible.
16 Actor
*Thread::ActorIfLocal(actorId_t actorId
, Actor
*local
) {
17 Actor
*foundActor
= NULL
;
//Fast path: scan the sender's small receiver cache for this id.
19 for (int i
= 0; i
< RECEIVER_CACHE_SIZE
; ++i
) {
20 if (local
->receiverCache
[i
].containedId
== actorId
) {
//A cache entry is stale unless its pool revision matches the thread's.
21 if (local
->receiverCache
[i
].actorPoolRevId
== this->actorPoolRevId
) {
22 return local
->receiverCache
[i
].actor
;
//Slow path: look the actor up in this thread's phonebook.
27 __gnu_cxx::hash_map
<actorId_t
, Actor
*>::iterator finder
= actorIds
.find(actorId
);
28 if (finder
!= actorIds
.end()) {
29 foundActor
= finder
->second
;
//On a hit, install the result into the next cache slot
//(index presumably wraps to 0 at RECEIVER_CACHE_SIZE -- the wrap
//branch body at original lines 32-33 is not visible here).
30 int index
= local
->nextCacheIndex
+ 1;
31 if (index
>= RECEIVER_CACHE_SIZE
) {
34 local
->receiverCache
[index
].actorPoolRevId
= this->actorPoolRevId
;
35 local
->receiverCache
[index
].actor
= foundActor
;
36 local
->receiverCache
[index
].containedId
= actorId
;
37 local
->nextCacheIndex
= index
;
42 Thread scheduler uses this to send messages to itself
//Delivers a message originating on this thread. If the recipient is
//local and already waiting for exactly this kind of message, the
//message is applied immediately and the recipient becomes the 'hot
//actor' (run with the remainder of the sender's slice); otherwise the
//message is queued (per-actor actionMessages or thread-wide localMail),
//and if the recipient is not local at all it is forwarded to the
//outgoing mail channel.
45 void Thread::SendMessage(const Message
&message
)
47 __gnu_cxx::hash_map
<actorId_t
, Actor
*>::iterator finder
= actorIds
.find(message
.recipient
);
48 if (finder
!= actorIds
.end()) {
49 Actor
*foundActor
= finder
->second
;
51 //Receiving actors can be in one of two states to get a solution immediately. This allows quick turn around time
52 //in some cases even before the maintenance task has run.
53 if (message
.messageType
== MessageType::ACTION_MESSAGE
) {
//Fast path 1: idle recipient takes the action message right now.
55 if (foundActor
->actorState
== ActorState::WAITING_FOR_ACTION
) {
56 foundActor
->task
= message
.task
;
57 for (int i
=0; i
< message
.numArgs
; ++i
) {
58 foundActor
->heapStack
.push_back(message
.arg
[i
]);
61 foundActor
->actorState
= ActorState::ACTIVE
;
//Mark as hot so the scheduler can run it with the leftover slice.
62 this->hotActor
= foundActor
;
//Only enqueue once per run-queue generation.
64 if (foundActor
->runQueueRevId
!= this->runQueueRevId
) {
65 runningActors
.push_back(foundActor
);
66 foundActor
->runQueueRevId
= this->runQueueRevId
;
//Recipient is busy: park the action message on the actor itself.
70 foundActor
->actionMessages
.push_back(message
);
//Fast path 2: recipient is blocked waiting for data and has a handler
//registered for this data task type -- resume it via CPS immediately.
74 else if ((foundActor->actorState == ActorState::WAITING_FOR_DATA) && (message.messageType == MessageType::DATA_MESSAGE)) {
75 __gnu_cxx::hash_map<int, int>::iterator findHandler = foundActor->dataHandlers.find(message.dataTaskTypeId);
77 if (findHandler != foundActor->dataHandlers.end()) {
78 //Set up our CPS to resume into a data receive
80 tu1.UInt32 = findHandler->second;
81 for (int i=0; i < message.numArgs; ++i) {
82 foundActor->heapStack.push_back(message.arg[i]);
//Handler id is pushed last, on top of the message arguments.
84 foundActor->heapStack.push_back(tu1);
85 foundActor->actorState = ActorState::ACTIVE;
86 foundActor->isResuming = true;
88 this->hotActor = foundActor;
90 //if (foundActor->runQueueRevId != this->runQueueRevId) {
91 runningActors.push_back(foundActor);
92 //foundActor->runQueueRevId = this->runQueueRevId;
//No handler registered (yet): defer the data message to localMail.
96 localMail.push_back(message);
//Recipient exists locally but is in neither fast-path state: defer.
102 localMail
.push_back(message
);
103 //foundActor->actionMessages.push_back(message);
107 //we don't have this person, so send the message out to a mailman
108 //printf("<%i", message.recipient ); fflush(stdout);
109 outgoingChannel
->sendMessage(message
);
111 //printf("[%i]", message.recipient ); fflush(stdout);
115 Thread schedule uses this to check queued messages to see if any are ready to be consumed.
//Drains the current batch of localMail. Action/data messages for local
//recipients that are ready are applied immediately; messages whose
//recipients are busy are re-queued (pushed past 'end' so they survive
//the erase below); messages for unknown recipients are forwarded to the
//mailman via outgoingChannel. Arguments are passed inline when
//numArgs <= 4, otherwise arg[0].VoidPtr carries a vector<TypeUnion>.
118 void Thread::ReceiveMessages() {
//Snapshot the batch size: anything pushed during this pass lives
//beyond 'end' and is NOT erased at the bottom.
121 int end
= localMail
.size();
124 Message
&message
= localMail
[pos
];
126 if (message
.messageType
== MessageType::ACTION_MESSAGE
) {
127 __gnu_cxx::hash_map
<actorId_t
, Actor
*>::iterator finder
= actorIds
.find(message
.recipient
);
128 //Check to see if the recipient is local
129 if (finder
!= actorIds
.end()) {
130 Actor
*foundActor
= finder
->second
;
132 if (foundActor
->actorState
== ActorState::WAITING_FOR_ACTION
) {
133 foundActor
->task
= message
.task
;
134 //BOOST_ASSERT(message.numArgs < 5);
//More than 4 args arrive boxed in a heap vector via arg[0].
135 if (message
.numArgs
> 4) {
136 std::vector
<TypeUnion
> *vtu
= (std::vector
<TypeUnion
> *)(message
.arg
[0].VoidPtr
);
137 for (std::vector
<TypeUnion
>::iterator vtu_iter
= vtu
->begin(), vtu_end
= vtu
->end(); vtu_iter
!= vtu_end
; ++vtu_iter
) {
138 foundActor
->heapStack
.push_back(*vtu_iter
);
143 for (int i
=0; i
< message
.numArgs
; ++i
) {
144 foundActor
->heapStack
.push_back(message
.arg
[i
]);
147 foundActor
->actorState
= ActorState::ACTIVE
;
148 if (foundActor
->runQueueRevId
!= this->runQueueRevId
) {
149 runningActors
.push_back(foundActor
);
150 foundActor
->runQueueRevId
= this->runQueueRevId
;
154 //If the actor isn't waiting for action, re-add them to the queue (the mail queue works similarly to
155 //the running actor process queue. Only the current batch is live at any moment, each cycle clears
156 //the old messages in one go (which is more efficient than picking them out one by one)
157 //localMail.push_back(message);
158 foundActor
->actionMessages
.push_back(message
);
163 //we don't have this person, so send the message out to a mailman
164 //printf(">%i", message.recipient ); fflush(stdout);
165 outgoingChannel
->sendMessage(message
);
169 else if (message.messageType == MessageType::DATA_MESSAGE) {
170 __gnu_cxx::hash_map<actorId_t, Actor*>::iterator finder = actorIds.find(message.recipient);
171 if (finder != actorIds.end()) {
172 Actor *foundActor = finder->second;
173 __gnu_cxx::hash_map<int, int>::iterator findHandler = foundActor->dataHandlers.find(message.dataTaskTypeId);
175 if (findHandler != foundActor->dataHandlers.end()) {
177 if (foundActor->actorState == ActorState::WAITING_FOR_DATA) {
178 //Set up our CPS to resume into a data receive
180 tu1.UInt32 = findHandler->second;
181 if (message.numArgs > 4) {
182 std::vector<TypeUnion> *vtu = (std::vector<TypeUnion> *)message.arg[0].VoidPtr;
183 for (std::vector<TypeUnion>::iterator vtu_iter = vtu->begin(), vtu_end = vtu->end(); vtu_iter != vtu_end; ++vtu_iter) {
184 foundActor->heapStack.push_back(*vtu_iter);
189 for (int i=0; i < message.numArgs; ++i) {
190 foundActor->heapStack.push_back(message.arg[i]);
//Handler id goes on top of the arguments for the CPS resume.
193 foundActor->heapStack.push_back(tu1);
194 foundActor->actorState = ActorState::ACTIVE;
195 foundActor->isResuming = true;
197 //if this activates the recipient, put it on the active queue
198 //if (foundActor->runQueueRevId != this->runQueueRevId) {
199 runningActors.push_back(foundActor);
200 //foundActor->runQueueRevId = this->runQueueRevId;
//Recipient not ready for data yet: retry on a later pass.
204 localMail.push_back(message);
//No handler registered for this data task type yet: retry later.
208 localMail.push_back(message);
212 //we don't have this person, so send the message out to a mailman
213 outgoingChannel->sendMessage(message);
218 //The only type of mail that should get into localMail are mail messages themselves. Messages requesting
219 //a service are handled by the maintenance actor
220 std::cout
<< "ERROR: Unknown message type" << std::endl
;
221 localMail
.push_back(message
);
226 //Clean out all messages we've looked through this pass (ones that were rescheduled live further in)
228 localMail
.erase(localMail
.begin(), localMail
.begin() + end
);
233 Schedules an already existing actor onto the current thread;
//Adopts an already-constructed actor onto this thread: registers it in
//the local phonebook, queues it if it arrived ACTIVE, invalidates its
//receiver cache and run-queue revision (both are thread-relative), and
//announces the new address upstream with an ADD_ACTOR_ID message.
//NOTE(review): the declaration of 'message' (original line ~255) is
//elided from this view of the file.
235 void Thread::ScheduleExistingActor(Actor
*actor
) {
236 actor
->parentThread
= this;
238 //This is our phonebook for the actors on this thread
239 actorIds
[actor
->actorId
] = actor
;
241 if (actor
->actorState
== ActorState::ACTIVE
)
243 runningActors
.push_back(actor
);
246 //clear out this actor's receiver cache, as it's no longer valid
247 for (int i
= 0; i
< RECEIVER_CACHE_SIZE
; ++i
) {
248 actor
->receiverCache
[i
].containedId
= -1;
250 actor
->nextCacheIndex
= 0;
//Revision 0 means "not yet on any run-queue generation of this thread".
252 actor
->runQueueRevId
= 0;
253 //actor->timesActive = 0;
//Tell the kernel/mailman where this actor now lives.
256 message
.messageType
= MessageType::ADD_ACTOR_ID
;
257 message
.arg
[0].UInt32
= threadId
;
258 message
.arg
[1].UInt32
= actor
->actorId
;
260 outgoingChannel
->sendMessage(message
);
264 Schedules a new actor on the current thread
//Assigns the next fresh id to a brand-new actor, marks it waiting for
//its first action message, and schedules it on this thread. Ids are
//not recycled; once this scheduler's 24-bit id space is exhausted the
//error path below is taken (a fatal condition per the message text).
266 void Thread::ScheduleNewActor(Actor
*actor
) {
267 actor
->actorId
= nextActorId
;
268 actor
->actorState
= ActorState::WAITING_FOR_ACTION
;
269 actor
->actorStateBeforeMove
= ActorState::WAITING_FOR_ACTION
;
271 ScheduleExistingActor(actor
);
//Low 24 bits are the per-scheduler id space (high bits select the
//scheduler -- see the (threadId-1)*0x1000000 bases elsewhere).
274 if ((nextActorId
& 0xffffff) == 0xffffff) {
275 //we have rolled over, just quit with an error for now
276 std::cout
<< "Exceeded allowed number of actors on scheduler " << this->threadId
<< std::endl
;
277 std::cout
<< "This is currently a fatal error, and we must exit." << std::endl
;
//Like ScheduleNewActor, but instead of scheduling the actor locally it
//ships the actor to the kernel with a CREATE_ISOLATED_ACTOR message so
//it can be placed on a dedicated thread-pool thread (see MailCheck).
//NOTE(review): the declaration of 'message' (~original line 296) is
//elided from this view of the file.
282 void Thread::ScheduleNewIsolatedActor(Actor
*actor
) {
283 //std::cout << "New Isolated: " << nextActorId << std::endl;
284 actor
->actorId
= nextActorId
;
285 actor
->actorState
= ActorState::WAITING_FOR_ACTION
;
286 actor
->actorStateBeforeMove
= ActorState::WAITING_FOR_ACTION
;
//Same 24-bit id-space exhaustion check as ScheduleNewActor.
289 if ((nextActorId
& 0xffffff) == 0xffffff) {
290 //we have rolled over, just quit with an error for now
291 std::cout
<< "Exceeded allowed number of actors on scheduler " << this->threadId
<< std::endl
;
292 std::cout
<< "This is currently a fatal error, and we must exit." << std::endl
;
//Hand the raw Actor* to the kernel; ownership effectively transfers.
297 message
.messageType
= MessageType::CREATE_ISOLATED_ACTOR
;
298 message
.arg
[0].VoidPtr
= (void *)actor
;
300 this->outgoingChannel
->sendMessage(message
);
304 Removes an actor from the scheduler and passes a message up the chain to remove him from the phonebooks.
305 This version does not free the memory the actor was using, for that use 'DeleteActor' instead
//Convenience overload: remove the actor from this scheduler AND send
//the DELETE_ACTOR_ID notification upstream (sendDeleteMsg == true).
307 void Thread::RemoveActor(Actor
*actor
) {
308 RemoveActor(actor
, true);
//Batch removal: removes each actor locally WITHOUT per-actor delete
//notifications, then sends one DELETE_ACTOR_IDS message carrying the
//collected ids. The heap-allocated id vector is handed off inside the
//message; the receiver is responsible for freeing it.
//NOTE(review): the declaration of 'message' (~original line 319) is
//elided from this view of the file.
311 void Thread::RemoveActors(std::vector
<Actor
*> *actors
) {
312 std::vector
<actorId_t
> *toBeDeleted
= new std::vector
<actorId_t
>();
314 for (std::vector
<Actor
*>::iterator iter
=actors
->begin(), end
=actors
->end(); iter
!=end
; ++iter
) {
//false: suppress the individual DELETE_ACTOR_ID; we batch it below.
315 RemoveActor(*iter
, false);
316 toBeDeleted
->push_back((*iter
)->actorId
);
320 message
.messageType
= MessageType::DELETE_ACTOR_IDS
;
321 message
.arg
[0].UInt32
= threadId
;
322 message
.arg
[1].VoidPtr
= toBeDeleted
;
324 outgoingChannel
->sendMessage(message
);
//Removes an actor from this scheduler's phonebook and run queue (the
//actor itself is NOT freed -- see DeleteActor for that). Bumps
//actorPoolRevId so stale receiver-cache entries pointing at this actor
//stop validating; on revision rollover every local actor's cache is
//wiped to avoid false positives. Optionally notifies the mailman with
//DELETE_ACTOR_ID so the global phonebook is updated.
//NOTE(review): the declarations of 'message' and the loop counter 'j'
//(and the branch structure around the error print) are elided from
//this view of the file.
327 void Thread::RemoveActor(Actor
*actor
, bool sendDeleteMsg
) {
328 __gnu_cxx::hash_map
<actorId_t
, Actor
*>::iterator iter
= actorIds
.find(actor
->actorId
);
329 std::vector
<Actor
*>::iterator iterActor
;
//Invalidate all receiver-cache entries created under the old revision.
331 ++this->actorPoolRevId
;
332 if (this->actorPoolRevId
== 0) {
333 //if we have gone so long that we roll over, then quickly invalidate our local actors' receiver caches so they don't get false positives
334 for (__gnu_cxx::hash_map
<actorId_t
, Actor
*>::iterator resetIter
= actorIds
.begin(), resetEnd
= actorIds
.end(); resetIter
!= resetEnd
; ++resetIter
) {
335 for (int i
= 0; i
< RECEIVER_CACHE_SIZE
; ++i
) {
336 resetIter
->second
->receiverCache
[i
].containedId
= -1;
341 if (iter
!= actorIds
.end()) {
342 actorIds
.erase(iter
);
//Presumably guarded by sendDeleteMsg (guard line not visible here).
346 message
.messageType
= MessageType::DELETE_ACTOR_ID
;
347 message
.arg
[0].UInt32
= threadId
;
348 message
.arg
[1].UInt32
= actor
->actorId
;
350 outgoingChannel
->sendMessage(message
);
//Also purge the actor from the current running queue.
353 while (j
< runningActors
.size()) {
354 if (runningActors
[j
]->actorId
== actor
->actorId
)
355 runningActors
.erase(runningActors
.begin() + j
);
361 std::cout
<< "Trying to remove " << actor
->actorId
<< " from " << this->threadId
<< " but can't find it." << std::endl
;
366 Remove the actor and completely delete it (for actors that were killed)
//Removes the actor AND frees its memory (used for killed actors, per
//the comment above). NOTE(review): the entire body (original lines
//369-374) is elided from this view of the file.
368 void Thread::DeleteActor(Actor
*actor
) {
375 Sends this scheduler's current load (active actor count) to the kernel for rebalancing.
//Reports this scheduler's load (numberActiveActors) to the kernel via
//a LOAD_STATUS message, feeding the TaskRebalance algorithm.
//NOTE(review): the declaration of 'message' (~original line 378) is
//elided from this view of the file.
377 void Thread::SendStatus() {
379 message
.messageType
= MessageType::LOAD_STATUS
;
380 message
.arg
[0].UInt32
= this->threadId
;
381 message
.arg
[1].Int32
= this->numberActiveActors
;
385 outgoingChannel
->sendMessage(message
);
389 Moves the actor which has been running the longest to another scheduler.
390 Note: Very likely better solutions exist, ones that could reschedule clumps of actors that have been working together,
391 but for now this will do.
394 //JDT: REFACTOR - breaking rebalance until we put in locality calculator
//Picks up to 'count' ACTIVE, movable actors (actorId != 0xffffffff,
//which appears to mark special/maintenance actors) off the running
//queue and ships them to the 'destination' scheduler: a single actor
//goes in a PLEASE_RECV_ACTOR message, a batch goes as a heap-allocated
//vector in PLEASE_RECV_ACTORS. Each chosen actor is detached first
//(state saved in actorStateBeforeMove, state set to MOVED, parent
//cleared, RemoveActor called).
//NOTE(review): several lines are elided here, including the init of
//loop counter 'i', the 'message' declarations, and the loop/branch
//closings -- the exact single-vs-batch control flow is partly inferred.
395 void Thread::MoveHeaviestActor(threadId_t destination
, uint32_t count
) {
396 //std::vector<Actor*>::iterator iter, chosen;
398 std::vector
<Actor
*> *group
;
//Heap-allocated: ownership transfers inside PLEASE_RECV_ACTORS below.
400 group
= new std::vector
<Actor
*>();
403 //std::cout << "Moving " << count << std::endl;
404 //for (int k = 0; k < count; ++k) {
405 //for (iter = runningActors.begin(); iter != runningActors.end(); ++iter) {
407 while (i
< runningActors
.size()) {
408 if ( (runningActors
[i
]->actorState
== ActorState::ACTIVE
) && (runningActors
[i
]->actorId
!= 0xffffffff)) {
410 //std::cout << "Found one, moving it" << std::endl;
411 Actor
*chosenActor
= runningActors
[i
];
//Remember the pre-move state so the receiver can restore it.
412 chosenActor
->actorStateBeforeMove
= chosenActor
->actorState
;
414 chosenActor
->actorState
= ActorState::MOVED
;
415 chosenActor
->parentThread
= NULL
;
419 //std::cout << "$$ " << count << " $$" << std::endl;
420 RemoveActor(chosenActor
);
//Single-actor transfer path.
423 message
.messageType
= MessageType::PLEASE_RECV_ACTOR
;
424 message
.arg
[0].UInt32
= destination
;
425 message
.arg
[1].VoidPtr
= (void *)chosenActor
;
429 outgoingChannel
->sendMessage(message
);
//Batch transfer path: accumulate until we have 'count' actors.
434 group
->push_back(chosenActor
);
435 if (group
->size() == count
) {
449 //std::cout << "@@ " << count << " " << group->size() << " @@" << std::endl;
451 message
.messageType
= MessageType::PLEASE_RECV_ACTORS
;
452 message
.arg
[0].UInt32
= destination
;
453 message
.arg
[1].VoidPtr
= (void *)group
;
457 outgoingChannel
->sendMessage(message
);
463 The rebalancing algorithm the kernel uses to make sure the actors are equitably distributed across scheduler threads
//Kernel-side rebalancer: scans the per-scheduler active-actor counts in
//scheduleWeights, finds the lightest and heaviest schedulers, and if
//the gap exceeds the threshold (1 + heaviest/10) asks the heaviest to
//move 'amount' actors to the lightest via PLEASE_MOVE_ACTOR.
//NOTE(review): initialization of heaviest_weight, lightest, heaviest
//and the 'message' declaration are elided from this view; thread id 0
//is excluded, which from the visible checks appears to be a sentinel.
465 void Thread::TaskRebalance() {
466 __gnu_cxx::hash_map
<threadId_t
, int>::const_iterator iter
;
467 threadId_t lightest
, heaviest
;
468 int lightest_weight
, heaviest_weight
;
472 lightest_weight
= 99999999;
475 //The rebalance works by looking through the list of schedulers and their number of active actors, and if there is a
476 //disparity greater than a threshold (currently 2), it asks the more active scheduler to pass one of its actors
477 //to the lesser active scheduler.
//Single pass to find both extremes.
479 iter
= scheduleWeights
->begin();
480 while (iter
!= scheduleWeights
->end()) {
481 if (iter
->second
< lightest_weight
) {
482 lightest
= iter
->first
;
483 lightest_weight
= iter
->second
;
485 if (iter
->second
> heaviest_weight
) {
486 heaviest
= iter
->first
;
487 heaviest_weight
= iter
->second
;
491 if ((lightest
!= heaviest
) && (lightest
!= 0) && (heaviest
!= 0)) {
//Proportional threshold: require a gap of more than 1 + 10%.
492 if ((heaviest_weight
- lightest_weight
) > 1 + (heaviest_weight
/ 10)) {
493 //uint32_t amount = (heaviest_weight - lightest_weight) / 2; //(50 + heaviest_weight - lightest_weight) / 50;
//Move roughly one actor per 50 actors of imbalance (minimum 1).
494 uint32_t amount
= (50 + heaviest_weight
- lightest_weight
) / 50;
495 //std::cout << "Moving: " << amount << " from " << heaviest << " to " << lightest << std::endl;
501 message
.messageType
= MessageType::PLEASE_MOVE_ACTOR
;
502 message
.arg
[0].UInt32
= lightest
;
503 message
.arg
[1].UInt32
= amount
;
//Ask the heaviest scheduler directly over its mail channel.
505 (*mailChannelOutgoing
)[heaviest
]->sendMessage(message
);
506 iter
= scheduleWeights
->begin();
508 while (iter != scheduleWeights->end()) {
509 //std::cout << iter->first << ":" << iter->second << " ";
513 //std::cout << std::endl;
519 The mail check loop that mailmen actors use.
//Mailman/kernel mail pump: for each incoming mail channel, drains the
//pending messages and dispatches on message type:
//  ACTION/DATA           -> route to owning scheduler via mailAddresses,
//                           or hold in lostMessages / report LIMBO
//  ADD_ACTOR_ID          -> record actor -> thread in the global book
//  CREATE_ISOLATED_ACTOR -> place the actor on a free thread-pool
//                           thread, spinning up a new Thread if needed
//  DELETE_ACTOR_ID(S)    -> unregister actor(s), free pool threads
//  LOAD_STATUS           -> record scheduler weight for TaskRebalance
//  (anything else)       -> forwarded by arg[0] thread id (fall-through
//                           branch at original ~line 667)
//Undeliverable messages are bounced back on the channel they came from.
//NOTE(review): loop headers (pos), the 'found' flag init, 'messageIso'
//declaration, and many closing braces are elided from this view.
521 void Thread::MailCheck() {
523 int end
= mailListIncoming
->size();
525 //FIXME: There has to be a nicer way to keep track of lost messages
526 std::vector
<Message
> lostMessages
;
529 MailChannel
*mChannel
= (*mailListIncoming
)[pos
];
531 //For each channel, check mail.
532 if (mChannel
->empty() == false) {
534 std::vector
<Message
> *incomingRemoteMail
= mChannel
->recvMessages();
535 std::vector
<Message
>::iterator inMailIter
= incomingRemoteMail
->begin();
537 while (inMailIter
!= incomingRemoteMail
->end()) {
538 Message message
= *inMailIter
;
540 if ((message
.messageType
== MessageType::ACTION_MESSAGE
)||(message
.messageType
== MessageType::DATA_MESSAGE
)) {
542 __gnu_cxx::hash_map
<actorId_t
, threadId_t
>::iterator finder
= mailAddresses
->find(message
.recipient
);
543 //we found who it goes to
544 if (finder
!= mailAddresses
->end()) {
546 //if the actor isn't in transition/limbo, we deliver
547 if (finder
->second
!= 0) {
548 __gnu_cxx::hash_map
<threadId_t
, MailChannel
*> *channelMap
= mailChannelOutgoing
;
549 MailChannel
*outgoing
= (*channelMap
)[finder
->second
];
550 outgoing
->sendMessage(message
);
//Thread id 0 marks an actor mid-migration ("limbo").
553 std::cout
<< "Actor is in LIMBO" << std::endl
;
558 //can't find anyone, so send the message to myself
559 lostMessages
.push_back(message
);
563 else if (message
.messageType
== MessageType::ADD_ACTOR_ID
) {
564 threadId_t threadId
= message
.arg
[0].UInt32
;
565 actorId_t actorId
= message
.arg
[1].UInt32
;
566 __gnu_cxx::hash_map
<actorId_t
, threadId_t
> *mailMap
= mailAddresses
;
567 (*mailMap
)[actorId
] = threadId
;
569 else if (message
.messageType
== MessageType::CREATE_ISOLATED_ACTOR
) {
570 //FIXME: This assumes receiver is a KERNEL but doesn't pass it along if it's a MAILMAN
572 //actorId_t actorId = message.arg[0].UInt32;
573 if (threadType
== ThreadType::KERNEL
) {
//Repackage as PLEASE_RECV_ACTOR targeted at a pool thread.
577 messageIso
.messageType
= MessageType::PLEASE_RECV_ACTOR
;
578 //message.arg[0] gets filled in below
579 messageIso
.arg
[1] = message
.arg
[0]; //Pass the isolated actor over to the receive actor message
//First, try to reuse an available thread-pool thread.
581 std::vector
<ThreadPoolThread
>::iterator poolIter
= threadPoolThreads
->begin();
582 while ( (found
== false) && (poolIter
!= threadPoolThreads
->end()) ) {
583 //std::cout << "Inside search loop..." << std::endl;
584 if (poolIter
->available
== true) {
585 poolIter
->available
= false;
587 messageIso
.arg
[0].UInt32
= poolIter
->threadId
;
588 //std::cout << "Found: " << poolIter->threadId << std::endl;
//None available: spin up a brand-new scheduler thread for the pool.
593 if (found
== false) {
594 ThreadPoolThread tpt
;
595 //std::cout << "Not found, creating one... " << actor->mailChannelOutgoing->size() + 1 << std::endl;
597 tpt
.threadId
= mailChannelOutgoing
->size() + 1;// + actor->threadPoolThreads->size();
//Second ctor arg is the actor-id base: 0x1000000 ids per scheduler.
599 Thread
*newThread
= new Thread(tpt
.threadId
, (tpt
.threadId
-1) * 0x1000000);
601 //By not sending status, we'll take ourselves out of the rebalancer, which we need to do if we're in the thread pool for isolated actors.
602 newThread
->sendStatus
= false;
603 //std::cout << "Thread ID..." << newThread->threadId << " and " << tpt.threadId << std::endl;
605 tpt
.thread
= new boost::thread(boost::bind(&Thread::SchedulerLoop
, newThread
));
606 tpt
.available
= false; //we're about to schedule something, so it's not available for new actors yet
608 messageIso
.arg
[0].UInt32
= newThread
->threadId
;
//Wire the new thread's channels into the mailman's routing tables.
609 mailListIncoming
->push_back(newThread
->outgoingChannel
);
611 (*mailChannelOutgoing
)[newThread
->threadId
] = newThread
->incomingChannel
;
613 threadPoolThreads
->push_back(tpt
);
616 (*mailChannelOutgoing
)[messageIso
.arg
[0].UInt32
]->sendMessage(messageIso
);
620 else if (message
.messageType
== MessageType::DELETE_ACTOR_ID
) {
621 //FIXME: When we move to a mailmen+kernel model instead of just a kernel model, we need to pass up the deleted actor
622 //threadId_t threadId = message.arg[0].UInt32;
624 actorId_t actorId
= message
.arg
[1].UInt32
;
626 __gnu_cxx::hash_map
<actorId_t
, threadId_t
>::iterator finder
= mailAddresses
->find(actorId
);
627 if (finder
!= mailAddresses
->end()) {
628 threadId_t threadId
= finder
->second
;
629 __gnu_cxx::hash_map
<actorId_t
, threadId_t
> *mailMap
= mailAddresses
;
630 mailMap
->erase(actorId
);
//If the actor lived on a pool thread, mark that thread reusable.
631 for (unsigned int k
= 0; k
< threadPoolThreads
->size(); ++k
) {
632 if ((*threadPoolThreads
)[k
].threadId
== threadId
) {
633 (*threadPoolThreads
)[k
].available
= true;
638 else if (message
.messageType
== MessageType::DELETE_ACTOR_IDS
) {
639 //FIXME: When we move to a mailmen+kernel model instead of just a kernel model, we need to pass up the deleted actor
640 //threadId_t threadId = message.arg[0].UInt32;
//Batch form of DELETE_ACTOR_ID; vector arrives boxed in arg[1].
641 std::vector
<actorId_t
> *actorIds
= (std::vector
<actorId_t
> *)(message
.arg
[1].VoidPtr
);
642 //std::cout << "Kernel: deleting " << actorIds->size() << " actors" << std::endl;
643 for (std::vector
<actorId_t
>::iterator actorIter
= actorIds
->begin(), actorEnd
= actorIds
->end(); actorIter
!= actorEnd
; ++actorIter
) {
644 __gnu_cxx::hash_map
<actorId_t
, threadId_t
>::iterator finder
= mailAddresses
->find(*actorIter
);
645 if (finder
!= mailAddresses
->end()) {
646 threadId_t threadId
= finder
->second
;
647 __gnu_cxx::hash_map
<actorId_t
, threadId_t
> *mailMap
= mailAddresses
;
648 mailMap
->erase(*actorIter
);
649 for (unsigned int k
= 0; k
< threadPoolThreads
->size(); ++k
) {
650 if ((*threadPoolThreads
)[k
].threadId
== threadId
) {
651 (*threadPoolThreads
)[k
].available
= true;
658 else if (message
.messageType
== MessageType::LOAD_STATUS
) {
659 threadId_t threadId
= message
.arg
[0].UInt32
;
660 int32_t activeActors
= message
.arg
[1].Int32
;
662 std::cout
<< "Status: " << threadId
<< ": " << activeActors
<< std::endl
;
663 (*(scheduleWeights
))[threadId
] = activeActors
;
//Fall-through: any other message type is forwarded to the thread
//named in arg[0] (enclosing else at ~original line 665 not visible).
667 threadId_t threadId
= message
.arg
[0].UInt32
;
668 (*(mailChannelOutgoing
))[threadId
]->sendMessage(message
);
673 incomingRemoteMail
->clear();
675 //Return to sender the messages that I don't have an address for
676 std::vector
<Message
>::iterator it
= lostMessages
.begin();
677 while (it
!= lostMessages
.end()) {
678 mChannel
->sendMessage(*it
);
681 lostMessages
.clear();
689 The main scheduler loop for the thread. Runs through each actor and runs the ready ones.
690 Also checks for queued messages, activating them as necessary.
//The scheduler's main loop. Walks runningActors; for each ACTIVE actor
//it grants a time slice (TIMESLICE_QUANTUM, scaled up when the actor
//has a backlog of action messages) and runs its task repeatedly until
//the slice expires or the actor blocks. Actor id 0xffffffff is the
//maintenance actor: when it comes up, the loop performs housekeeping --
//kernel mail check, batch-erasing the consumed head of runningActors,
//freeing actors deleted last cycle, draining incomingChannel system
//messages, status reporting, and runQueueRevId rollover handling.
//When an actor blocks after sending a message, the 'hot actor' it woke
//is run with the remainder of the slice (see the L4 IPC paper cited in
//the comments below).
//NOTE(review): this view elides the maintenance-actor switch/case
//labels, 'pos'/'slicePos'/'xt' declarations, the SendStatus call at
//~original 794, and many braces -- the structure noted above is partly
//inferred from the surviving lines and comments.
693 void Thread::SchedulerLoop() {
695 const bool sendStatusUpdate
= this->sendStatus
;
696 std::vector
<TypeUnion
> *vtu
;
//Actors killed this cycle are collected and freed next cycle.
698 std::vector
<Actor
*> deletedActors
;
701 this->previousActiveActors
= -1;
702 this->numberActiveActors
= 0;
705 Actor
*thisActor
= NULL
;
706 int sliceMultiplier
= 1;
708 while (this->isRunning
) {
709 if (this->runningActors
[pos
]->actorState
== ActorState::ACTIVE
) {
711 thisActor
= this->runningActors
[pos
];
//Dispatch on the actor id; the maintenance actor (0xffffffff, per the
//checks elsewhere) triggers the housekeeping below.
713 switch (thisActor
->actorId
) {
716 if (this->threadType
== ThreadType::KERNEL
) {
718 ++this->numberActiveActors
;
//Nothing active this cycle: sleep briefly instead of spinning.
720 if (this->numberActiveActors
== 0) {
722 boost::xtime_get(&xt
, boost::TIME_UTC
);
724 boost::thread::sleep(xt
);
//Advance the run-queue generation; 0 is reserved as "not queued".
728 ++this->runQueueRevId
;
729 if (this->runQueueRevId
== 0) {
730 //if 0, we've rolled over, so reset everyone
731 for (__gnu_cxx::hash_map
<actorId_t
, Actor
*>::iterator resetIter
= this->actorIds
.begin(), resetEnd
= this->actorIds
.end(); resetIter
!= resetEnd
; ++resetIter
)
733 (resetIter
->second
)->runQueueRevId
= 0;
735 this->runQueueRevId
= 1;
739 //Clean out old slots, using ourselves (the maintenance actor) as the marker. Actors that follow us were
740 //rescheduled (pushed) in this cycle so they are our active queue for the next cycle.
741 this->runningActors
.erase(this->runningActors
.begin(), this->runningActors
.begin() + (1 + pos
));
743 //Pull out any actors that were deleted during the last cycle
744 if (deletedActors
.size() > 0) {
745 for (std::vector
<Actor
*>::iterator deletedIter
= deletedActors
.begin(), deletedEnd
= deletedActors
.end(); deletedIter
!= deletedEnd
; ++deletedIter
)
747 DeleteActor(*deletedIter
);
749 deletedActors
.clear();
752 //Sort through incoming system messages (which may also have mail)
753 if (this->incomingChannel
->empty() == false) {
754 //std::cout << "Thread checking mail" << std::endl;
755 std::vector
<Message
> *incomingRemoteMail
= this->incomingChannel
->recvMessages();
756 std::vector
<Message
>::iterator inMailIter
= incomingRemoteMail
->begin();
758 while (inMailIter
!= incomingRemoteMail
->end()) {
759 Message message
= *inMailIter
;
//Plain mail is deferred to localMail for ReceiveMessages to apply.
761 if ((message
.messageType
== MessageType::ACTION_MESSAGE
)||(message
.messageType
== MessageType::DATA_MESSAGE
)) {
762 this->localMail
.push_back(message
);
764 else if (message
.messageType
== MessageType::PLEASE_MOVE_ACTOR
) {
765 threadId_t threadId
= message
.arg
[0].UInt32
;
766 uint32_t amount
= message
.arg
[1].UInt32
;
767 MoveHeaviestActor(threadId
, amount
);
769 else if (message
.messageType
== MessageType::PLEASE_RECV_ACTOR
) {
770 Actor
*newActor
= (Actor
*)(message
.arg
[1].VoidPtr
);
//Restore the state saved by the sender in MoveHeaviestActor.
771 newActor
->actorState
= newActor
->actorStateBeforeMove
;
772 ScheduleExistingActor(newActor
);
774 else if (message
.messageType
== MessageType::PLEASE_RECV_ACTORS
) {
775 std::vector
<Actor
*> *newActors
= (std::vector
<Actor
*> *)(message
.arg
[1].VoidPtr
);
776 //std::cout << "New message: " << newActors->size() << std::endl;
777 for(std::vector
<Actor
*>::iterator newIter
= newActors
->begin(), newEnd
= newActors
->end(); newIter
!= newEnd
; ++newIter
) {
778 //std::cout << "***MOVED***" << std::endl;
779 (*newIter
)->actorState
= (*newIter
)->actorStateBeforeMove
;
780 ScheduleExistingActor(*newIter
);
786 incomingRemoteMail
->clear();
791 //Send a status update to the kernel if we've changed number of active actors this cycle.
792 if (sendStatusUpdate
) {
793 if (this->numberActiveActors
!= this->previousActiveActors
) {
795 this->previousActiveActors
= this->numberActiveActors
;
799 this->numberActiveActors
= 0;
//Default case: an ordinary actor. Size its slice by message backlog.
803 this->timeSliceEndTime
= TIMESLICE_QUANTUM
;
804 if (thisActor
->actionMessages
.size() > TIMESLICE_QUANTUM
) {
805 sliceMultiplier
= thisActor
->actionMessages
.size() / TIMESLICE_QUANTUM
;
806 this->timeSliceEndTime
*= sliceMultiplier
;
809 this->hotActor
= NULL
;
811 //Running the actor if it's active, as long as it is active and has slice remaining
814 while ((thisActor
->actorState
== ActorState::ACTIVE
) && (this->timeSliceEndTime
> 0)) {
815 //Run the actor's current task
816 thisActor
->task(thisActor
);
818 switch(thisActor
->actorState
) {
819 case (ActorState::ACTIVE
) :
820 thisActor
->isResuming
= true;
821 ++this->numberActiveActors
;
822 //std::cout << "NAA: " << this->numberActiveActors << std::endl;
//Blocked on input: feed it the next queued action message, if any.
824 case (ActorState::WAITING_FOR_ACTION
) :
825 if (thisActor
->actionMessages
.size() > slicePos
+1) {
827 Message message
= thisActor
->actionMessages
[slicePos
];
828 thisActor
->task
= message
.task
;
830 if (message
.numArgs
> 4) {
831 vtu
= (std::vector
<TypeUnion
> *)(message
.arg
[0].VoidPtr
);
832 for (std::vector
<TypeUnion
>::iterator vtu_iter
= vtu
->begin(), vtu_end
= vtu
->end(); vtu_iter
!= vtu_end
; ++vtu_iter
) {
833 thisActor
->heapStack
.push_back(*vtu_iter
);
838 for (int i
=0; i
< message
.numArgs
; ++i
) {
839 thisActor
->heapStack
.push_back(message
.arg
[i
]);
843 thisActor
->actorState
= ActorState::ACTIVE
;
//Out of slice but still runnable: requeue for the next cycle.
845 if (this->timeSliceEndTime
== 0) {
846 if (thisActor
->runQueueRevId
!= this->runQueueRevId
) {
847 thisActor
->runQueueRevId
= this->runQueueRevId
;
848 this->runningActors
.push_back(thisActor
);
853 case (ActorState::DELETED
) :
854 deletedActors
.push_back(thisActor
);
//Actor blocked but woke someone: switch to the hot actor in-slice.
857 if ((thisActor
->actorState
!= ActorState::ACTIVE
) && (this->hotActor
!= NULL
)) {
//Drop the action messages consumed so far before switching away.
859 thisActor
->actionMessages
.erase(thisActor
->actionMessages
.begin(), thisActor
->actionMessages
.begin() + slicePos
+ 1);
862 thisActor
= this->hotActor
;
869 thisActor
->actionMessages
.erase(thisActor
->actionMessages
.begin(), thisActor
->actionMessages
.begin() + slicePos
+ 1);
874 //In the case the actor is no longer active, it may have just sent a message. This will let another
875 //actor immediately go active (called here a 'hot actor'), allowing them to use the rest of the slice.
876 //This idea comes from "Improving IPC by Kernel Design" (l4ka.org/publications/1993/improving-ipc.pdf)
878 if (thisActor->actorState != ActorState::ACTIVE) {
880 if (this->hotActor != NULL) {
881 Actor *hotActor = this->hotActor;
//Chase the hot-actor chain while slice remains; each run may set a
//new hotActor, which we clear before invoking the task.
882 while ((this->timeSliceEndTime > 0) && (this->hotActor != NULL)) {
883 hotActor = this->hotActor;
885 this->hotActor = NULL;
886 hotActor->task(hotActor);
888 //If it's still active afterward, it's an active actor, so count it
889 if (hotActor->actorState == ActorState::ACTIVE) {
890 hotActor->isResuming = true;
891 ++this->numberActiveActors;
892 //std::cout << "NAA: " << this->numberActiveActors << std::endl;
894 //If instead it's waiting for an action message, see if we have one to give it
895 else if (hotActor->actorState == ActorState::WAITING_FOR_ACTION) {
896 if (hotActor->actionMessages.size() > slicePos+1) {
898 Message message = hotActor->actionMessages[slicePos];
899 hotActor->task = message.task;
900 //BOOST_ASSERT(message.numArgs <= 4);
902 for (int i=0; i < message.numArgs; ++i) {
903 hotActor->heapStack.push_back(message.arg[i]);
905 hotActor->actorState = ActorState::ACTIVE;
907 if (this->timeSliceEndTime == 0) {
908 if (hotActor->runQueueRevId != this->runQueueRevId) {
909 hotActor->runQueueRevId = this->runQueueRevId;
910 this->runningActors.push_back(hotActor);
915 else if (hotActor->actorState == ActorState::DELETED) {
916 deletedActors.push_back(hotActor);
922 //if (thisActor->actorState == ActorState::DELETED) {
923 // deletedActors.push_back(thisActor);
933 //If we're active, keep us in the running list
934 if ((thisActor
->actorState
== ActorState::ACTIVE
) && (thisActor
->runQueueRevId
!= this->runQueueRevId
)) {
935 thisActor
->runQueueRevId
= this->runQueueRevId
;
936 this->runningActors
.push_back(thisActor
);
940 ++pos
; //something that was active now isn't, so let's just skip over it.
945 int VM_Main(int argc
, char *argv
[], void(*task
)(Actor
*), bool passCmdLine
) {
947 int NUM_SCHED_THREADS
= boost::thread::hardware_concurrency();
949 Thread
*thread
[NUM_SCHED_THREADS
];
951 //FIXME!!!!!! This assumes only a certain number of actors will ever be created by any one scheduler, and this might be a bad assumption.
952 //Also, actor ids are not currently being recycled, but likely should be.
953 for (int i
= 0; i
< NUM_SCHED_THREADS
; ++i
) {
954 thread
[i
] = new Thread(i
+1, i
* 0x1000000);
957 //Make the kernel thread the first scheduler thread
958 //It will do all normal scheduler duties and include kernel duties into them
959 thread
[0]->threadType
= ThreadType::KERNEL
;
960 thread
[0]->mailListIncoming
= new std::vector
<MailChannel
*>();
961 thread
[0]->mailAddresses
= new __gnu_cxx::hash_map
<actorId_t
, threadId_t
>();
962 thread
[0]->mailChannelOutgoing
= new __gnu_cxx::hash_map
<threadId_t
, MailChannel
*>();
963 thread
[0]->scheduleWeights
= new __gnu_cxx::hash_map
<threadId_t
, int>();
964 thread
[0]->threadPoolThreads
= new std::vector
<ThreadPoolThread
>();
966 //connect up channels
967 for (int i
= 0; i
< NUM_SCHED_THREADS
; ++i
) {
968 thread
[0]->mailListIncoming
->push_back(thread
[i
]->outgoingChannel
);
969 (*(thread
[0])->mailChannelOutgoing
)[thread
[i
]->threadId
] = thread
[i
]->incomingChannel
;
972 std::vector
<std::string
> *commandLineArgs
= new std::vector
<std::string
>();
973 //Grab commandline args
974 for (int i
= 1; i
< argc
; ++i
) {
975 commandLineArgs
->push_back(argv
[i
]);
978 //Schedule the main actor (like the main function in the program)
979 Actor
*mainActor
= new Actor();
980 thread
[0]->ScheduleNewActor(mainActor
);
983 //And send it the first message
985 message
.recipient
= 0;
987 message
.messageType
= MessageType::ACTION_MESSAGE
;
989 message
.arg
[0].VoidPtr
= commandLineArgs
;
990 thread
[0]->SendMessage(message
);
994 message
.recipient
= 0;
996 message
.messageType
= MessageType::ACTION_MESSAGE
;
998 thread
[0]->SendMessage(message
);
1001 boost::thread
*bThread
[NUM_SCHED_THREADS
];
1002 for (int i
= 1; i
< NUM_SCHED_THREADS
; ++i
) {
1003 bThread
[i
] = new boost::thread(boost::bind(&Thread::SchedulerLoop
, thread
[i
]));
1006 thread
[0]->SchedulerLoop();
1008 //join the threads here.
1009 for (int i
= 1; i
< NUM_SCHED_THREADS
; ++i
) {