Merge pull request #113 from tesselode/fix-multi-targets
[wdl/wdl-ol.git] / WDL / shm_msgreply.cpp
blob79c465a3da9bf0b2b422310208afaab51877ba2e
1 #include "shm_msgreply.h"
2 #include "wdlcstring.h"
4 //#define VERIFY_MESSAGES // this is not endian-aware (so it'll fail if enabled and doing ppc<->x86 etc)
5 #ifdef VERIFY_MESSAGES
6 #define WDL_SHA1 WDL_SHA1_msgreplydef
7 #include "sha.cpp"
8 #endif
10 SHM_MsgReplyConnection::SHM_MsgReplyConnection(int bufsize, int maxqueuesize, bool dir, const char *uniquestr, int timeout_sec, int extra_flags)
12 m_maxqueuesize=maxqueuesize;
13 m_has_had_error=false;
14 userData=0;
15 OnRecv=0;
16 IdleProc=0;
17 m_lastmsgid=1;
18 m_shm = 0;
19 m_spares=0;
20 m_waiting_replies=0;
22 if (uniquestr) lstrcpyn_safe(m_uniq,uniquestr,sizeof(m_uniq));
23 else
25 #ifdef _WIN32
26 WDL_INT64 pid = (WDL_INT64) GetCurrentProcessId();
27 #else
28 WDL_INT64 pid = (WDL_INT64) getpid();
29 #endif
30 WDL_INT64 thisptr = (WDL_INT64) (INT_PTR) this;
31 static int cnt=0xdeadf00d;
32 sprintf(m_uniq,"%08x%08x%08x%08x",
33 (int)(pid&0xffffffff),
34 (int)(pid>>32),
35 (int)(thisptr&0xffffffff) ^ GetTickCount(),
36 (int)(thisptr>>32)^(cnt++));
39 m_shm = new WDL_SHM_Connection(dir,m_uniq,bufsize,timeout_sec,extra_flags);
43 SHM_MsgReplyConnection::~SHM_MsgReplyConnection()
45 delete m_shm;
46 WaitingMessage *tmp=m_waiting_replies;
47 while (tmp)
49 WaitingMessage *p=tmp;
50 tmp=tmp->_next;
51 delete p;
53 tmp=m_spares;
54 while (tmp)
56 WaitingMessage *p=tmp;
57 tmp=tmp->_next;
58 delete p;
62 void SHM_MsgReplyConnection::Reply(int msgID, const void *msg, int msglen)
64 if (msgID) Send(0,msg,msglen,NULL,0,&msgID);
68 int SHM_MsgReplyConnection::Send(int type, const void *msg, int msglen,
69 void *replybuf, int maxretbuflen, const int *forceMsgID,
70 const void *secondchunk, int secondchunklen,
71 WDL_HeapBuf *hbreplyout)
73 if (!m_shm||m_has_had_error) return -1;
75 if (secondchunk && secondchunklen>0) msglen+=secondchunklen;
76 else secondchunklen=0;
78 int msgid;
80 WDL_MutexLock lock(&m_shmmutex);
81 m_shm->send_queue.AddDataToLE(&type,4,4);
83 if (forceMsgID) msgid = *forceMsgID;
84 else
86 if (!replybuf&&!hbreplyout) msgid=0;
87 else if (!(msgid = ++m_lastmsgid)) msgid = ++m_lastmsgid;
90 m_shm->send_queue.AddDataToLE(&msgid,4,4);
91 m_shm->send_queue.AddDataToLE(&msglen,4,4);
92 if (msglen>secondchunklen) m_shm->send_queue.Add(msg,msglen-secondchunklen);
93 if (secondchunklen>0) m_shm->send_queue.Add(secondchunk,secondchunklen);
95 #ifdef VERIFY_MESSAGES
96 WDL_SHA1 t;
97 t.add(&type,4);
98 t.add(&msgid,4);
99 t.add(&msglen,4);
100 if (msglen>secondchunklen) t.add(msg,msglen-secondchunklen);
101 if (secondchunklen>0) t.add(secondchunk,secondchunklen);
103 char tb[WDL_SHA1SIZE];
104 t.result(tb);
105 m_shm->send_queue.Add(tb,sizeof(tb));
106 #endif
109 if ((!replybuf && !hbreplyout) || !msgid) m_shm->Run(); // get this reply out ASAP
112 if ((hbreplyout||replybuf) && msgid)
114 int wait_cnt=30; // dont run idleproc for first Xms or so
116 while (!m_has_had_error)
118 if (wait_cnt<=0 && IdleProc && IdleProc(this))
120 m_has_had_error=true;
121 break;
124 WaitingMessage *wmsg=NULL;
125 bool r = RunInternal(msgid,&wmsg);
127 if (wmsg)
129 int rv = wmsg->m_msgdata.GetSize();
131 if (hbreplyout)
133 memcpy(hbreplyout->Resize(rv,false),wmsg->m_msgdata.Get(),rv);
136 if (replybuf)
138 if (rv > maxretbuflen) rv=maxretbuflen;
139 if (rv>0) memcpy(replybuf,wmsg->m_msgdata.Get(),rv);
142 m_shmmutex.Enter();
143 wmsg->_next = m_spares;
144 m_spares=wmsg;
145 m_shmmutex.Leave();
146 return rv;
148 else if (r) break;
151 if (wait_cnt>0) wait_cnt--;
153 HANDLE evt=m_shm->GetWaitEvent();
154 if (evt) WaitForSingleObject(evt,1);
155 else Sleep(1);
160 if (hbreplyout) hbreplyout->Resize(0,false);
162 return -1;
165 void SHM_MsgReplyConnection::Wait(HANDLE extraEvt)
167 HANDLE evt=m_shm ? m_shm->GetWaitEvent() : extraEvt;
168 if (evt && extraEvt && evt != extraEvt)
170 HANDLE hds[2] = {evt,extraEvt};
171 #ifdef _WIN32
172 WaitForMultipleObjects(2,hds,FALSE,1);
173 #else
174 WaitForAnySocketObject(2,hds,1);
175 #endif
177 else if (evt) WaitForSingleObject(evt,1);
178 else Sleep(1);
181 void SHM_MsgReplyConnection::ReturnSpares(WaitingMessage *msglist)
183 if (msglist)
185 WaitingMessage *msgtail = msglist;
186 while (msgtail && msgtail->_next) msgtail=msgtail->_next;
188 m_shmmutex.Enter();
189 msgtail->_next = m_spares;
190 m_spares=msglist;
191 m_shmmutex.Leave();
195 bool SHM_MsgReplyConnection::Run(bool runFull)
197 if (m_has_had_error) return true;
199 if (runFull) return RunInternal();
201 m_shmmutex.Enter();
202 int s=m_shm->Run();
203 if (m_shm->send_queue.Available() > m_maxqueuesize) s=-1;
204 m_shmmutex.Leave();
206 if (s<0) m_has_had_error=true;
207 else if (m_shm && m_shm->WantSendKeepAlive())
209 int zer=0;
210 Send(0,NULL,0,NULL,0,&zer);
213 return s<0;
216 bool SHM_MsgReplyConnection::RunInternal(int checkForReplyID, WaitingMessage **replyPtr)
218 if (!m_shm||m_has_had_error) return true;
220 if (replyPtr) *replyPtr=0;
222 int s=0;
226 m_shmmutex.Enter();
228 // autocompact on first time through
229 if (!s) m_shm->recv_queue.Compact();
231 s = m_shm->Run();
232 if (m_shm->send_queue.Available() > m_maxqueuesize) s=-1;
234 while (m_shm->recv_queue.GetSize()>=12)
236 int datasz = *(int *)((char *)m_shm->recv_queue.Get()+8);
237 WDL_Queue::WDL_Queue__bswap_buffer(&datasz,4); // convert to LE if needed
239 if (m_shm->recv_queue.GetSize() < 12 + datasz) break;
241 #ifdef VERIFY_MESSAGES
242 if (m_shm->recv_queue.GetSize() < 12 + datasz + WDL_SHA1SIZE) break;
243 #endif
245 int type = *(int *)((char *)m_shm->recv_queue.Get());
246 WDL_Queue::WDL_Queue__bswap_buffer(&type,4); // convert to LE if needed
248 WaitingMessage *msg = m_spares;
249 if (msg) m_spares = m_spares->_next;
250 else msg = new WaitingMessage;
252 msg->m_msgid = *(int *)((char *)m_shm->recv_queue.Get() + 4);
253 WDL_Queue::WDL_Queue__bswap_buffer(&msg->m_msgid,4); // convert to LE if needed
255 msg->m_msgtype = type;
256 memcpy(msg->m_msgdata.Resize(datasz,false),(char *)m_shm->recv_queue.Get()+12, datasz);
258 m_shm->recv_queue.Advance(12+datasz);
260 #ifdef VERIFY_MESSAGES
261 WDL_SHA1 t;
262 t.add(&type,4);
263 t.add(&msg->m_msgid,4);
264 t.add(&datasz,4);
265 t.add(msg->m_msgdata.Get(),msg->m_msgdata.GetSize());
266 char tb[WDL_SHA1SIZE];
267 t.result(tb);
268 if (memcmp(m_shm->recv_queue.Get(),tb,WDL_SHA1SIZE))
269 MessageBox(NULL,"FAIL","A",0);
270 m_shm->recv_queue.Advance(WDL_SHA1SIZE);
271 #endif
274 if (type==0)
276 if (checkForReplyID && replyPtr && !*replyPtr &&
277 checkForReplyID == msg->m_msgid)
279 *replyPtr = msg;
280 s=0;
281 break; // we're done!
283 else
285 msg->_next = m_waiting_replies;
286 m_waiting_replies = msg;
289 else
291 m_shmmutex.Leave();
293 WaitingMessage *msgtail=NULL;
295 if (OnRecv)
297 msg->_next=0;
298 msgtail = msg = OnRecv(this,msg);
299 while (msgtail && msgtail->_next) msgtail=msgtail->_next;
301 else if (msg->m_msgid) Reply(msg->m_msgid,"",0); // send an empty reply
303 m_shmmutex.Enter(); // get shm again
305 if (msg)
307 (msgtail?msgtail:msg)->_next = m_spares;
308 m_spares=msg;
311 } // while queue has stuff
313 if (checkForReplyID && replyPtr && !*replyPtr)
315 WaitingMessage *m=m_waiting_replies;
316 WaitingMessage *lp=NULL;
318 while (m)
320 if (m->m_msgid == checkForReplyID)
322 if (lp) lp->_next = m->_next;
323 else m_waiting_replies=m->_next;
325 *replyPtr = m;
326 s=0; // make sure we return ASAP
327 break;
329 lp = m;
330 m=m->_next;
334 m_shmmutex.Leave();
336 } while (s>0);
338 if (s<0) m_has_had_error=true;
339 else if (m_shm && m_shm->WantSendKeepAlive())
341 int zer=0;
342 Send(0,NULL,0,NULL,0,&zer);
344 return s<0;