1 /* Copyright (c) 2003-2006 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16 #include <ndb_global.h>
18 #include <TransporterCallback.hpp>
19 #include <TransporterRegistry.hpp>
20 #include <FastScheduler.hpp>
21 #include <Emulator.hpp>
22 #include <ErrorHandlingMacros.hpp>
24 #include "LongSignal.hpp"
26 #include <signaldata/EventReport.hpp>
27 #include <signaldata/TestOrd.hpp>
28 #include <signaldata/SignalDroppedRep.hpp>
29 #include <signaldata/DisconnectRep.hpp>
31 #include "VMSignal.hpp"
33 #include "DataBuffer.hpp"
39 SectionSegmentPool g_sectionSegmentPool
;
41 struct ConnectionError
43 enum TransporterError err
;
47 static const ConnectionError connectionError
[] =
49 { TE_NO_ERROR
, "No error"},
50 { TE_SHM_UNABLE_TO_CREATE_SEGMENT
, "Unable to create shared memory segment"},
51 { (enum TransporterError
) -1, "No connection error message available (please report a bug)"}
54 const char *lookupConnectionError(Uint32 err
)
57 while ((Uint32
)connectionError
[i
].err
!= err
&&
58 connectionError
[i
].err
!= -1)
60 return connectionError
[i
].text
;
64 import(Ptr
<SectionSegment
> & first
, const Uint32
* src
, Uint32 len
){
66 * Dummy data used when setting prev.m_nextSegment for first segment of a
72 if(g_sectionSegmentPool
.seize(first
)){
79 first
.p
->m_ownerRef
= 0;
81 Ptr
<SectionSegment
> prevPtr
= { (SectionSegment
*)&dummyPrev
[0], 0 };
82 Ptr
<SectionSegment
> currPtr
= first
;
84 while(len
> SectionSegment::DataLength
){
85 prevPtr
.p
->m_nextSegment
= currPtr
.i
;
86 memcpy(&currPtr
.p
->theData
[0], src
, 4 * SectionSegment::DataLength
);
87 src
+= SectionSegment::DataLength
;
88 len
-= SectionSegment::DataLength
;
90 if(g_sectionSegmentPool
.seize(currPtr
)){
93 first
.p
->m_lastSegment
= prevPtr
.i
;
98 first
.p
->m_lastSegment
= currPtr
.i
;
99 currPtr
.p
->m_nextSegment
= RNIL
;
100 memcpy(&currPtr
.p
->theData
[0], src
, 4 * len
);
105 linkSegments(Uint32 head
, Uint32 tail
){
107 Ptr
<SectionSegment
> headPtr
;
108 g_sectionSegmentPool
.getPtr(headPtr
, head
);
110 Ptr
<SectionSegment
> tailPtr
;
111 g_sectionSegmentPool
.getPtr(tailPtr
, tail
);
113 Ptr
<SectionSegment
> oldTailPtr
;
114 g_sectionSegmentPool
.getPtr(oldTailPtr
, headPtr
.p
->m_lastSegment
);
116 headPtr
.p
->m_lastSegment
= tailPtr
.p
->m_lastSegment
;
117 headPtr
.p
->m_sz
+= tailPtr
.p
->m_sz
;
119 oldTailPtr
.p
->m_nextSegment
= tailPtr
.i
;
123 copy(Uint32
* & insertPtr
,
124 class SectionSegmentPool
& thePool
, const SegmentedSectionPtr
& _ptr
){
126 Uint32 len
= _ptr
.sz
;
127 SectionSegment
* ptrP
= _ptr
.p
;
130 memcpy(insertPtr
, &ptrP
->theData
[0], 4 * 60);
133 ptrP
= thePool
.getPtr(ptrP
->m_nextSegment
);
135 memcpy(insertPtr
, &ptrP
->theData
[0], 4 * len
);
140 copy(Uint32
* dst
, SegmentedSectionPtr src
){
141 copy(dst
, g_sectionSegmentPool
, src
);
145 getSections(Uint32 secCount
, SegmentedSectionPtr ptr
[3]){
146 Uint32 tSec0
= ptr
[0].i
;
147 Uint32 tSec1
= ptr
[1].i
;
148 Uint32 tSec2
= ptr
[2].i
;
152 p
= g_sectionSegmentPool
.getPtr(tSec2
);
156 p
= g_sectionSegmentPool
.getPtr(tSec1
);
160 p
= g_sectionSegmentPool
.getPtr(tSec0
);
167 sprintf(msg
, "secCount=%d", secCount
);
168 ErrorReporter::handleAssert(msg
, __FILE__
, __LINE__
);
172 getSection(SegmentedSectionPtr
& ptr
, Uint32 i
){
174 SectionSegment
* p
= g_sectionSegmentPool
.getPtr(i
);
/**
 * Number of segments needed to hold x words (x divided by
 * SectionSegment::DataLength, rounded up). The argument is parenthesized so
 * that an expression argument is evaluated as a whole.
 */
