mySQL 5.0.11 sources for tomato
[tomato.git] / release / src / router / mysql / storage / ndb / src / kernel / vm / TransporterCallback.cpp
blobff1aa3c8d2a910b82424ffbc5ea9430fcb865474
1 /* Copyright (c) 2003-2006 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16 #include <ndb_global.h>
18 #include <TransporterCallback.hpp>
19 #include <TransporterRegistry.hpp>
20 #include <FastScheduler.hpp>
21 #include <Emulator.hpp>
22 #include <ErrorHandlingMacros.hpp>
24 #include "LongSignal.hpp"
26 #include <signaldata/EventReport.hpp>
27 #include <signaldata/TestOrd.hpp>
28 #include <signaldata/SignalDroppedRep.hpp>
29 #include <signaldata/DisconnectRep.hpp>
31 #include "VMSignal.hpp"
32 #include <NdbOut.hpp>
33 #include "DataBuffer.hpp"
36 /**
37 * The instance
39 SectionSegmentPool g_sectionSegmentPool;
41 struct ConnectionError
43 enum TransporterError err;
44 const char *text;
47 static const ConnectionError connectionError[] =
49 { TE_NO_ERROR, "No error"},
50 { TE_SHM_UNABLE_TO_CREATE_SEGMENT, "Unable to create shared memory segment"},
51 { (enum TransporterError) -1, "No connection error message available (please report a bug)"}
54 const char *lookupConnectionError(Uint32 err)
56 int i= 0;
57 while ((Uint32)connectionError[i].err != err &&
58 connectionError[i].err != -1)
59 i++;
60 return connectionError[i].text;
63 bool
64 import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len){
65 /**
66 * Dummy data used when setting prev.m_nextSegment for first segment of a
67 * section
69 Uint32 dummyPrev[4];
71 first.p = 0;
72 if(g_sectionSegmentPool.seize(first)){
74 } else {
75 return false;
78 first.p->m_sz = len;
79 first.p->m_ownerRef = 0;
81 Ptr<SectionSegment> prevPtr = { (SectionSegment *)&dummyPrev[0], 0 };
82 Ptr<SectionSegment> currPtr = first;
84 while(len > SectionSegment::DataLength){
85 prevPtr.p->m_nextSegment = currPtr.i;
86 memcpy(&currPtr.p->theData[0], src, 4 * SectionSegment::DataLength);
87 src += SectionSegment::DataLength;
88 len -= SectionSegment::DataLength;
89 prevPtr = currPtr;
90 if(g_sectionSegmentPool.seize(currPtr)){
92 } else {
93 first.p->m_lastSegment = prevPtr.i;
94 return false;
98 first.p->m_lastSegment = currPtr.i;
99 currPtr.p->m_nextSegment = RNIL;
100 memcpy(&currPtr.p->theData[0], src, 4 * len);
101 return true;
104 void
105 linkSegments(Uint32 head, Uint32 tail){
107 Ptr<SectionSegment> headPtr;
108 g_sectionSegmentPool.getPtr(headPtr, head);
110 Ptr<SectionSegment> tailPtr;
111 g_sectionSegmentPool.getPtr(tailPtr, tail);
113 Ptr<SectionSegment> oldTailPtr;
114 g_sectionSegmentPool.getPtr(oldTailPtr, headPtr.p->m_lastSegment);
116 headPtr.p->m_lastSegment = tailPtr.p->m_lastSegment;
117 headPtr.p->m_sz += tailPtr.p->m_sz;
119 oldTailPtr.p->m_nextSegment = tailPtr.i;
122 void
123 copy(Uint32 * & insertPtr,
124 class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
126 Uint32 len = _ptr.sz;
127 SectionSegment * ptrP = _ptr.p;
129 while(len > 60){
130 memcpy(insertPtr, &ptrP->theData[0], 4 * 60);
131 len -= 60;
132 insertPtr += 60;
133 ptrP = thePool.getPtr(ptrP->m_nextSegment);
135 memcpy(insertPtr, &ptrP->theData[0], 4 * len);
136 insertPtr += len;
139 void
140 copy(Uint32 * dst, SegmentedSectionPtr src){
141 copy(dst, g_sectionSegmentPool, src);
144 void
145 getSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){
146 Uint32 tSec0 = ptr[0].i;
147 Uint32 tSec1 = ptr[1].i;
148 Uint32 tSec2 = ptr[2].i;
149 SectionSegment * p;
150 switch(secCount){
151 case 3:
152 p = g_sectionSegmentPool.getPtr(tSec2);
153 ptr[2].p = p;
154 ptr[2].sz = p->m_sz;
155 case 2:
156 p = g_sectionSegmentPool.getPtr(tSec1);
157 ptr[1].p = p;
158 ptr[1].sz = p->m_sz;
159 case 1:
160 p = g_sectionSegmentPool.getPtr(tSec0);
161 ptr[0].p = p;
162 ptr[0].sz = p->m_sz;
163 case 0:
164 return;
166 char msg[40];
167 sprintf(msg, "secCount=%d", secCount);
168 ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
171 void
172 getSection(SegmentedSectionPtr & ptr, Uint32 i){
173 ptr.i = i;
174 SectionSegment * p = g_sectionSegmentPool.getPtr(i);
175 ptr.p = p;
176 ptr.sz = p->m_sz;
/**
 * Number of segments needed to hold x words, i.e. x divided by
 * SectionSegment::DataLength rounded up. The argument is parenthesised so
 * that expressions with low-precedence operators (e.g. a conditional)
 * expand correctly.
 */
