1 #include "shm_msgreply.h"
2 #include "wdlcstring.h"
4 //#define VERIFY_MESSAGES // this is not endian-aware (so it'll fail if enabled and doing ppc<->x86 etc)
6 #define WDL_SHA1 WDL_SHA1_msgreplydef
10 SHM_MsgReplyConnection::SHM_MsgReplyConnection(int bufsize
, int maxqueuesize
, bool dir
, const char *uniquestr
, int timeout_sec
, int extra_flags
)
12 m_maxqueuesize
=maxqueuesize
;
13 m_has_had_error
=false;
22 if (uniquestr
) lstrcpyn_safe(m_uniq
,uniquestr
,sizeof(m_uniq
));
26 WDL_INT64 pid
= (WDL_INT64
) GetCurrentProcessId();
28 WDL_INT64 pid
= (WDL_INT64
) getpid();
30 WDL_INT64 thisptr
= (WDL_INT64
) (INT_PTR
) this;
31 static int cnt
=0xdeadf00d;
32 sprintf(m_uniq
,"%08x%08x%08x%08x",
33 (int)(pid
&0xffffffff),
35 (int)(thisptr
&0xffffffff) ^ GetTickCount(),
36 (int)(thisptr
>>32)^(cnt
++));
39 m_shm
= new WDL_SHM_Connection(dir
,m_uniq
,bufsize
,timeout_sec
,extra_flags
);
43 SHM_MsgReplyConnection::~SHM_MsgReplyConnection()
46 WaitingMessage
*tmp
=m_waiting_replies
;
49 WaitingMessage
*p
=tmp
;
56 WaitingMessage
*p
=tmp
;
62 void SHM_MsgReplyConnection::Reply(int msgID
, const void *msg
, int msglen
)
64 if (msgID
) Send(0,msg
,msglen
,NULL
,0,&msgID
);
68 int SHM_MsgReplyConnection::Send(int type
, const void *msg
, int msglen
,
69 void *replybuf
, int maxretbuflen
, const int *forceMsgID
,
70 const void *secondchunk
, int secondchunklen
,
71 WDL_HeapBuf
*hbreplyout
)
73 if (!m_shm
||m_has_had_error
) return -1;
75 if (secondchunk
&& secondchunklen
>0) msglen
+=secondchunklen
;
76 else secondchunklen
=0;
80 WDL_MutexLock
lock(&m_shmmutex
);
81 m_shm
->send_queue
.AddDataToLE(&type
,4,4);
83 if (forceMsgID
) msgid
= *forceMsgID
;
86 if (!replybuf
&&!hbreplyout
) msgid
=0;
87 else if (!(msgid
= ++m_lastmsgid
)) msgid
= ++m_lastmsgid
;
90 m_shm
->send_queue
.AddDataToLE(&msgid
,4,4);
91 m_shm
->send_queue
.AddDataToLE(&msglen
,4,4);
92 if (msglen
>secondchunklen
) m_shm
->send_queue
.Add(msg
,msglen
-secondchunklen
);
93 if (secondchunklen
>0) m_shm
->send_queue
.Add(secondchunk
,secondchunklen
);
95 #ifdef VERIFY_MESSAGES
100 if (msglen
>secondchunklen
) t
.add(msg
,msglen
-secondchunklen
);
101 if (secondchunklen
>0) t
.add(secondchunk
,secondchunklen
);
103 char tb
[WDL_SHA1SIZE
];
105 m_shm
->send_queue
.Add(tb
,sizeof(tb
));
109 if ((!replybuf
&& !hbreplyout
) || !msgid
) m_shm
->Run(); // get this reply out ASAP
112 if ((hbreplyout
||replybuf
) && msgid
)
114 int wait_cnt
=30; // dont run idleproc for first Xms or so
116 while (!m_has_had_error
)
118 if (wait_cnt
<=0 && IdleProc
&& IdleProc(this))
120 m_has_had_error
=true;
124 WaitingMessage
*wmsg
=NULL
;
125 bool r
= RunInternal(msgid
,&wmsg
);
129 int rv
= wmsg
->m_msgdata
.GetSize();
133 memcpy(hbreplyout
->Resize(rv
,false),wmsg
->m_msgdata
.Get(),rv
);
138 if (rv
> maxretbuflen
) rv
=maxretbuflen
;
139 if (rv
>0) memcpy(replybuf
,wmsg
->m_msgdata
.Get(),rv
);
143 wmsg
->_next
= m_spares
;
151 if (wait_cnt
>0) wait_cnt
--;
153 HANDLE evt
=m_shm
->GetWaitEvent();
154 if (evt
) WaitForSingleObject(evt
,1);
160 if (hbreplyout
) hbreplyout
->Resize(0,false);
165 void SHM_MsgReplyConnection::Wait(HANDLE extraEvt
)
167 HANDLE evt
=m_shm
? m_shm
->GetWaitEvent() : extraEvt
;
168 if (evt
&& extraEvt
&& evt
!= extraEvt
)
170 HANDLE hds
[2] = {evt
,extraEvt
};
172 WaitForMultipleObjects(2,hds
,FALSE
,1);
174 WaitForAnySocketObject(2,hds
,1);
177 else if (evt
) WaitForSingleObject(evt
,1);
181 void SHM_MsgReplyConnection::ReturnSpares(WaitingMessage
*msglist
)
185 WaitingMessage
*msgtail
= msglist
;
186 while (msgtail
&& msgtail
->_next
) msgtail
=msgtail
->_next
;
189 msgtail
->_next
= m_spares
;
195 bool SHM_MsgReplyConnection::Run(bool runFull
)
197 if (m_has_had_error
) return true;
199 if (runFull
) return RunInternal();
203 if (m_shm
->send_queue
.Available() > m_maxqueuesize
) s
=-1;
206 if (s
<0) m_has_had_error
=true;
207 else if (m_shm
&& m_shm
->WantSendKeepAlive())
210 Send(0,NULL
,0,NULL
,0,&zer
);
216 bool SHM_MsgReplyConnection::RunInternal(int checkForReplyID
, WaitingMessage
**replyPtr
)
218 if (!m_shm
||m_has_had_error
) return true;
220 if (replyPtr
) *replyPtr
=0;
228 // autocompact on first time through
229 if (!s
) m_shm
->recv_queue
.Compact();
232 if (m_shm
->send_queue
.Available() > m_maxqueuesize
) s
=-1;
234 while (m_shm
->recv_queue
.GetSize()>=12)
236 int datasz
= *(int *)((char *)m_shm
->recv_queue
.Get()+8);
237 WDL_Queue::WDL_Queue__bswap_buffer(&datasz
,4); // convert to LE if needed
239 if (m_shm
->recv_queue
.GetSize() < 12 + datasz
) break;
241 #ifdef VERIFY_MESSAGES
242 if (m_shm
->recv_queue
.GetSize() < 12 + datasz
+ WDL_SHA1SIZE
) break;
245 int type
= *(int *)((char *)m_shm
->recv_queue
.Get());
246 WDL_Queue::WDL_Queue__bswap_buffer(&type
,4); // convert to LE if needed
248 WaitingMessage
*msg
= m_spares
;
249 if (msg
) m_spares
= m_spares
->_next
;
250 else msg
= new WaitingMessage
;
252 msg
->m_msgid
= *(int *)((char *)m_shm
->recv_queue
.Get() + 4);
253 WDL_Queue::WDL_Queue__bswap_buffer(&msg
->m_msgid
,4); // convert to LE if needed
255 msg
->m_msgtype
= type
;
256 memcpy(msg
->m_msgdata
.Resize(datasz
,false),(char *)m_shm
->recv_queue
.Get()+12, datasz
);
258 m_shm
->recv_queue
.Advance(12+datasz
);
260 #ifdef VERIFY_MESSAGES
263 t
.add(&msg
->m_msgid
,4);
265 t
.add(msg
->m_msgdata
.Get(),msg
->m_msgdata
.GetSize());
266 char tb
[WDL_SHA1SIZE
];
268 if (memcmp(m_shm
->recv_queue
.Get(),tb
,WDL_SHA1SIZE
))
269 MessageBox(NULL
,"FAIL","A",0);
270 m_shm
->recv_queue
.Advance(WDL_SHA1SIZE
);
276 if (checkForReplyID
&& replyPtr
&& !*replyPtr
&&
277 checkForReplyID
== msg
->m_msgid
)
281 break; // we're done!
285 msg
->_next
= m_waiting_replies
;
286 m_waiting_replies
= msg
;
293 WaitingMessage
*msgtail
=NULL
;
298 msgtail
= msg
= OnRecv(this,msg
);
299 while (msgtail
&& msgtail
->_next
) msgtail
=msgtail
->_next
;
301 else if (msg
->m_msgid
) Reply(msg
->m_msgid
,"",0); // send an empty reply
303 m_shmmutex
.Enter(); // get shm again
307 (msgtail
?msgtail
:msg
)->_next
= m_spares
;
311 } // while queue has stuff
313 if (checkForReplyID
&& replyPtr
&& !*replyPtr
)
315 WaitingMessage
*m
=m_waiting_replies
;
316 WaitingMessage
*lp
=NULL
;
320 if (m
->m_msgid
== checkForReplyID
)
322 if (lp
) lp
->_next
= m
->_next
;
323 else m_waiting_replies
=m
->_next
;
326 s
=0; // make sure we return ASAP
338 if (s
<0) m_has_had_error
=true;
339 else if (m_shm
&& m_shm
->WantSendKeepAlive())
342 Send(0,NULL
,0,NULL
,0,&zer
);