Automatic date update in version.in
[binutils-gdb.git] / gprofng / src / ipcio.cc
blob29d699dea545ae72248edd74a8d0d875161020fd
1 /* Copyright (C) 2021-2024 Free Software Foundation, Inc.
2 Contributed by Oracle.
4 This file is part of GNU Binutils.
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3, or (at your option)
9 any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, 51 Franklin Street - Fifth Floor, Boston,
19 MA 02110-1301, USA. */
21 #include "config.h"
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <signal.h>
25 #include <unistd.h>
26 #include <iostream>
27 #include <iomanip>
28 #include <sstream>
29 #include <queue>
30 #include "vec.h"
31 #include "util.h"
32 #include "ipcio.h"
33 #include "DbeThread.h"
34 #include "Experiment.h"
36 #define ipc_trace if (ipc_flags) ipc_default_log
37 #define ipc_request_trace if (ipc_flags) ipc_request_log
38 #define ipc_response_trace if (ipc_flags) ipc_response_log
40 using namespace std;
42 // IPC implementation
43 static const int L_PROGRESS = 0;
44 static const int L_INTEGER = 1;
45 static const int L_BOOLEAN = 2;
46 static const int L_LONG = 3;
47 static const int L_STRING = 4;
48 static const int L_DOUBLE = 5;
49 static const int L_ARRAY = 6;
50 static const int L_OBJECT = 7;
51 static const int L_CHAR = 8;
53 int currentRequestID;
54 int currentChannelID;
55 IPCresponse *IPCresponseGlobal;
57 BufferPool *responseBufferPool;
59 IPCrequest::IPCrequest (int sz, int reqID, int chID)
61 size = sz;
62 requestID = reqID;
63 channelID = chID;
64 status = INITIALIZED;
65 idx = 0;
66 buf = (char *) malloc (size);
67 cancelImmediate = false;
70 IPCrequest::~IPCrequest ()
72 free (buf);
75 void
76 IPCrequest::read (void)
78 for (int i = 0; i < size; i++)
80 int c = getc (stdin);
81 ipc_request_trace (TRACE_LVL_4, " IPCrequest:getc(stdin): %02x\n", c);
82 buf[i] = c;
86 IPCrequestStatus
87 IPCrequest::getStatus (void)
89 return status;
92 void
93 IPCrequest::setStatus (IPCrequestStatus newStatus)
95 status = newStatus;
98 static int
99 readByte (IPCrequest* req)
101 int c;
102 int val = 0;
103 for (int i = 0; i < 2; i++)
105 if (req == NULL)
107 c = getc (stdin);
108 ipc_request_trace (TRACE_LVL_4, " readByte:getc(stdin): %02x\n", c);
110 else
111 c = req->rgetc ();
112 switch (c)
114 case '0': case '1': case '2': case '3':
115 case '4': case '5': case '6': case '7':
116 case '8': case '9':
117 val = val * 16 + c - '0';
118 break;
119 case 'a': case 'b': case 'c': case 'd': case 'e': case 'f':
120 val = val * 16 + c - 'a' + 10;
121 break;
122 case EOF:
123 val = EOF;
124 break;
125 default:
126 fprintf (stderr, "readByte: Unknown byte: %d\n", c);
127 break;
130 return val;
133 static int
134 readIVal (IPCrequest *req)
136 int val = readByte (req);
137 for (int i = 0; i < 3; i++)
138 val = val * 256 + readByte (req);
139 ipc_trace (" readIVal: %d\n", val);
140 return val;
143 static String
144 readSVal (IPCrequest *req)
146 int len = readIVal (req);
147 if (len == -1)
149 ipc_trace (" readSVal: <NULL>\n");
150 return NULL;
152 char *str = (char *) malloc (len + 1);
153 char *s = str;
154 *s = (char) 0;
155 while (len--)
156 *s++ = req->rgetc ();
157 *s = (char) 0;
158 ipc_trace (" readSVal: '%s'\n", str);
159 return str;
162 static long long
163 readLVal (IPCrequest *req)
165 long long val = readByte (req);
166 for (int i = 0; i < 7; i++)
167 val = val * 256 + readByte (req);
168 ipc_trace (" readLVal: %lld\n", val);
169 return val;
172 static bool
173 readBVal (IPCrequest *req)
175 int val = readByte (req);
176 ipc_trace (" readBVal: %s\n", val == 0 ? "true" : "false");
177 return val != 0;
180 static char
181 readCVal (IPCrequest *req)
183 int val = readByte (req);
184 ipc_trace (" readCVal: %d\n", val);
185 return (char) val;
188 static double
189 readDVal (IPCrequest *req)
191 String s = readSVal (req);
192 double d = atof (s);
193 free (s);
194 return d;
197 static Object
198 readAVal (IPCrequest *req)
200 bool twoD = false;
201 int type = readByte (req);
202 if (type == L_ARRAY)
204 twoD = true;
205 type = readByte (req);
207 ipc_trace ("readAVal: twoD=%s type=%d\n", twoD ? "true" : "false", type);
209 int len = readIVal (req);
210 if (len == -1)
211 return NULL;
212 switch (type)
214 case L_INTEGER:
215 if (twoD)
217 Vector<Vector<int>*> *array = new Vector<Vector<int>*>(len);
218 for (int i = 0; i < len; i++)
219 array->store (i, (Vector<int>*)readAVal (req));
220 return array;
222 else
224 Vector<int> *array = new Vector<int>(len);
225 for (int i = 0; i < len; i++)
226 array->store (i, readIVal (req));
227 return array;
229 //break;
230 case L_LONG:
231 if (twoD)
233 Vector<Vector<long long>*> *array = new Vector<Vector<long long>*>(len);
234 for (int i = 0; i < len; i++)
235 array->store (i, (Vector<long long>*)readAVal (req));
236 return array;
238 else
240 Vector<long long> *array = new Vector<long long>(len);
241 for (int i = 0; i < len; i++)
242 array->store (i, readLVal (req));
243 return array;
245 //break;
246 case L_DOUBLE:
247 if (twoD)
249 Vector<Vector<double>*> *array = new Vector<Vector<double>*>(len);
250 for (int i = 0; i < len; i++)
251 array->store (i, (Vector<double>*)readAVal (req));
252 return array;
254 else
256 Vector<double> *array = new Vector<double>(len);
257 for (int i = 0; i < len; i++)
258 array->store (i, readDVal (req));
259 return array;
261 //break;
262 case L_BOOLEAN:
263 if (twoD)
265 Vector < Vector<bool>*> *array = new Vector < Vector<bool>*>(len);
266 for (int i = 0; i < len; i++)
267 array->store (i, (Vector<bool>*)readAVal (req));
268 return array;
270 else
272 Vector<bool> *array = new Vector<bool>(len);
273 for (int i = 0; i < len; i++)
274 array->store (i, readBVal (req));
275 return array;
277 //break;
278 case L_CHAR:
279 if (twoD)
281 Vector<Vector<char>*> *array = new Vector<Vector<char>*>(len);
282 for (int i = 0; i < len; i++)
283 array->store (i, (Vector<char>*)readAVal (req));
284 return array;
286 else
288 Vector<char> *array = new Vector<char>(len);
289 for (int i = 0; i < len; i++)
290 array->store (i, readCVal (req));
291 return array;
293 //break;
294 case L_STRING:
295 if (twoD)
297 Vector<Vector<String>*> *array = new Vector<Vector<String>*>(len);
298 for (int i = 0; i < len; i++)
299 array->store (i, (Vector<String>*)readAVal (req));
300 return array;
302 else
304 Vector<String> *array = new Vector<String>(len);
305 for (int i = 0; i < len; i++)
306 array->store (i, readSVal (req));
307 return array;
309 //break;
310 case L_OBJECT:
311 if (twoD)
313 Vector<Vector<Object>*> *array = new Vector<Vector<Object>*>(len);
314 for (int i = 0; i < len; i++)
315 array->store (i, (Vector<Object>*)readAVal (req));
316 return array;
318 else
320 Vector<Object> *array = new Vector<Object>(len);
321 for (int i = 0; i < len; i++)
322 array->store (i, readAVal (req));
323 return array;
325 //break;
326 default:
327 fprintf (stderr, "readAVal: Unknown code: %d\n", type);
328 break;
330 return NULL;
333 static int iVal;
334 static bool bVal;
335 static long long lVal;
336 static String sVal;
337 static double dVal;
338 static Object aVal;
340 static void
341 readResult (int type, IPCrequest *req)
343 int tVal = readByte (req);
344 switch (tVal)
346 case L_INTEGER:
347 iVal = readIVal (req);
348 break;
349 case L_LONG:
350 lVal = readLVal (req);
351 break;
352 case L_BOOLEAN:
353 bVal = readBVal (req);
354 break;
355 case L_DOUBLE:
356 dVal = readDVal (req);
357 break;
358 case L_STRING:
359 sVal = readSVal (req);
360 break;
361 case L_ARRAY:
362 aVal = readAVal (req);
363 break;
364 case EOF:
365 fprintf (stderr, "EOF read in readResult\n");
366 sVal = NULL;
367 return;
368 default:
369 fprintf (stderr, "Unknown code: %d\n", tVal);
370 abort ();
372 if (type != tVal)
374 fprintf (stderr, "Internal error: readResult: parameter mismatch: type=%d should be %d\n", tVal, type);
375 abort ();
380 readInt (IPCrequest *req)
382 readResult (L_INTEGER, req);
383 return iVal;
386 String
387 readString (IPCrequest *req)
389 readResult (L_STRING, req);
390 return sVal;
393 long long
394 readLong (IPCrequest *req)
396 readResult (L_LONG, req);
397 return lVal;
400 double
401 readDouble (IPCrequest *req)
403 readResult (L_DOUBLE, req);
404 return dVal;
407 bool
408 readBoolean (IPCrequest *req)
410 readResult (L_BOOLEAN, req);
411 return bVal;
414 DbeObj
415 readObject (IPCrequest *req)
417 readResult (L_LONG, req);
418 return (DbeObj) lVal;
421 Object
422 readArray (IPCrequest *req)
424 readResult (L_ARRAY, req);
425 return aVal;
428 // Write
429 IPCresponse::IPCresponse (int sz)
431 requestID = -1;
432 channelID = -1;
433 responseType = -1;
434 responseStatus = RESPONSE_STATUS_SUCCESS;
435 sb = new StringBuilder (sz);
436 next = NULL;
439 IPCresponse::~IPCresponse ()
441 delete sb;
444 void
445 IPCresponse::reset ()
447 requestID = -1;
448 channelID = -1;
449 responseType = -1;
450 responseStatus = RESPONSE_STATUS_SUCCESS;
451 sb->setLength (0);
454 void
455 IPCresponse::sendByte (int b)
457 ipc_trace ("sendByte: %02x %d\n", b, b);
458 sb->appendf ("%02x", b);
461 void
462 IPCresponse::sendIVal (int i)
464 ipc_trace ("sendIVal: %08x %d\n", i, i);
465 sb->appendf ("%08x", i);
468 void
469 IPCresponse::sendLVal (long long l)
471 ipc_trace ("sendLVal: %016llx %lld\n", l, l);
472 sb->appendf ("%016llx", l);
475 void
476 IPCresponse::sendSVal (const char *s)
478 if (s == NULL)
480 sendIVal (-1);
481 return;
483 sendIVal ((int) strlen (s));
484 ipc_trace ("sendSVal: %s\n", s);
485 sb->appendf ("%s", s);
488 void
489 IPCresponse::sendBVal (bool b)
491 sendByte (b ? 1 : 0);
494 void
495 IPCresponse::sendCVal (char c)
497 sendByte (c);
500 void
501 IPCresponse::sendDVal (double d)
503 char str[32];
504 snprintf (str, sizeof (str), "%.12f", d);
505 sendSVal (str);
508 void
509 IPCresponse::sendAVal (void *ptr)
511 if (ptr == NULL)
513 sendByte (L_INTEGER);
514 sendIVal (-1);
515 return;
518 VecType type = ((Vector<void*>*)ptr)->type ();
519 switch (type)
521 case VEC_INTEGER:
523 sendByte (L_INTEGER);
524 Vector<int> *array = (Vector<int>*)ptr;
525 sendIVal (array->size ());
526 for (int i = 0; i < array->size (); i++)
527 sendIVal (array->fetch (i));
528 break;
530 case VEC_BOOL:
532 sendByte (L_BOOLEAN);
533 Vector<bool> *array = (Vector<bool>*)ptr;
534 sendIVal (array->size ());
535 for (int i = 0; i < array->size (); i++)
536 sendBVal (array->fetch (i));
537 break;
539 case VEC_CHAR:
541 sendByte (L_CHAR);
542 Vector<char> *array = (Vector<char>*)ptr;
543 sendIVal (array->size ());
544 for (int i = 0; i < array->size (); i++)
545 sendCVal (array->fetch (i));
546 break;
548 case VEC_LLONG:
550 sendByte (L_LONG);
551 Vector<long long> *array = (Vector<long long>*)ptr;
552 sendIVal (array->size ());
553 for (int i = 0; i < array->size (); i++)
554 sendLVal (array->fetch (i));
555 break;
557 case VEC_DOUBLE:
559 sendByte (L_DOUBLE);
560 Vector<double> *array = (Vector<double>*)ptr;
561 sendIVal (array->size ());
562 for (int i = 0; i < array->size (); i++)
563 sendDVal (array->fetch (i));
564 break;
566 case VEC_STRING:
568 sendByte (L_STRING);
569 Vector<String> *array = (Vector<String>*)ptr;
570 sendIVal (array->size ());
571 for (int i = 0; i < array->size (); i++)
572 sendSVal (array->fetch (i));
573 break;
575 case VEC_STRINGARR:
577 sendByte (L_ARRAY);
578 sendByte (L_STRING);
579 Vector<void*> *array = (Vector<void*>*)ptr;
580 sendIVal (array->size ());
581 for (int i = 0; i < array->size (); i++)
582 sendAVal (array->fetch (i));
583 break;
585 case VEC_INTARR:
587 sendByte (L_ARRAY);
588 sendByte (L_INTEGER);
589 Vector<void*> *array = (Vector<void*>*)ptr;
590 sendIVal (array->size ());
591 for (int i = 0; i < array->size (); i++)
592 sendAVal (array->fetch (i));
593 break;
595 case VEC_LLONGARR:
597 sendByte (L_ARRAY);
598 sendByte (L_LONG);
599 Vector<void*> *array = (Vector<void*>*)ptr;
600 sendIVal (array->size ());
601 for (int i = 0; i < array->size (); i++)
602 sendAVal (array->fetch (i));
603 break;
605 case VEC_VOIDARR:
607 sendByte (L_OBJECT);
608 Vector<void*> *array = (Vector<void*>*)ptr;
609 sendIVal (array->size ());
610 for (int i = 0; i < array->size (); i++)
611 sendAVal (array->fetch (i));
612 break;
614 default:
615 fprintf (stderr, "sendAVal: Unknown type: %d\n", type);
616 abort ();
620 bool
621 cancelNeeded (int chID)
623 if (chID == cancellableChannelID && chID == cancelRequestedChannelID)
624 return true;
625 else
626 return false;
629 static void
630 writeResponseWithHeader (int requestID, int channelID, int responseType,
631 int responseStatus, IPCresponse* os)
633 if (cancelNeeded (channelID))
635 responseStatus = RESPONSE_STATUS_CANCELLED;
636 ipc_trace ("CANCELLING %d %d\n", requestID, channelID);
637 // This is for gracefully cancelling regular ops like openExperiment - getFiles should never reach here
639 os->setRequestID (requestID);
640 os->setChannelID (channelID);
641 os->setResponseType (responseType);
642 os->setResponseStatus (responseStatus);
643 os->print ();
644 os->reset ();
645 responseBufferPool->recycle (os);
648 void
649 writeAck (int requestID, int channelID)
651 #if DEBUG
652 char *s = getenv (NTXT ("SP_NO_IPC_ACK"));
653 #else /* ^DEBUG */
654 char *s = NULL;
655 #endif /* ^DEBUG */
656 if (s)
658 int i = requestID;
659 int j = channelID;
660 ipc_request_trace (TRACE_LVL_4, "ACK skipped: requestID=%d channelID=%d\n", i, j);
662 else
664 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
665 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_ACK,
666 RESPONSE_STATUS_SUCCESS, OUTS);
670 void
671 writeHandshake (int requestID, int channelID)
673 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
674 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_HANDSHAKE, RESPONSE_STATUS_SUCCESS, OUTS);
677 void
678 writeResponseGeneric (int responseStatus, int requestID, int channelID)
680 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_SMALL);
681 writeResponseWithHeader (requestID, channelID, RESPONSE_TYPE_COMPLETE, responseStatus, OUTS);
684 BufferPool::BufferPool ()
686 pthread_mutex_init (&p_mutex, NULL);
687 smallBuf = NULL;
688 largeBuf = NULL;
691 BufferPool::~BufferPool ()
693 for (IPCresponse *p = smallBuf; p;)
695 IPCresponse *tmp = p;
696 p = tmp->next;
697 delete tmp;
699 for (IPCresponse *p = largeBuf; p;)
701 IPCresponse *tmp = p;
702 p = tmp->next;
703 delete tmp;
707 IPCresponse*
708 BufferPool::getNewResponse (int size)
710 pthread_mutex_lock (&p_mutex);
711 if (ipc_single_threaded_mode && size < BUFFER_SIZE_LARGE)
712 size = BUFFER_SIZE_LARGE;
713 IPCresponse *newResponse = NULL;
714 if (size >= BUFFER_SIZE_LARGE)
716 if (largeBuf)
718 newResponse = largeBuf;
719 largeBuf = largeBuf->next;
722 else if (smallBuf)
724 newResponse = smallBuf;
725 smallBuf = smallBuf->next;
727 if (newResponse)
728 newResponse->reset ();
729 else
731 newResponse = new IPCresponse (size);
732 ipc_trace ("GETNEWBUFFER %d\n", size);
734 pthread_mutex_unlock (&p_mutex);
735 return newResponse;
738 void
739 BufferPool::recycle (IPCresponse *respB)
741 pthread_mutex_lock (&p_mutex);
742 if (respB->getCurBufSize () >= BUFFER_SIZE_LARGE)
744 respB->next = largeBuf;
745 largeBuf = respB;
747 else
749 respB->next = smallBuf;
750 smallBuf = respB;
752 pthread_mutex_unlock (&p_mutex);
755 void
756 writeArray (void *ptr, IPCrequest* req)
758 if (req->getStatus () == CANCELLED_IMMEDIATE)
759 return;
760 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE);
761 OUTS->sendByte (L_ARRAY);
762 OUTS->sendAVal (ptr);
763 writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
764 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
767 void
768 writeString (const char *s, IPCrequest* req)
770 if (req->getStatus () == CANCELLED_IMMEDIATE)
771 return;
772 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_LARGE);
773 OUTS->sendByte (L_STRING);
774 OUTS->sendSVal (s);
775 writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
776 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
779 void
780 writeObject (DbeObj obj, IPCrequest* req)
782 writeLong ((long long) obj, req);
785 void
786 writeBoolean (bool b, IPCrequest* req)
788 if (req->getStatus () == CANCELLED_IMMEDIATE)
789 return;
790 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
791 OUTS->sendByte (L_BOOLEAN);
792 OUTS->sendBVal (b);
793 writeResponseWithHeader (req->getRequestID (), req->getChannelID (),
794 RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
797 void
798 writeInt (int i, IPCrequest* req)
800 if (req->getStatus () == CANCELLED_IMMEDIATE)
801 return;
802 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
803 OUTS->sendByte (L_INTEGER);
804 OUTS->sendIVal (i);
805 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
808 void
809 writeChar (char c, IPCrequest* req)
811 if (req->getStatus () == CANCELLED_IMMEDIATE)
812 return;
813 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
814 OUTS->sendByte (L_CHAR);
815 OUTS->sendCVal (c);
816 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
819 void
820 writeLong (long long l, IPCrequest* req)
822 if (req->getStatus () == CANCELLED_IMMEDIATE)
823 return;
824 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
825 OUTS->sendByte (L_LONG);
826 OUTS->sendLVal (l);
827 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
830 void
831 writeDouble (double d, IPCrequest* req)
833 if (req->getStatus () == CANCELLED_IMMEDIATE) return;
834 IPCresponse *OUTS = responseBufferPool->getNewResponse (BUFFER_SIZE_MEDIUM);
835 OUTS->sendByte (L_DOUBLE);
836 OUTS->sendDVal (d);
837 writeResponseWithHeader (req->getRequestID (), req->getChannelID (), RESPONSE_TYPE_COMPLETE, RESPONSE_STATUS_SUCCESS, OUTS);
841 setProgress (int percentage, const char *proc_str)
843 if (cancelNeeded (currentChannelID))
845 // ExperimentLoadCancelException *e1 = new ExperimentLoadCancelException();
846 // throw (e1);
847 return 1;
849 if (NULL == proc_str)
850 return 1;
851 int size = strlen (proc_str) + 100; // 100 bytes for additional data
852 int bs = BUFFER_SIZE_MEDIUM;
853 if (size > BUFFER_SIZE_MEDIUM)
855 if (size > BUFFER_SIZE_LARGE) return 1; // This should never happen
856 bs = BUFFER_SIZE_LARGE;
858 IPCresponse *OUTS = responseBufferPool->getNewResponse (bs);
859 OUTS->sendByte (L_PROGRESS);
860 OUTS->sendIVal (percentage);
861 OUTS->sendSVal (proc_str);
862 writeResponseWithHeader (currentRequestID, currentChannelID, RESPONSE_TYPE_PROGRESS, RESPONSE_STATUS_SUCCESS, OUTS);
863 return 0;
866 static pthread_mutex_t responce_lock = PTHREAD_MUTEX_INITIALIZER;
868 void
869 IPCresponse::print (void)
871 char buf[23];
872 int sz = responseType == RESPONSE_TYPE_HANDSHAKE ?
873 IPC_VERSION_NUMBER : sb->length ();
874 snprintf (buf, sizeof (buf), "%02x%08x%02x%02x%08x", HEADER_MARKER,
875 requestID, responseType, responseStatus, sz);
876 pthread_mutex_lock (&responce_lock);
877 ipc_response_trace (TRACE_LVL_1,
878 "IPCresponse: ID=%08x type=%02x status=%02x sz:%6d\n",
879 requestID, responseType, responseStatus, sz);
880 write (1, buf, 22);
881 sb->write (1);
882 pthread_mutex_unlock (&responce_lock);
885 void
886 setCancelRequestedCh (int chID)
888 cancelRequestedChannelID = chID;
891 void
892 readRequestHeader ()
894 int marker = readByte (NULL);
895 if (marker != HEADER_MARKER)
897 fprintf (stderr, "Internal error: received request (%d) without header marker\n", marker);
898 error_flag = 1;
899 return;
901 else
902 ipc_request_trace (TRACE_LVL_1, "RequestHeaderBegin------------------------\n");
903 int requestID = readIVal (NULL);
904 int requestType = readByte (NULL);
905 int channelID = readIVal (NULL);
906 int nBytes = readIVal (NULL);
907 if (requestType == REQUEST_TYPE_HANDSHAKE)
909 // write the ack directly to the wire, not through the response queue
910 writeAck (requestID, channelID);
911 writeHandshake (requestID, channelID);
912 ipc_request_trace (TRACE_LVL_1, "RQ: HANDSHAKE --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
914 else if (requestType == REQUEST_TYPE_CANCEL)
916 writeAck (requestID, channelID);
917 ipc_request_trace (TRACE_LVL_1, "RQ: CANCEL --- RQ: %x ----- %x --- CH: %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
918 if (channelID == cancellableChannelID)
920 // we have worked on at least one request belonging to this channel
921 writeResponseGeneric (RESPONSE_STATUS_SUCCESS, requestID, channelID);
922 setCancelRequestedCh (channelID);
923 ipc_trace ("CANCELLABLE %x %x\n", channelID, currentChannelID);
924 if (channelID == currentChannelID)
925 // request for this channel is currently in progress
926 ipc_request_trace (TRACE_LVL_1, "IN PROGRESS REQUEST NEEDS CANCELLATION");
927 // ssp_post_cond(waitingToFinish);
929 else
931 // FIXME:
932 // it is possible that a request for this channel is on the requestQ
933 // or has been submitted to the work group queue but is waiting for a thread to pick it up
934 writeResponseGeneric (RESPONSE_STATUS_FAILURE, requestID, channelID);
935 setCancelRequestedCh (channelID);
936 ipc_request_trace (TRACE_LVL_1, "RETURNING FAILURE TO CANCEL REQUEST channel %d\n", channelID);
939 else
941 writeAck (requestID, channelID);
942 ipc_request_trace (TRACE_LVL_1, "RQ: --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID, requestType, channelID, nBytes);
943 IPCrequest *nreq = new IPCrequest (nBytes, requestID, channelID);
944 nreq->read ();
945 ipc_request_trace (TRACE_LVL_1, "RQ: --- %x Read from stream \n", requestID);
946 if (cancelNeeded (channelID))
948 ipc_request_trace (TRACE_LVL_1, "CANCELLABLE REQ RECVD %x %x\n", channelID, requestID);
949 writeResponseGeneric (RESPONSE_STATUS_CANCELLED, requestID, channelID);
950 delete nreq;
951 return;
953 DbeQueue *q = new DbeQueue (ipc_doWork, nreq);
954 ipcThreadPool->put_queue (q);