#define relSz(x) (((x) + SectionSegment::DataLength - 1) / SectionSegment::DataLength)
182 release(SegmentedSectionPtr
& ptr
){
183 g_sectionSegmentPool
.releaseList(relSz(ptr
.sz
),
185 ptr
.p
->m_lastSegment
);
189 releaseSections(Uint32 secCount
, SegmentedSectionPtr ptr
[3]){
190 Uint32 tSec0
= ptr
[0].i
;
191 Uint32 tSz0
= ptr
[0].sz
;
192 Uint32 tSec1
= ptr
[1].i
;
193 Uint32 tSz1
= ptr
[1].sz
;
194 Uint32 tSec2
= ptr
[2].i
;
195 Uint32 tSz2
= ptr
[2].sz
;
198 g_sectionSegmentPool
.releaseList(relSz(tSz2
), tSec2
,
199 ptr
[2].p
->m_lastSegment
);
201 g_sectionSegmentPool
.releaseList(relSz(tSz1
), tSec1
,
202 ptr
[1].p
->m_lastSegment
);
204 g_sectionSegmentPool
.releaseList(relSz(tSz0
), tSec0
,
205 ptr
[0].p
->m_lastSegment
);
210 sprintf(msg
, "secCount=%d", secCount
);
211 ErrorReporter::handleAssert(msg
, __FILE__
, __LINE__
);
214 #include <DebuggerNames.hpp>
217 execute(void * callbackObj
,
218 SignalHeader
* const header
,
220 Uint32
* const theData
,
221 LinearSectionPtr ptr
[3]){
223 const Uint32 secCount
= header
->m_noOfSections
;
224 const Uint32 length
= header
->theLength
;
226 #ifdef TRACE_DISTRIBUTED
227 ndbout_c("recv: %s(%d) from (%s, %d)",
228 getSignalName(header
->theVerId_signalNumber
),
229 header
->theVerId_signalNumber
,
230 getBlockName(refToBlock(header
->theSendersBlockRef
)),
231 refToNode(header
->theSendersBlockRef
));
235 Ptr
<SectionSegment
> secPtr
[3];
238 ok
&= import(secPtr
[2], ptr
[2].p
, ptr
[2].sz
);
240 ok
&= import(secPtr
[1], ptr
[1].p
, ptr
[1].sz
);
242 ok
&= import(secPtr
[0], ptr
[0].p
, ptr
[0].sz
);
246 * Check that we haven't received a too long signal
248 ok
&= (length
+ secCount
<= 25);
255 secPtrI
[0] = secPtr
[0].i
;
256 secPtrI
[1] = secPtr
[1].i
;
257 secPtrI
[2] = secPtr
[2].i
;
259 globalScheduler
.execute(header
, prio
, theData
, secPtrI
);
266 for(Uint32 i
= 0; i
<secCount
; i
++){
267 if(secPtr
[i
].p
!= 0){
268 g_sectionSegmentPool
.releaseList(relSz(ptr
[i
].sz
), secPtr
[i
].i
,
269 secPtr
[i
].p
->m_lastSegment
);
272 Uint32 gsn
= header
->theVerId_signalNumber
;
273 Uint32 len
= header
->theLength
;
274 Uint32 newLen
= (len
> 22 ? 22 : len
);
275 SignalDroppedRep
* rep
= (SignalDroppedRep
*)theData
;
276 memmove(rep
->originalData
, theData
, (4 * newLen
));
277 rep
->originalGsn
= gsn
;
278 rep
->originalLength
= len
;
279 rep
->originalSectionCount
= secCount
;
280 header
->theVerId_signalNumber
= GSN_SIGNAL_DROPPED_REP
;
281 header
->theLength
= newLen
+ 3;
282 header
->m_noOfSections
= 0;
283 globalScheduler
.execute(header
, prio
, theData
, secPtrI
);
287 operator<<(NdbOut
& out
, const SectionSegment
& ss
){
288 out
<< "[ last= " << ss
.m_lastSegment
<< " next= " << ss
.nextPool
<< " ]";
293 print(SectionSegment
* s
, Uint32 len
, FILE* out
){
294 for(Uint32 i
= 0; i
<len
; i
++){
295 fprintf(out
, "H\'0x%.8x ", s
->theData
[i
]);
296 if(((i
+ 1) % 6) == 0)
302 print(SegmentedSectionPtr ptr
, FILE* out
){
304 ptr
.p
= g_sectionSegmentPool
.getPtr(ptr
.i
);
305 Uint32 len
= ptr
.p
->m_sz
;
307 fprintf(out
, "ptr.i = %d(%p) ptr.sz = %d(%d)\n", ptr
.i
, ptr
.p
, len
, ptr
.sz
);
308 while(len
> SectionSegment::DataLength
){
309 print(ptr
.p
, SectionSegment::DataLength
, out
);
311 len
-= SectionSegment::DataLength
;
312 fprintf(out
, "ptr.i = %d\n", ptr
.p
->m_nextSegment
);
313 ptr
.p
= g_sectionSegmentPool
.getPtr(ptr
.p
->m_nextSegment
);
316 print(ptr
.p
, len
, out
);
   * Check whether the job buffers are starting to get full,
   * and if so call doJob
326 return globalScheduler
.checkDoJob();
330 reportError(void * callbackObj
, NodeId nodeId
,
331 TransporterError errorCode
, const char *info
)
333 #ifdef DEBUG_TRANSPORTER
334 ndbout_c("reportError (%d, 0x%x) %s", nodeId
, errorCode
, info
? info
: "")
337 DBUG_ENTER("reportError");
338 DBUG_PRINT("info",("nodeId %d errorCode: 0x%x info: %s",
339 nodeId
, errorCode
, info
));
343 case TE_SIGNAL_LOST_SEND_BUFFER_FULL
:
346 snprintf(msg
, sizeof(msg
), "Remote note id %d.%s%s", nodeId
,
347 info
? " " : "", info
? info
: "");
348 ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST_SEND_BUFFER_FULL
,
349 msg
, __FILE__
, NST_ErrorHandler
);
354 snprintf(msg
, sizeof(msg
), "Remote node id %d,%s%s", nodeId
,
355 info
? " " : "", info
? info
: "");
356 ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST
,
357 msg
, __FILE__
, NST_ErrorHandler
);
359 case TE_SHM_IPC_PERMANENT
:
362 snprintf(msg
, sizeof(msg
),
363 "Remote node id %d.%s%s",
364 nodeId
, info
? " " : "", info
? info
: "");
365 ErrorReporter::handleError(NDBD_EXIT_CONNECTION_SETUP_FAILED
,
366 msg
, __FILE__
, NST_ErrorHandler
);
372 if(errorCode
& TE_DO_DISCONNECT
){
373 reportDisconnect(callbackObj
, nodeId
, errorCode
);
377 Signal
&signal
= *(Signal
*)&signalT
;
378 memset(&signal
.header
, 0, sizeof(signal
.header
));
381 if(errorCode
& TE_DO_DISCONNECT
)
382 signal
.theData
[0] = NDB_LE_TransporterError
;
384 signal
.theData
[0] = NDB_LE_TransporterWarning
;
386 signal
.theData
[1] = nodeId
;
387 signal
.theData
[2] = errorCode
;
389 signal
.header
.theLength
= 3;
390 signal
.header
.theSendersSignalId
= 0;
391 signal
.header
.theSendersBlockRef
= numberToRef(0, globalData
.ownId
);
392 globalScheduler
.execute(&signal
, JBA
, CMVMI
, GSN_EVENT_REP
);
398 * Report average send length in bytes (4096 last sends)
401 reportSendLen(void * callbackObj
,
402 NodeId nodeId
, Uint32 count
, Uint64 bytes
){
405 Signal
&signal
= *(Signal
*)&signalT
;
406 memset(&signal
.header
, 0, sizeof(signal
.header
));
408 signal
.header
.theLength
= 3;
409 signal
.header
.theSendersSignalId
= 0;
410 signal
.header
.theSendersBlockRef
= numberToRef(0, globalData
.ownId
);
411 signal
.theData
[0] = NDB_LE_SendBytesStatistic
;
412 signal
.theData
[1] = nodeId
;
413 signal
.theData
[2] = (bytes
/count
);
414 globalScheduler
.execute(&signal
, JBA
, CMVMI
, GSN_EVENT_REP
);
418 * Report average receive length in bytes (4096 last receives)
421 reportReceiveLen(void * callbackObj
,
422 NodeId nodeId
, Uint32 count
, Uint64 bytes
){
425 Signal
&signal
= *(Signal
*)&signalT
;
426 memset(&signal
.header
, 0, sizeof(signal
.header
));
428 signal
.header
.theLength
= 3;
429 signal
.header
.theSendersSignalId
= 0;
430 signal
.header
.theSendersBlockRef
= numberToRef(0, globalData
.ownId
);
431 signal
.theData
[0] = NDB_LE_ReceiveBytesStatistic
;
432 signal
.theData
[1] = nodeId
;
433 signal
.theData
[2] = (bytes
/count
);
434 globalScheduler
.execute(&signal
, JBA
, CMVMI
, GSN_EVENT_REP
);
438 * Report connection established
442 reportConnect(void * callbackObj
, NodeId nodeId
){
445 Signal
&signal
= *(Signal
*)&signalT
;
446 memset(&signal
.header
, 0, sizeof(signal
.header
));
448 signal
.header
.theLength
= 1;
449 signal
.header
.theSendersSignalId
= 0;
450 signal
.header
.theSendersBlockRef
= numberToRef(0, globalData
.ownId
);
451 signal
.theData
[0] = nodeId
;
453 globalScheduler
.execute(&signal
, JBA
, CMVMI
, GSN_CONNECT_REP
);
457 * Report connection broken
460 reportDisconnect(void * callbackObj
, NodeId nodeId
, Uint32 errNo
){
462 DBUG_ENTER("reportDisconnect");
464 SignalT
<sizeof(DisconnectRep
)/4> signalT
;
465 Signal
&signal
= *(Signal
*)&signalT
;
466 memset(&signal
.header
, 0, sizeof(signal
.header
));
468 signal
.header
.theLength
= DisconnectRep::SignalLength
;
469 signal
.header
.theSendersSignalId
= 0;
470 signal
.header
.theSendersBlockRef
= numberToRef(0, globalData
.ownId
);
471 signal
.header
.theTrace
= TestOrd::TraceDisconnect
;
473 DisconnectRep
* const rep
= (DisconnectRep
*)&signal
.theData
[0];
474 rep
->nodeId
= nodeId
;
477 globalScheduler
.execute(&signal
, JBA
, CMVMI
, GSN_DISCONNECT_REP
);
483 SignalLoggerManager::printSegmentedSection(FILE * output
,
484 const SignalHeader
& sh
,
485 const SegmentedSectionPtr ptr
[3],
488 fprintf(output
, "SECTION %u type=segmented", i
);
490 fprintf(output
, " *** invalid ***\n");
493 const Uint32 len
= ptr
[i
].sz
;
494 SectionSegment
* ssp
= ptr
[i
].p
;
496 fprintf(output
, " size=%u\n", (unsigned)len
);
498 if (pos
> 0 && pos
% SectionSegment::DataLength
== 0) {
499 ssp
= g_sectionSegmentPool
.getPtr(ssp
->m_nextSegment
);
501 printDataWord(output
, pos
, ssp
->theData
[pos
% SectionSegment::DataLength
]);
508 transporter_recv_from(void * callbackObj
, NodeId nodeId
){
509 globalData
.m_nodeInfo
[nodeId
].m_heartbeat_cnt
= 0;