Initial commit
[minnow.git] / src / aquarium / aquarium.hpp
blob055e18b6362cac45dd21224a586590e51f3595c6
1 #ifndef AQUARIUM_HPP
2 #define AQUARIUM_HPP
4 #include <ctime>
5 #include <deque>
6 #include <ext/hash_map>
7 #include <iostream>
8 #include <map>
9 #include <vector>
11 #include <boost/bind.hpp>
12 #include <boost/function.hpp>
13 #include <boost/thread.hpp>
15 #include <time.h>
17 #include "simplearray.h"
19 //FIXME: I'm using the union in the code with UInt32, but in 64-bit machines that probably isn't best
20 #define actorId_t uint32_t
21 #define threadId_t uint32_t
23 const int RECEIVER_CACHE_SIZE=3;
24 const unsigned int TIMESLICE_QUANTUM=2000;
26 class ThreadType { public: enum Type { THREAD, MAILMAN, KERNEL}; };
27 class ActorState { public: enum State { ACTIVE, WAITING_FOR_ACTION, WAITING_FOR_DATA, DELETED, MOVED }; };
28 class MessageType { public: enum Type { ACTION_MESSAGE, DATA_MESSAGE, ADD_ACTOR_ID, MOVE_ACTOR_ID, DELETE_ACTOR_ID, DELETE_ACTOR_IDS, CREATE_ISOLATED_ACTOR, LOAD_STATUS, PLEASE_MOVE_ACTOR, PLEASE_RECV_ACTOR, PLEASE_RECV_ACTORS }; };
30 typedef struct Thread;
31 typedef struct Actor;
33 /**
34 Hold a meta container for all basic types.
35 As a side note: All functions are re-entrant, using the actor->heapStack to store state, so they only need a pointer to that actor for all their activities.
37 union TypeUnion {
38 int8_t Int8;
39 uint8_t UInt8;
40 int16_t Int16;
41 uint16_t UInt16;
42 int32_t Int32;
43 uint32_t UInt32;
44 int64_t Int64;
45 uint64_t UInt64;
46 float Float;
47 double Double;
48 bool Bool;
49 void(*Function)(Actor *);
50 void *VoidPtr;
53 /**
54 Message container. For messages with longer number of arguments, create a vector and use VoidPtr in TypeUnion.
56 struct Message {
57 MessageType::Type messageType;
58 actorId_t recipient;
59 uint32_t dataTaskTypeId;
60 void(*task)(Actor *);
61 int numArgs;
62 TypeUnion arg[4];
65 /**
66 Channel between threads, and between threads and the kernel.
68 class MailChannel {
69 std::vector<Message> *msgChannelIncoming, *msgChannel1, *msgChannel2;
70 volatile int currentChannel;
71 boost::mutex mailLock;
72 volatile bool isEmpty;
75 void DebugMessage(const Message &message) {
76 std::cout << " Message: " << message.messageType << " rec:" << message.recipient << " taskId:";
77 std::cout << message.dataTaskTypeId << " nArgs:" << message.numArgs << " arg0:" << message.arg[0].UInt32 << " " << message.arg[1].UInt32 << " " << message.arg[2].UInt32 << std::endl;
81 public:
82 void sendMessage(const Message &message) {
83 boost::mutex::scoped_lock lock(mailLock);
84 msgChannelIncoming->push_back(message);
85 isEmpty = false;
88 std::vector<Message> *recvMessages() {
89 boost::mutex::scoped_lock lock(mailLock);
91 if (currentChannel == 1) {
92 msgChannelIncoming = msgChannel2;
93 currentChannel = 2;
94 isEmpty = msgChannelIncoming->empty();
95 return msgChannel1;
97 else {
98 msgChannelIncoming = msgChannel1;
99 currentChannel = 1;
100 isEmpty = msgChannelIncoming->empty();
101 return msgChannel2;
105 bool empty() {
106 //boost::mutex::scoped_lock lock(mailLock);
107 return isEmpty;
110 MailChannel() {
111 msgChannel1 = new std::vector<Message>();
112 msgChannel2 = new std::vector<Message>();
114 msgChannelIncoming = msgChannel1;
115 currentChannel = 1;
116 isEmpty = true;
120 struct ThreadPoolThread {
121 bool available;
122 threadId_t threadId;
123 boost::thread *thread;
126 struct ActorWrapper {
127 struct Actor* actor;
128 actorId_t containedId;
129 uint32_t actorPoolRevId;
133 The microthread task, which we call an actor.
135 struct Actor {
136 public:
137 ActorState::State actorState;
138 ActorState::State actorStateBeforeMove;
139 actorId_t actorId;
140 void(*task)(Actor *);
141 Thread *parentThread;
143 struct ActorWrapper receiverCache[RECEIVER_CACHE_SIZE];
144 int nextCacheIndex;
146 uint32_t runQueueRevId;
148 //incoming DATA_MESSAGE messages have a queue and handlers for each enumerated type. In the action, a switch statement allows for continuations
149 //based on the id of the continuation (basically which slot to jump back into when the function is restarted).
150 std::vector<Message> dataMessages;
151 __gnu_cxx::hash_map<int, int> dataHandlers;
153 //For CPS
154 bool isResuming;
156 std::vector<Message> actionMessages;
157 std::vector<TypeUnion> heapStack;
158 //SimpleArray actionMessages;
159 //SimpleArray heapStack;
161 Actor() : runQueueRevId(0), isResuming(false) {}
166 The actual OS-level thread. This will act as a scheduler for actors(microthreads).
168 class Thread {
169 private:
170 __gnu_cxx::hash_map<actorId_t, Actor*> actorIds;
171 std::vector<Message> localMail;
172 bool isRunning;
173 actorId_t nextActorId;
174 bool sendStatus;
176 int32_t numberActiveActors;
177 int32_t previousActiveActors;
179 public:
180 ThreadType::Type threadType;
181 std::vector<Actor*> runningActors;
182 threadId_t threadId;
183 MailChannel *incomingChannel;
184 MailChannel *outgoingChannel;
186 //For mailmen
187 std::vector<MailChannel*> *mailListIncoming;
188 __gnu_cxx::hash_map<actorId_t, threadId_t> *mailAddresses;
189 __gnu_cxx::hash_map<threadId_t, MailChannel*> *mailChannelOutgoing;
191 //For the kernel
192 __gnu_cxx::hash_map<threadId_t, int> *scheduleWeights; //I'm assuming int is big enough
193 std::vector<ThreadPoolThread> *threadPoolThreads;
195 //For local receiver caching, whenever our actor pool members change, we rev this so that local receiver caches can be recreated
196 uint32_t actorPoolRevId;
198 //For more efficient run queue handling, this prevents actors from rescheduling themselves in loop scenerios
199 uint32_t runQueueRevId;
201 //For scheduling tasks, we need to know how much time is remaining
202 int timeSliceEndTime;
203 Actor *hotActor;
205 Thread(threadId_t id, actorId_t nextActor) {
206 threadType = ThreadType::THREAD;
207 isRunning = true;
208 sendStatus = true;
209 nextActorId = nextActor;
211 threadId = id;
213 actorPoolRevId = 0;
214 runQueueRevId = 1; //0 denotes "never scheduled", and is the reset value
216 incomingChannel = new MailChannel();
217 outgoingChannel = new MailChannel();
219 Actor *maintenance = new Actor();
220 maintenance->actorId = 0xFFFFFFFF;
221 maintenance->actorState = ActorState::ACTIVE;
222 runningActors.push_back(maintenance);
225 void SendMessage(const Message &message);
226 //JDT REFACTOR
227 void ReceiveMessages();
228 void ScheduleExistingActor(Actor *actor);
229 void ScheduleNewActor(Actor *actor);
230 void ScheduleNewIsolatedActor(Actor *actor);
231 void RemoveRunningActor(Actor *actor);
232 void RemoveActor(Actor *actor);
233 void RemoveActors(std::vector<Actor*> *actors);
234 void RemoveActor(Actor *actor, bool sendDeleteMsg);
236 void DeleteActor(Actor *actor);
237 void SendStatus();
238 void MoveHeaviestActor(threadId_t threadId, uint32_t amount);
239 Actor* ActorIfLocal(actorId_t actorId, Actor *local);
240 void TaskRebalance();
241 void MailCheck();
242 void SchedulerLoop();
245 //Main startup procedure
246 int VM_Main(int argc, char *argv[], void(*task)(Actor *), bool passCmdLine);
248 #endif //AQUARIUM_HPP