#define relSz(x) (((x) + SectionSegment::DataLength - 1) / SectionSegment::DataLength)
181 void
182 release(SegmentedSectionPtr & ptr){
183 g_sectionSegmentPool.releaseList(relSz(ptr.sz),
184 ptr.i,
185 ptr.p->m_lastSegment);
188 void
189 releaseSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){
190 Uint32 tSec0 = ptr[0].i;
191 Uint32 tSz0 = ptr[0].sz;
192 Uint32 tSec1 = ptr[1].i;
193 Uint32 tSz1 = ptr[1].sz;
194 Uint32 tSec2 = ptr[2].i;
195 Uint32 tSz2 = ptr[2].sz;
196 switch(secCount){
197 case 3:
198 g_sectionSegmentPool.releaseList(relSz(tSz2), tSec2,
199 ptr[2].p->m_lastSegment);
200 case 2:
201 g_sectionSegmentPool.releaseList(relSz(tSz1), tSec1,
202 ptr[1].p->m_lastSegment);
203 case 1:
204 g_sectionSegmentPool.releaseList(relSz(tSz0), tSec0,
205 ptr[0].p->m_lastSegment);
206 case 0:
207 return;
209 char msg[40];
210 sprintf(msg, "secCount=%d", secCount);
211 ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
214 #include <DebuggerNames.hpp>
216 void
217 execute(void * callbackObj,
218 SignalHeader * const header,
219 Uint8 prio,
220 Uint32 * const theData,
221 LinearSectionPtr ptr[3]){
223 const Uint32 secCount = header->m_noOfSections;
224 const Uint32 length = header->theLength;
226 #ifdef TRACE_DISTRIBUTED
227 ndbout_c("recv: %s(%d) from (%s, %d)",
228 getSignalName(header->theVerId_signalNumber),
229 header->theVerId_signalNumber,
230 getBlockName(refToBlock(header->theSendersBlockRef)),
231 refToNode(header->theSendersBlockRef));
232 #endif
234 bool ok = true;
235 Ptr<SectionSegment> secPtr[3];
236 switch(secCount){
237 case 3:
238 ok &= import(secPtr[2], ptr[2].p, ptr[2].sz);
239 case 2:
240 ok &= import(secPtr[1], ptr[1].p, ptr[1].sz);
241 case 1:
242 ok &= import(secPtr[0], ptr[0].p, ptr[0].sz);
246 * Check that we haven't received a too long signal
248 ok &= (length + secCount <= 25);
250 Uint32 secPtrI[3];
251 if(ok){
253 * Normal path
255 secPtrI[0] = secPtr[0].i;
256 secPtrI[1] = secPtr[1].i;
257 secPtrI[2] = secPtr[2].i;
259 globalScheduler.execute(header, prio, theData, secPtrI);
260 return;
264 * Out of memory
266 for(Uint32 i = 0; i<secCount; i++){
267 if(secPtr[i].p != 0){
268 g_sectionSegmentPool.releaseList(relSz(ptr[i].sz), secPtr[i].i,
269 secPtr[i].p->m_lastSegment);
272 Uint32 gsn = header->theVerId_signalNumber;
273 Uint32 len = header->theLength;
274 Uint32 newLen= (len > 22 ? 22 : len);
275 SignalDroppedRep * rep = (SignalDroppedRep*)theData;
276 memmove(rep->originalData, theData, (4 * newLen));
277 rep->originalGsn = gsn;
278 rep->originalLength = len;
279 rep->originalSectionCount = secCount;
280 header->theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
281 header->theLength = newLen + 3;
282 header->m_noOfSections = 0;
283 globalScheduler.execute(header, prio, theData, secPtrI);
286 NdbOut &
287 operator<<(NdbOut& out, const SectionSegment & ss){
288 out << "[ last= " << ss.m_lastSegment << " next= " << ss.nextPool << " ]";
289 return out;
292 void
293 print(SectionSegment * s, Uint32 len, FILE* out){
294 for(Uint32 i = 0; i<len; i++){
295 fprintf(out, "H\'0x%.8x ", s->theData[i]);
296 if(((i + 1) % 6) == 0)
297 fprintf(out, "\n");
301 void
302 print(SegmentedSectionPtr ptr, FILE* out){
304 ptr.p = g_sectionSegmentPool.getPtr(ptr.i);
305 Uint32 len = ptr.p->m_sz;
307 fprintf(out, "ptr.i = %d(%p) ptr.sz = %d(%d)\n", ptr.i, ptr.p, len, ptr.sz);
308 while(len > SectionSegment::DataLength){
309 print(ptr.p, SectionSegment::DataLength, out);
311 len -= SectionSegment::DataLength;
312 fprintf(out, "ptr.i = %d\n", ptr.p->m_nextSegment);
313 ptr.p = g_sectionSegmentPool.getPtr(ptr.p->m_nextSegment);
316 print(ptr.p, len, out);
317 fprintf(out, "\n");
321 checkJobBuffer() {
322 /**
323 * Check to see if jobbbuffers are starting to get full
324 * and if so call doJob
326 return globalScheduler.checkDoJob();
329 void
330 reportError(void * callbackObj, NodeId nodeId,
331 TransporterError errorCode, const char *info)
333 #ifdef DEBUG_TRANSPORTER
334 ndbout_c("reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : "")
335 #endif
337 DBUG_ENTER("reportError");
338 DBUG_PRINT("info",("nodeId %d errorCode: 0x%x info: %s",
339 nodeId, errorCode, info));
341 switch (errorCode)
343 case TE_SIGNAL_LOST_SEND_BUFFER_FULL:
345 char msg[64];
346 snprintf(msg, sizeof(msg), "Remote note id %d.%s%s", nodeId,
347 info ? " " : "", info ? info : "");
348 ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST_SEND_BUFFER_FULL,
349 msg, __FILE__, NST_ErrorHandler);
351 case TE_SIGNAL_LOST:
353 char msg[64];
354 snprintf(msg, sizeof(msg), "Remote node id %d,%s%s", nodeId,
355 info ? " " : "", info ? info : "");
356 ErrorReporter::handleError(NDBD_EXIT_SIGNAL_LOST,
357 msg, __FILE__, NST_ErrorHandler);
359 case TE_SHM_IPC_PERMANENT:
361 char msg[128];
362 snprintf(msg, sizeof(msg),
363 "Remote node id %d.%s%s",
364 nodeId, info ? " " : "", info ? info : "");
365 ErrorReporter::handleError(NDBD_EXIT_CONNECTION_SETUP_FAILED,
366 msg, __FILE__, NST_ErrorHandler);
368 default:
369 break;
372 if(errorCode & TE_DO_DISCONNECT){
373 reportDisconnect(callbackObj, nodeId, errorCode);
376 SignalT<3> signalT;
377 Signal &signal= *(Signal*)&signalT;
378 memset(&signal.header, 0, sizeof(signal.header));
381 if(errorCode & TE_DO_DISCONNECT)
382 signal.theData[0] = NDB_LE_TransporterError;
383 else
384 signal.theData[0] = NDB_LE_TransporterWarning;
386 signal.theData[1] = nodeId;
387 signal.theData[2] = errorCode;
389 signal.header.theLength = 3;
390 signal.header.theSendersSignalId = 0;
391 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
392 globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
394 DBUG_VOID_RETURN;
398 * Report average send length in bytes (4096 last sends)
400 void
401 reportSendLen(void * callbackObj,
402 NodeId nodeId, Uint32 count, Uint64 bytes){
404 SignalT<3> signalT;
405 Signal &signal= *(Signal*)&signalT;
406 memset(&signal.header, 0, sizeof(signal.header));
408 signal.header.theLength = 3;
409 signal.header.theSendersSignalId = 0;
410 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
411 signal.theData[0] = NDB_LE_SendBytesStatistic;
412 signal.theData[1] = nodeId;
413 signal.theData[2] = (bytes/count);
414 globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
418 * Report average receive length in bytes (4096 last receives)
420 void
421 reportReceiveLen(void * callbackObj,
422 NodeId nodeId, Uint32 count, Uint64 bytes){
424 SignalT<3> signalT;
425 Signal &signal= *(Signal*)&signalT;
426 memset(&signal.header, 0, sizeof(signal.header));
428 signal.header.theLength = 3;
429 signal.header.theSendersSignalId = 0;
430 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
431 signal.theData[0] = NDB_LE_ReceiveBytesStatistic;
432 signal.theData[1] = nodeId;
433 signal.theData[2] = (bytes/count);
434 globalScheduler.execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
438 * Report connection established
441 void
442 reportConnect(void * callbackObj, NodeId nodeId){
444 SignalT<1> signalT;
445 Signal &signal= *(Signal*)&signalT;
446 memset(&signal.header, 0, sizeof(signal.header));
448 signal.header.theLength = 1;
449 signal.header.theSendersSignalId = 0;
450 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
451 signal.theData[0] = nodeId;
453 globalScheduler.execute(&signal, JBA, CMVMI, GSN_CONNECT_REP);
457 * Report connection broken
459 void
460 reportDisconnect(void * callbackObj, NodeId nodeId, Uint32 errNo){
462 DBUG_ENTER("reportDisconnect");
464 SignalT<sizeof(DisconnectRep)/4> signalT;
465 Signal &signal= *(Signal*)&signalT;
466 memset(&signal.header, 0, sizeof(signal.header));
468 signal.header.theLength = DisconnectRep::SignalLength;
469 signal.header.theSendersSignalId = 0;
470 signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
471 signal.header.theTrace = TestOrd::TraceDisconnect;
473 DisconnectRep * const rep = (DisconnectRep *)&signal.theData[0];
474 rep->nodeId = nodeId;
475 rep->err = errNo;
477 globalScheduler.execute(&signal, JBA, CMVMI, GSN_DISCONNECT_REP);
479 DBUG_VOID_RETURN;
482 void
483 SignalLoggerManager::printSegmentedSection(FILE * output,
484 const SignalHeader & sh,
485 const SegmentedSectionPtr ptr[3],
486 unsigned i)
488 fprintf(output, "SECTION %u type=segmented", i);
489 if (i >= 3) {
490 fprintf(output, " *** invalid ***\n");
491 return;
493 const Uint32 len = ptr[i].sz;
494 SectionSegment * ssp = ptr[i].p;
495 Uint32 pos = 0;
496 fprintf(output, " size=%u\n", (unsigned)len);
497 while (pos < len) {
498 if (pos > 0 && pos % SectionSegment::DataLength == 0) {
499 ssp = g_sectionSegmentPool.getPtr(ssp->m_nextSegment);
501 printDataWord(output, pos, ssp->theData[pos % SectionSegment::DataLength]);
503 if (len > 0)
504 putc('\n', output);
507 void
508 transporter_recv_from(void * callbackObj, NodeId nodeId){
509 globalData.m_nodeInfo[nodeId].m_heartbeat_cnt= 0;
510 return;