1 /* Copyright (C) 2021-2024 Free Software Foundation, Inc.
4 This file is part of GNU Binutils.
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3, or (at your option)
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, 51 Franklin Street - Fifth Floor, Boston,
19 MA 02110-1301, USA. */
33 #include "DbeThread.h"
34 #include "Experiment.h"
36 #define ipc_trace if (ipc_flags) ipc_default_log
37 #define ipc_request_trace if (ipc_flags) ipc_request_log
38 #define ipc_response_trace if (ipc_flags) ipc_response_log
43 static const int L_PROGRESS
= 0;
44 static const int L_INTEGER
= 1;
45 static const int L_BOOLEAN
= 2;
46 static const int L_LONG
= 3;
47 static const int L_STRING
= 4;
48 static const int L_DOUBLE
= 5;
49 static const int L_ARRAY
= 6;
50 static const int L_OBJECT
= 7;
51 static const int L_CHAR
= 8;
55 IPCresponse
*IPCresponseGlobal
;
57 BufferPool
*responseBufferPool
;
59 IPCrequest::IPCrequest (int sz
, int reqID
, int chID
)
66 buf
= (char *) malloc (size
);
67 cancelImmediate
= false;
70 IPCrequest::~IPCrequest ()
76 IPCrequest::read (void)
78 for (int i
= 0; i
< size
; i
++)
81 ipc_request_trace (TRACE_LVL_4
, " IPCrequest:getc(stdin): %02x\n", c
);
87 IPCrequest::getStatus (void)
93 IPCrequest::setStatus (IPCrequestStatus newStatus
)
99 readByte (IPCrequest
* req
)
103 for (int i
= 0; i
< 2; i
++)
108 ipc_request_trace (TRACE_LVL_4
, " readByte:getc(stdin): %02x\n", c
);
114 case '0': case '1': case '2': case '3':
115 case '4': case '5': case '6': case '7':
117 val
= val
* 16 + c
- '0';
119 case 'a': case 'b': case 'c': case 'd': case 'e': case 'f':
120 val
= val
* 16 + c
- 'a' + 10;
126 fprintf (stderr
, "readByte: Unknown byte: %d\n", c
);
134 readIVal (IPCrequest
*req
)
136 int val
= readByte (req
);
137 for (int i
= 0; i
< 3; i
++)
138 val
= val
* 256 + readByte (req
);
139 ipc_trace (" readIVal: %d\n", val
);
144 readSVal (IPCrequest
*req
)
146 int len
= readIVal (req
);
149 ipc_trace (" readSVal: <NULL>\n");
152 char *str
= (char *) malloc (len
+ 1);
156 *s
++ = req
->rgetc ();
158 ipc_trace (" readSVal: '%s'\n", str
);
163 readLVal (IPCrequest
*req
)
165 long long val
= readByte (req
);
166 for (int i
= 0; i
< 7; i
++)
167 val
= val
* 256 + readByte (req
);
168 ipc_trace (" readLVal: %lld\n", val
);
173 readBVal (IPCrequest
*req
)
175 int val
= readByte (req
);
176 ipc_trace (" readBVal: %s\n", val
== 0 ? "true" : "false");
181 readCVal (IPCrequest
*req
)
183 int val
= readByte (req
);
184 ipc_trace (" readCVal: %d\n", val
);
189 readDVal (IPCrequest
*req
)
191 String s
= readSVal (req
);
198 readAVal (IPCrequest
*req
)
201 int type
= readByte (req
);
205 type
= readByte (req
);
207 ipc_trace ("readAVal: twoD=%s type=%d\n", twoD
? "true" : "false", type
);
209 int len
= readIVal (req
);
217 Vector
<Vector
<int>*> *array
= new Vector
<Vector
<int>*>(len
);
218 for (int i
= 0; i
< len
; i
++)
219 array
->store (i
, (Vector
<int>*)readAVal (req
));
224 Vector
<int> *array
= new Vector
<int>(len
);
225 for (int i
= 0; i
< len
; i
++)
226 array
->store (i
, readIVal (req
));
233 Vector
<Vector
<long long>*> *array
= new Vector
<Vector
<long long>*>(len
);
234 for (int i
= 0; i
< len
; i
++)
235 array
->store (i
, (Vector
<long long>*)readAVal (req
));
240 Vector
<long long> *array
= new Vector
<long long>(len
);
241 for (int i
= 0; i
< len
; i
++)
242 array
->store (i
, readLVal (req
));
249 Vector
<Vector
<double>*> *array
= new Vector
<Vector
<double>*>(len
);
250 for (int i
= 0; i
< len
; i
++)
251 array
->store (i
, (Vector
<double>*)readAVal (req
));
256 Vector
<double> *array
= new Vector
<double>(len
);
257 for (int i
= 0; i
< len
; i
++)
258 array
->store (i
, readDVal (req
));
265 Vector
< Vector
<bool>*> *array
= new Vector
< Vector
<bool>*>(len
);
266 for (int i
= 0; i
< len
; i
++)
267 array
->store (i
, (Vector
<bool>*)readAVal (req
));
272 Vector
<bool> *array
= new Vector
<bool>(len
);
273 for (int i
= 0; i
< len
; i
++)
274 array
->store (i
, readBVal (req
));
281 Vector
<Vector
<char>*> *array
= new Vector
<Vector
<char>*>(len
);
282 for (int i
= 0; i
< len
; i
++)
283 array
->store (i
, (Vector
<char>*)readAVal (req
));
288 Vector
<char> *array
= new Vector
<char>(len
);
289 for (int i
= 0; i
< len
; i
++)
290 array
->store (i
, readCVal (req
));
297 Vector
<Vector
<String
>*> *array
= new Vector
<Vector
<String
>*>(len
);
298 for (int i
= 0; i
< len
; i
++)
299 array
->store (i
, (Vector
<String
>*)readAVal (req
));
304 Vector
<String
> *array
= new Vector
<String
>(len
);
305 for (int i
= 0; i
< len
; i
++)
306 array
->store (i
, readSVal (req
));
313 Vector
<Vector
<Object
>*> *array
= new Vector
<Vector
<Object
>*>(len
);
314 for (int i
= 0; i
< len
; i
++)
315 array
->store (i
, (Vector
<Object
>*)readAVal (req
));
320 Vector
<Object
> *array
= new Vector
<Object
>(len
);
321 for (int i
= 0; i
< len
; i
++)
322 array
->store (i
, readAVal (req
));
327 fprintf (stderr
, "readAVal: Unknown code: %d\n", type
);
335 static long long lVal
;
341 readResult (int type
, IPCrequest
*req
)
343 int tVal
= readByte (req
);
347 iVal
= readIVal (req
);
350 lVal
= readLVal (req
);
353 bVal
= readBVal (req
);
356 dVal
= readDVal (req
);
359 sVal
= readSVal (req
);
362 aVal
= readAVal (req
);
365 fprintf (stderr
, "EOF read in readResult\n");
369 fprintf (stderr
, "Unknown code: %d\n", tVal
);
374 fprintf (stderr
, "Internal error: readResult: parameter mismatch: type=%d should be %d\n", tVal
, type
);
380 readInt (IPCrequest
*req
)
382 readResult (L_INTEGER
, req
);
387 readString (IPCrequest
*req
)
389 readResult (L_STRING
, req
);
394 readLong (IPCrequest
*req
)
396 readResult (L_LONG
, req
);
401 readDouble (IPCrequest
*req
)
403 readResult (L_DOUBLE
, req
);
408 readBoolean (IPCrequest
*req
)
410 readResult (L_BOOLEAN
, req
);
415 readObject (IPCrequest
*req
)
417 readResult (L_LONG
, req
);
418 return (DbeObj
) lVal
;
422 readArray (IPCrequest
*req
)
424 readResult (L_ARRAY
, req
);
429 IPCresponse::IPCresponse (int sz
)
434 responseStatus
= RESPONSE_STATUS_SUCCESS
;
435 sb
= new StringBuilder (sz
);
439 IPCresponse::~IPCresponse ()
445 IPCresponse::reset ()
450 responseStatus
= RESPONSE_STATUS_SUCCESS
;
455 IPCresponse::sendByte (int b
)
457 ipc_trace ("sendByte: %02x %d\n", b
, b
);
458 sb
->appendf ("%02x", b
);
462 IPCresponse::sendIVal (int i
)
464 ipc_trace ("sendIVal: %08x %d\n", i
, i
);
465 sb
->appendf ("%08x", i
);
469 IPCresponse::sendLVal (long long l
)
471 ipc_trace ("sendLVal: %016llx %lld\n", l
, l
);
472 sb
->appendf ("%016llx", l
);
476 IPCresponse::sendSVal (const char *s
)
483 sendIVal ((int) strlen (s
));
484 ipc_trace ("sendSVal: %s\n", s
);
485 sb
->appendf ("%s", s
);
489 IPCresponse::sendBVal (bool b
)
491 sendByte (b
? 1 : 0);
495 IPCresponse::sendCVal (char c
)
501 IPCresponse::sendDVal (double d
)
504 snprintf (str
, sizeof (str
), "%.12f", d
);
509 IPCresponse::sendAVal (void *ptr
)
513 sendByte (L_INTEGER
);
518 VecType type
= ((Vector
<void*>*)ptr
)->type ();
523 sendByte (L_INTEGER
);
524 Vector
<int> *array
= (Vector
<int>*)ptr
;
525 sendIVal (array
->size ());
526 for (int i
= 0; i
< array
->size (); i
++)
527 sendIVal (array
->fetch (i
));
532 sendByte (L_BOOLEAN
);
533 Vector
<bool> *array
= (Vector
<bool>*)ptr
;
534 sendIVal (array
->size ());
535 for (int i
= 0; i
< array
->size (); i
++)
536 sendBVal (array
->fetch (i
));
542 Vector
<char> *array
= (Vector
<char>*)ptr
;
543 sendIVal (array
->size ());
544 for (int i
= 0; i
< array
->size (); i
++)
545 sendCVal (array
->fetch (i
));
551 Vector
<long long> *array
= (Vector
<long long>*)ptr
;
552 sendIVal (array
->size ());
553 for (int i
= 0; i
< array
->size (); i
++)
554 sendLVal (array
->fetch (i
));
560 Vector
<double> *array
= (Vector
<double>*)ptr
;
561 sendIVal (array
->size ());
562 for (int i
= 0; i
< array
->size (); i
++)
563 sendDVal (array
->fetch (i
));
569 Vector
<String
> *array
= (Vector
<String
>*)ptr
;
570 sendIVal (array
->size ());
571 for (int i
= 0; i
< array
->size (); i
++)
572 sendSVal (array
->fetch (i
));
579 Vector
<void*> *array
= (Vector
<void*>*)ptr
;
580 sendIVal (array
->size ());
581 for (int i
= 0; i
< array
->size (); i
++)
582 sendAVal (array
->fetch (i
));
588 sendByte (L_INTEGER
);
589 Vector
<void*> *array
= (Vector
<void*>*)ptr
;
590 sendIVal (array
->size ());
591 for (int i
= 0; i
< array
->size (); i
++)
592 sendAVal (array
->fetch (i
));
599 Vector
<void*> *array
= (Vector
<void*>*)ptr
;
600 sendIVal (array
->size ());
601 for (int i
= 0; i
< array
->size (); i
++)
602 sendAVal (array
->fetch (i
));
608 Vector
<void*> *array
= (Vector
<void*>*)ptr
;
609 sendIVal (array
->size ());
610 for (int i
= 0; i
< array
->size (); i
++)
611 sendAVal (array
->fetch (i
));
615 fprintf (stderr
, "sendAVal: Unknown type: %d\n", type
);
621 cancelNeeded (int chID
)
623 if (chID
== cancellableChannelID
&& chID
== cancelRequestedChannelID
)
630 writeResponseWithHeader (int requestID
, int channelID
, int responseType
,
631 int responseStatus
, IPCresponse
* os
)
633 if (cancelNeeded (channelID
))
635 responseStatus
= RESPONSE_STATUS_CANCELLED
;
636 ipc_trace ("CANCELLING %d %d\n", requestID
, channelID
);
637 // This is for gracefully cancelling regular ops like openExperiment - getFiles should never reach here
639 os
->setRequestID (requestID
);
640 os
->setChannelID (channelID
);
641 os
->setResponseType (responseType
);
642 os
->setResponseStatus (responseStatus
);
645 responseBufferPool
->recycle (os
);
649 writeAck (int requestID
, int channelID
)
652 char *s
= getenv (NTXT ("SP_NO_IPC_ACK"));
660 ipc_request_trace (TRACE_LVL_4
, "ACK skipped: requestID=%d channelID=%d\n", i
, j
);
664 IPCresponse
*OUTS
= responseBufferPool
->getNewResponse (BUFFER_SIZE_SMALL
);
665 writeResponseWithHeader (requestID
, channelID
, RESPONSE_TYPE_ACK
,
666 RESPONSE_STATUS_SUCCESS
, OUTS
);
671 writeHandshake (int requestID
, int channelID
)
673 IPCresponse
*OUTS
= responseBufferPool
->getNewResponse (BUFFER_SIZE_SMALL
);
674 writeResponseWithHeader (requestID
, channelID
, RESPONSE_TYPE_HANDSHAKE
, RESPONSE_STATUS_SUCCESS
, OUTS
);
678 writeResponseGeneric (int responseStatus
, int requestID
, int channelID
)
680 IPCresponse
*OUTS
= responseBufferPool
->getNewResponse (BUFFER_SIZE_SMALL
);
681 writeResponseWithHeader (requestID
, channelID
, RESPONSE_TYPE_COMPLETE
, responseStatus
, OUTS
);
684 BufferPool::BufferPool ()
686 pthread_mutex_init (&p_mutex
, NULL
);
691 BufferPool::~BufferPool ()
693 for (IPCresponse
*p
= smallBuf
; p
;)
695 IPCresponse
*tmp
= p
;
699 for (IPCresponse
*p
= largeBuf
; p
;)
701 IPCresponse
*tmp
= p
;
708 BufferPool::getNewResponse (int size
)
710 pthread_mutex_lock (&p_mutex
);
711 if (ipc_single_threaded_mode
&& size
< BUFFER_SIZE_LARGE
)
712 size
= BUFFER_SIZE_LARGE
;
713 IPCresponse
*newResponse
= NULL
;
714 if (size
>= BUFFER_SIZE_LARGE
)
718 newResponse
= largeBuf
;
719 largeBuf
= largeBuf
->next
;
724 newResponse
= smallBuf
;
725 smallBuf
= smallBuf
->next
;
728 newResponse
->reset ();
731 newResponse
= new IPCresponse (size
);
732 ipc_trace ("GETNEWBUFFER %d\n", size
);
734 pthread_mutex_unlock (&p_mutex
);
739 BufferPool::recycle (IPCresponse
*respB
)
741 pthread_mutex_lock (&p_mutex
);
742 if (respB
->getCurBufSize () >= BUFFER_SIZE_LARGE
)
744 respB
->next
= largeBuf
;
749 respB
->next
= smallBuf
;
752 pthread_mutex_unlock (&p_mutex
);
756 writeArray (void *ptr
, IPCrequest
* req
)
758 if (req
->getStatus () == CANCELLED_IMMEDIATE
)
760 IPCresponse
*OUTS
= responseBufferPool
->getNewResponse (BUFFER_SIZE_LARGE
);
761 OUTS
->sendByte (L_ARRAY
);
762 OUTS
->sendAVal (ptr
);
763 writeResponseWithHeader (req
->getRequestID (), req
->getChannelID (),
764 RESPONSE_TYPE_COMPLETE
, RESPONSE_STATUS_SUCCESS
, OUTS
);
768 writeString (const char *s
, IPCrequest
* req
)
770 if (req
->getStatus () == CANCELLED_IMMEDIATE
)
772 IPCresponse
*OUTS
= responseBufferPool
->getNewResponse (BUFFER_SIZE_LARGE
);
773 OUTS
->sendByte (L_STRING
);
775 writeResponseWithHeader (req
->getRequestID (), req
->getChannelID (),
776 RESPONSE_TYPE_COMPLETE
, RESPONSE_STATUS_SUCCESS
, OUTS
);
780 writeObject (DbeObj obj
, IPCrequest
* req
)
782 writeLong ((long long) obj
, req
);
786 writeBoolean (bool b
, IPCrequest
* req
)
788 if (req
->getStatus () == CANCELLED_IMMEDIATE
)
790 IPCresponse
*OUTS
= responseBufferPool
->getNewResponse (BUFFER_SIZE_MEDIUM
);
791 OUTS
->sendByte (L_BOOLEAN
);
793 writeResponseWithHeader (req
->getRequestID (), req
->getChannelID (),
794 RESPONSE_TYPE_COMPLETE
, RESPONSE_STATUS_SUCCESS
, OUTS
);
798 writeInt (int i
, IPCrequest
* req
)
800 if (req
->getStatus () == CANCELLED_IMMEDIATE
)
802 IPCresponse
*OUTS
= responseBufferPool
->getNewResponse (BUFFER_SIZE_MEDIUM
);
803 OUTS
->sendByte (L_INTEGER
);
805 writeResponseWithHeader (req
->getRequestID (), req
->getChannelID (), RESPONSE_TYPE_COMPLETE
, RESPONSE_STATUS_SUCCESS
, OUTS
);
809 writeChar (char c
, IPCrequest
* req
)
811 if (req
->getStatus () == CANCELLED_IMMEDIATE
)
813 IPCresponse
*OUTS
= responseBufferPool
->getNewResponse (BUFFER_SIZE_MEDIUM
);
814 OUTS
->sendByte (L_CHAR
);
816 writeResponseWithHeader (req
->getRequestID (), req
->getChannelID (), RESPONSE_TYPE_COMPLETE
, RESPONSE_STATUS_SUCCESS
, OUTS
);
820 writeLong (long long l
, IPCrequest
* req
)
822 if (req
->getStatus () == CANCELLED_IMMEDIATE
)
824 IPCresponse
*OUTS
= responseBufferPool
->getNewResponse (BUFFER_SIZE_MEDIUM
);
825 OUTS
->sendByte (L_LONG
);
827 writeResponseWithHeader (req
->getRequestID (), req
->getChannelID (), RESPONSE_TYPE_COMPLETE
, RESPONSE_STATUS_SUCCESS
, OUTS
);
831 writeDouble (double d
, IPCrequest
* req
)
833 if (req
->getStatus () == CANCELLED_IMMEDIATE
) return;
834 IPCresponse
*OUTS
= responseBufferPool
->getNewResponse (BUFFER_SIZE_MEDIUM
);
835 OUTS
->sendByte (L_DOUBLE
);
837 writeResponseWithHeader (req
->getRequestID (), req
->getChannelID (), RESPONSE_TYPE_COMPLETE
, RESPONSE_STATUS_SUCCESS
, OUTS
);
841 setProgress (int percentage
, const char *proc_str
)
843 if (cancelNeeded (currentChannelID
))
845 // ExperimentLoadCancelException *e1 = new ExperimentLoadCancelException();
849 if (NULL
== proc_str
)
851 int size
= strlen (proc_str
) + 100; // 100 bytes for additional data
852 int bs
= BUFFER_SIZE_MEDIUM
;
853 if (size
> BUFFER_SIZE_MEDIUM
)
855 if (size
> BUFFER_SIZE_LARGE
) return 1; // This should never happen
856 bs
= BUFFER_SIZE_LARGE
;
858 IPCresponse
*OUTS
= responseBufferPool
->getNewResponse (bs
);
859 OUTS
->sendByte (L_PROGRESS
);
860 OUTS
->sendIVal (percentage
);
861 OUTS
->sendSVal (proc_str
);
862 writeResponseWithHeader (currentRequestID
, currentChannelID
, RESPONSE_TYPE_PROGRESS
, RESPONSE_STATUS_SUCCESS
, OUTS
);
866 static pthread_mutex_t responce_lock
= PTHREAD_MUTEX_INITIALIZER
;
869 IPCresponse::print (void)
872 int sz
= responseType
== RESPONSE_TYPE_HANDSHAKE
?
873 IPC_VERSION_NUMBER
: sb
->length ();
874 snprintf (buf
, sizeof (buf
), "%02x%08x%02x%02x%08x", HEADER_MARKER
,
875 requestID
, responseType
, responseStatus
, sz
);
876 pthread_mutex_lock (&responce_lock
);
877 ipc_response_trace (TRACE_LVL_1
,
878 "IPCresponse: ID=%08x type=%02x status=%02x sz:%6d\n",
879 requestID
, responseType
, responseStatus
, sz
);
882 pthread_mutex_unlock (&responce_lock
);
886 setCancelRequestedCh (int chID
)
888 cancelRequestedChannelID
= chID
;
894 int marker
= readByte (NULL
);
895 if (marker
!= HEADER_MARKER
)
897 fprintf (stderr
, "Internal error: received request (%d) without header marker\n", marker
);
902 ipc_request_trace (TRACE_LVL_1
, "RequestHeaderBegin------------------------\n");
903 int requestID
= readIVal (NULL
);
904 int requestType
= readByte (NULL
);
905 int channelID
= readIVal (NULL
);
906 int nBytes
= readIVal (NULL
);
907 if (requestType
== REQUEST_TYPE_HANDSHAKE
)
909 // write the ack directly to the wire, not through the response queue
910 writeAck (requestID
, channelID
);
911 writeHandshake (requestID
, channelID
);
912 ipc_request_trace (TRACE_LVL_1
, "RQ: HANDSHAKE --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID
, requestType
, channelID
, nBytes
);
914 else if (requestType
== REQUEST_TYPE_CANCEL
)
916 writeAck (requestID
, channelID
);
917 ipc_request_trace (TRACE_LVL_1
, "RQ: CANCEL --- RQ: %x ----- %x --- CH: %x --- %x -RequestHeaderEnd\n", requestID
, requestType
, channelID
, nBytes
);
918 if (channelID
== cancellableChannelID
)
920 // we have worked on at least one request belonging to this channel
921 writeResponseGeneric (RESPONSE_STATUS_SUCCESS
, requestID
, channelID
);
922 setCancelRequestedCh (channelID
);
923 ipc_trace ("CANCELLABLE %x %x\n", channelID
, currentChannelID
);
924 if (channelID
== currentChannelID
)
925 // request for this channel is currently in progress
926 ipc_request_trace (TRACE_LVL_1
, "IN PROGRESS REQUEST NEEDS CANCELLATION");
927 // ssp_post_cond(waitingToFinish);
932 // it is possible that a request for this channel is on the requestQ
933 // or has been submitted to the work group queue but is waiting for a thread to pick it up
934 writeResponseGeneric (RESPONSE_STATUS_FAILURE
, requestID
, channelID
);
935 setCancelRequestedCh (channelID
);
936 ipc_request_trace (TRACE_LVL_1
, "RETURNING FAILURE TO CANCEL REQUEST channel %d\n", channelID
);
941 writeAck (requestID
, channelID
);
942 ipc_request_trace (TRACE_LVL_1
, "RQ: --- %x ----- %x ---- %x --- %x -RequestHeaderEnd\n", requestID
, requestType
, channelID
, nBytes
);
943 IPCrequest
*nreq
= new IPCrequest (nBytes
, requestID
, channelID
);
945 ipc_request_trace (TRACE_LVL_1
, "RQ: --- %x Read from stream \n", requestID
);
946 if (cancelNeeded (channelID
))
948 ipc_request_trace (TRACE_LVL_1
, "CANCELLABLE REQ RECVD %x %x\n", channelID
, requestID
);
949 writeResponseGeneric (RESPONSE_STATUS_CANCELLED
, requestID
, channelID
);
953 DbeQueue
*q
= new DbeQueue (ipc_doWork
, nreq
);
954 ipcThreadPool
->put_queue (q
);