mySQL 5.0.11 sources for tomato
[tomato.git] / release / src / router / mysql / storage / ndb / src / kernel / blocks / suma / Suma.cpp
blobbfcb74cbd048cff693acb948a8166d9e19073347
1 /* Copyright (c) 2003-2008 MySQL AB, 2009 Sun Microsystems, Inc.
2 Use is subject to license terms.
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
17 #include <my_config.h>
18 #include "Suma.hpp"
20 #include <ndb_version.h>
22 #include <NdbTCP.h>
23 #include <Bitmask.hpp>
24 #include <SimpleProperties.hpp>
26 #include <signaldata/NodeFailRep.hpp>
27 #include <signaldata/ReadNodesConf.hpp>
29 #include <signaldata/ListTables.hpp>
30 #include <signaldata/GetTabInfo.hpp>
31 #include <signaldata/GetTableId.hpp>
32 #include <signaldata/DictTabInfo.hpp>
33 #include <signaldata/SumaImpl.hpp>
34 #include <signaldata/ScanFrag.hpp>
35 #include <signaldata/TransIdAI.hpp>
36 #include <signaldata/CreateTrig.hpp>
37 #include <signaldata/AlterTrig.hpp>
38 #include <signaldata/DropTrig.hpp>
39 #include <signaldata/FireTrigOrd.hpp>
40 #include <signaldata/TrigAttrInfo.hpp>
41 #include <signaldata/CheckNodeGroups.hpp>
42 #include <signaldata/GCPSave.hpp>
43 #include <signaldata/CreateTab.hpp>
44 #include <signaldata/DropTab.hpp>
45 #include <signaldata/AlterTable.hpp>
46 #include <signaldata/AlterTab.hpp>
47 #include <signaldata/DihFragCount.hpp>
48 #include <signaldata/SystemError.hpp>
50 #include <ndbapi/NdbDictionary.hpp>
52 #include <DebuggerNames.hpp>
53 #include <../dbtup/Dbtup.hpp>
54 #include <../dbdih/Dbdih.hpp>
56 //#define HANDOVER_DEBUG
57 //#define NODEFAIL_DEBUG
58 //#define NODEFAIL_DEBUG2
59 //#define DEBUG_SUMA_SEQUENCE
60 //#define EVENT_DEBUG
61 //#define EVENT_PH3_DEBUG
62 //#define EVENT_DEBUG2
63 #if 0
64 #undef DBUG_ENTER
65 #undef DBUG_PRINT
66 #undef DBUG_RETURN
67 #undef DBUG_VOID_RETURN
69 #define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);}
70 #define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;}
71 #define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); }
72 #define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; }
73 #endif
75 /**
76 * @todo:
77 * SUMA crashes if an index is created at the same time as
78 * global replication. Very easy to reproduce using testIndex.
79 * Note: This only happens occasionally, but is quite easy to reprod.
82 Uint32 g_subPtrI = RNIL;
83 static const Uint32 SUMA_SEQUENCE = 0xBABEBABE;
85 static const Uint32 MAX_CONCURRENT_GCP = 2;
87 /**************************************************************
89 * Start of suma
93 #define PRINT_ONLY 0
95 void
96 Suma::getNodeGroupMembers(Signal* signal)
98 jam();
99 DBUG_ENTER("Suma::getNodeGroupMembers");
101 * Ask DIH for nodeGroupMembers
103 CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend();
104 sd->blockRef = reference();
105 sd->requestType =
106 CheckNodeGroups::Direct |
107 CheckNodeGroups::GetNodeGroupMembers;
108 sd->nodeId = getOwnNodeId();
109 EXECUTE_DIRECT(DBDIH, GSN_CHECKNODEGROUPSREQ, signal,
110 CheckNodeGroups::SignalLength);
111 jamEntry();
113 c_nodeGroup = sd->output;
114 c_nodes_in_nodegroup_mask.assign(sd->mask);
115 c_noNodesInGroup = c_nodes_in_nodegroup_mask.count();
116 Uint32 i, pos= 0;
118 for (i = 0; i < MAX_NDB_NODES; i++) {
119 if (sd->mask.get(i))
121 c_nodesInGroup[pos++] = i;
125 const Uint32 replicas= c_noNodesInGroup;
127 Uint32 buckets= 1;
128 for(i = 1; i <= replicas; i++)
129 buckets *= i;
131 for(i = 0; i<buckets; i++)
133 Bucket* ptr= c_buckets+i;
134 for(Uint32 j= 0; j< replicas; j++)
136 ptr->m_nodes[j] = c_nodesInGroup[(i + j) % replicas];
140 c_no_of_buckets= buckets;
141 ndbrequire(c_noNodesInGroup > 0); // at least 1 node in the nodegroup
143 #ifndef DBUG_OFF
144 for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
145 DBUG_PRINT("exit",("Suma: NodeGroup %u, me %u, "
146 "member[%u] %u",
147 c_nodeGroup, getOwnNodeId(),
148 i, c_nodesInGroup[i]));
150 #endif
152 DBUG_VOID_RETURN;
155 void
156 Suma::execREAD_CONFIG_REQ(Signal* signal)
158 jamEntry();
160 const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
162 Uint32 ref = req->senderRef;
163 Uint32 senderData = req->senderData;
165 const ndb_mgm_configuration_iterator * p =
166 m_ctx.m_config.getOwnConfigIterator();
167 ndbrequire(p != 0);
169 // SumaParticipant
170 Uint32 noTables, noAttrs;
171 ndb_mgm_get_int_parameter(p, CFG_DB_NO_TABLES,
172 &noTables);
173 ndb_mgm_get_int_parameter(p, CFG_DB_NO_ATTRIBUTES,
174 &noAttrs);
176 c_tablePool.setSize(noTables);
177 c_tables.setSize(noTables);
179 c_subscriptions.setSize(noTables);
180 c_subscriberPool.setSize(2*noTables);
182 c_subscriptionPool.setSize(noTables);
183 c_syncPool.setSize(2);
184 c_dataBufferPool.setSize(noAttrs);
186 // Calculate needed gcp pool as 10 records + the ones needed
187 // during a possible api timeout
188 Uint32 dbApiHbInterval, gcpInterval;
189 ndb_mgm_get_int_parameter(p, CFG_DB_API_HEARTBEAT_INTERVAL,
190 &dbApiHbInterval);
191 ndb_mgm_get_int_parameter(p, CFG_DB_GCP_INTERVAL,
192 &gcpInterval);
193 c_gcp_pool.setSize(10 + (4*dbApiHbInterval)/gcpInterval);
195 c_page_chunk_pool.setSize(50);
198 SLList<SyncRecord> tmp(c_syncPool);
199 Ptr<SyncRecord> ptr;
200 while(tmp.seize(ptr))
201 new (ptr.p) SyncRecord(* this, c_dataBufferPool);
202 tmp.release();
205 // Suma
206 c_masterNodeId = getOwnNodeId();
208 c_nodeGroup = c_noNodesInGroup = 0;
209 for (int i = 0; i < MAX_REPLICAS; i++) {
210 c_nodesInGroup[i] = 0;
213 m_first_free_page= RNIL;
215 c_no_of_buckets = 0;
216 memset(c_buckets, 0, sizeof(c_buckets));
217 for(Uint32 i = 0; i<NO_OF_BUCKETS; i++)
219 Bucket* bucket= c_buckets+i;
220 bucket->m_buffer_tail = RNIL;
221 bucket->m_buffer_head.m_page_id = RNIL;
222 bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS;
225 m_max_seen_gci = 0; // FIRE_TRIG_ORD
226 m_max_sent_gci = 0; // FIRE_TRIG_ORD -> send
227 m_last_complete_gci = 0; // SUB_GCP_COMPLETE_REP
228 m_gcp_complete_rep_count = 0;
229 m_out_of_buffer_gci = 0;
231 c_startup.m_wait_handover= false;
232 c_failedApiNodes.clear();
234 ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
235 conf->senderRef = reference();
236 conf->senderData = senderData;
237 sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
238 ReadConfigConf::SignalLength, JBB);
241 void
242 Suma::execSTTOR(Signal* signal) {
243 jamEntry();
245 DBUG_ENTER("Suma::execSTTOR");
246 const Uint32 startphase = signal->theData[1];
247 const Uint32 typeOfStart = signal->theData[7];
249 DBUG_PRINT("info",("startphase = %u, typeOfStart = %u",
250 startphase, typeOfStart));
252 if(startphase == 3)
254 jam();
255 ndbrequire((m_tup = (Dbtup*)globalData.getBlock(DBTUP)) != 0);
256 signal->theData[0] = reference();
257 sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
258 DBUG_VOID_RETURN;
261 if(startphase == 5)
263 if (ERROR_INSERTED(13029)) /* Hold startphase 5 */
265 sendSignalWithDelay(SUMA_REF, GSN_STTOR, signal,
266 30, signal->getLength());
267 DBUG_VOID_RETURN;
270 c_startup.m_restart_server_node_id = 0;
271 getNodeGroupMembers(signal);
272 if (typeOfStart == NodeState::ST_NODE_RESTART ||
273 typeOfStart == NodeState::ST_INITIAL_NODE_RESTART)
275 jam();
277 send_start_me_req(signal);
278 DBUG_VOID_RETURN;
282 if(startphase == 7)
284 if (typeOfStart != NodeState::ST_NODE_RESTART &&
285 typeOfStart != NodeState::ST_INITIAL_NODE_RESTART)
287 for( Uint32 i = 0; i < c_no_of_buckets; i++)
289 if (get_responsible_node(i) == getOwnNodeId())
291 // I'm running this bucket
292 DBUG_PRINT("info",("bucket %u set to true", i));
293 m_active_buckets.set(i);
294 ndbout_c("m_active_buckets.set(%d)", i);
299 if(!m_active_buckets.isclear())
301 NdbNodeBitmask tmp;
302 Uint32 bucket = 0;
303 while ((bucket = m_active_buckets.find(bucket)) != Bucket_mask::NotFound)
305 tmp.set(get_responsible_node(bucket, c_nodes_in_nodegroup_mask));
306 bucket++;
309 ndbassert(tmp.get(getOwnNodeId()));
310 m_gcp_complete_rep_count = tmp.count();// I contribute 1 gcp complete rep
312 else
313 m_gcp_complete_rep_count = 0; // I contribute 1 gcp complete rep
315 if(typeOfStart == NodeState::ST_INITIAL_START &&
316 c_masterNodeId == getOwnNodeId())
318 jam();
319 createSequence(signal);
320 DBUG_VOID_RETURN;
321 }//if
323 if (ERROR_INSERTED(13030))
325 ndbout_c("Dont start handover");
326 DBUG_VOID_RETURN;
328 }//if
330 if(startphase == 100)
333 * Allow API's to connect
335 sendSTTORRY(signal);
336 DBUG_VOID_RETURN;
339 if(startphase == 101)
341 if (typeOfStart == NodeState::ST_NODE_RESTART ||
342 typeOfStart == NodeState::ST_INITIAL_NODE_RESTART)
345 * Handover code here
347 c_startup.m_wait_handover= true;
348 check_start_handover(signal);
349 DBUG_VOID_RETURN;
352 sendSTTORRY(signal);
354 DBUG_VOID_RETURN;
357 void
358 Suma::send_start_me_req(Signal* signal)
360 Uint32 nodeId= c_startup.m_restart_server_node_id;
361 do {
362 nodeId = c_alive_nodes.find(nodeId + 1);
364 if(nodeId == getOwnNodeId())
365 continue;
366 if(nodeId == NdbNodeBitmask::NotFound)
368 nodeId = 0;
369 continue;
371 break;
372 } while(true);
375 infoEvent("Suma: asking node %d to recreate subscriptions on me", nodeId);
376 c_startup.m_restart_server_node_id= nodeId;
377 sendSignal(calcSumaBlockRef(nodeId),
378 GSN_SUMA_START_ME_REQ, signal, 1, JBB);
381 void
382 Suma::execSUMA_START_ME_REF(Signal* signal)
384 const SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtr();
385 ndbrequire(ref->errorCode == SumaStartMeRef::Busy);
387 infoEvent("Suma: node %d refused %d",
388 c_startup.m_restart_server_node_id, ref->errorCode);
390 c_startup.m_restart_server_node_id++;
391 send_start_me_req(signal);
394 void
395 Suma::execSUMA_START_ME_CONF(Signal* signal)
397 infoEvent("Suma: node %d has completed restoring me",
398 c_startup.m_restart_server_node_id);
399 sendSTTORRY(signal);
400 c_startup.m_restart_server_node_id= 0;
403 void
404 Suma::createSequence(Signal* signal)
406 jam();
407 DBUG_ENTER("Suma::createSequence");
409 UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();
411 req->senderData = RNIL;
412 req->sequenceId = SUMA_SEQUENCE;
413 req->requestType = UtilSequenceReq::Create;
414 sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
415 signal, UtilSequenceReq::SignalLength, JBB);
416 // execUTIL_SEQUENCE_CONF will call createSequenceReply()
417 DBUG_VOID_RETURN;
420 void
421 Suma::createSequenceReply(Signal* signal,
422 UtilSequenceConf * conf,
423 UtilSequenceRef * ref)
425 jam();
427 if (ref != NULL)
429 switch ((UtilSequenceRef::ErrorCode)ref->errorCode)
431 case UtilSequenceRef::NoSuchSequence:
432 ndbrequire(false);
433 case UtilSequenceRef::TCError:
435 char buf[128];
436 snprintf(buf, sizeof(buf),
437 "Startup failed during sequence creation. TC error %d",
438 ref->TCErrorCode);
439 progError(__LINE__, NDBD_EXIT_RESOURCE_ALLOC_ERROR, buf);
442 ndbrequire(false);
445 sendSTTORRY(signal);
448 void
449 Suma::execREAD_NODESCONF(Signal* signal){
450 jamEntry();
451 ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();
453 if(getNodeState().getNodeRestartInProgress())
455 c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startedNodes);
456 c_alive_nodes.set(getOwnNodeId());
458 else
460 c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startingNodes);
461 NdbNodeBitmask tmp;
462 tmp.assign(NdbNodeBitmask::Size, conf->startedNodes);
463 ndbrequire(tmp.isclear()); // No nodes can be started during SR
466 c_masterNodeId = conf->masterNodeId;
468 sendSTTORRY(signal);
471 void
472 Suma::execAPI_START_REP(Signal* signal)
474 Uint32 nodeId = signal->theData[0];
475 c_connected_nodes.set(nodeId);
477 check_start_handover(signal);
480 void
481 Suma::check_start_handover(Signal* signal)
483 if(c_startup.m_wait_handover)
485 NodeBitmask tmp;
486 tmp.assign(c_connected_nodes);
487 tmp.bitAND(c_subscriber_nodes);
488 if(!c_subscriber_nodes.equal(tmp))
490 return;
493 c_startup.m_wait_handover= false;
494 send_handover_req(signal);
498 void
499 Suma::send_handover_req(Signal* signal)
501 c_startup.m_handover_nodes.assign(c_alive_nodes);
502 c_startup.m_handover_nodes.bitAND(c_nodes_in_nodegroup_mask);
503 c_startup.m_handover_nodes.clear(getOwnNodeId());
504 Uint32 gci= m_last_complete_gci + 3;
506 SumaHandoverReq* req= (SumaHandoverReq*)signal->getDataPtrSend();
507 char buf[255];
508 c_startup.m_handover_nodes.getText(buf);
509 infoEvent("Suma: initiate handover with nodes %s GCI: %d",
510 buf, gci);
512 req->gci = gci;
513 req->nodeId = getOwnNodeId();
515 NodeReceiverGroup rg(SUMA, c_startup.m_handover_nodes);
516 sendSignal(rg, GSN_SUMA_HANDOVER_REQ, signal,
517 SumaHandoverReq::SignalLength, JBB);
520 void
521 Suma::sendSTTORRY(Signal* signal){
522 signal->theData[0] = 0;
523 signal->theData[3] = 1;
524 signal->theData[4] = 3;
525 signal->theData[5] = 5;
526 signal->theData[6] = 7;
527 signal->theData[7] = 100;
528 signal->theData[8] = 101;
529 signal->theData[9] = 255; // No more start phases from missra
530 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 10, JBB);
533 void
534 Suma::execNDB_STTOR(Signal* signal)
536 jamEntry();
539 void
540 Suma::execCONTINUEB(Signal* signal){
541 jamEntry();
542 Uint32 type= signal->theData[0];
543 switch(type){
544 case SumaContinueB::RELEASE_GCI:
545 release_gci(signal, signal->theData[1], signal->theData[2]);
546 return;
547 case SumaContinueB::RESEND_BUCKET:
548 resend_bucket(signal,
549 signal->theData[1],
550 signal->theData[2],
551 signal->theData[3],
552 signal->theData[4]);
553 return;
554 case SumaContinueB::OUT_OF_BUFFER_RELEASE:
555 out_of_buffer_release(signal, signal->theData[1]);
556 return;
560 /*****************************************************************************
562 * Node state handling
564 *****************************************************************************/
566 void Suma::execAPI_FAILREQ(Signal* signal)
568 jamEntry();
569 DBUG_ENTER("Suma::execAPI_FAILREQ");
570 Uint32 failedApiNode = signal->theData[0];
571 //BlockReference retRef = signal->theData[1];
573 if (c_startup.m_restart_server_node_id &&
574 c_startup.m_restart_server_node_id != RNIL)
576 jam();
577 sendSignalWithDelay(reference(), GSN_API_FAILREQ, signal,
578 200, signal->getLength());
579 DBUG_VOID_RETURN;
582 if (c_failedApiNodes.get(failedApiNode))
584 jam();
585 DBUG_VOID_RETURN;
588 if (!c_subscriber_nodes.get(failedApiNode))
590 jam();
591 DBUG_VOID_RETURN;
594 c_failedApiNodes.set(failedApiNode);
595 c_connected_nodes.clear(failedApiNode);
596 bool found = removeSubscribersOnNode(signal, failedApiNode);
598 if(!found){
599 jam();
600 c_failedApiNodes.clear(failedApiNode);
603 SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr();
604 Ptr<Gcp_record> gcp;
605 for(c_gcp_list.first(gcp); !gcp.isNull(); c_gcp_list.next(gcp))
607 jam();
608 ack->rep.gci = gcp.p->m_gci;
609 if(gcp.p->m_subscribers.get(failedApiNode))
611 jam();
612 gcp.p->m_subscribers.clear(failedApiNode);
613 ack->rep.senderRef = numberToRef(0, failedApiNode);
614 sendSignal(SUMA_REF, GSN_SUB_GCP_COMPLETE_ACK, signal,
615 SubGcpCompleteAck::SignalLength, JBB);
619 c_subscriber_nodes.clear(failedApiNode);
621 check_start_handover(signal);
623 DBUG_VOID_RETURN;
624 }//execAPI_FAILREQ()
626 bool
627 Suma::removeSubscribersOnNode(Signal *signal, Uint32 nodeId)
629 DBUG_ENTER("Suma::removeSubscribersOnNode");
630 bool found = false;
632 KeyTable<Table>::Iterator it;
633 LINT_INIT(it.bucket);
634 LINT_INIT(it.curr.p);
635 for(c_tables.first(it);!it.isNull();c_tables.next(it))
637 LocalDLList<Subscriber> subbs(c_subscriberPool,it.curr.p->c_subscribers);
638 SubscriberPtr i_subbPtr;
639 for(subbs.first(i_subbPtr);!i_subbPtr.isNull();)
641 SubscriberPtr subbPtr = i_subbPtr;
642 subbs.next(i_subbPtr);
643 jam();
644 if (refToNode(subbPtr.p->m_senderRef) == nodeId) {
645 jam();
646 subbs.remove(subbPtr);
647 c_removeDataSubscribers.add(subbPtr);
648 found = true;
651 if (subbs.isEmpty())
653 // ToDo handle this
656 if(found){
657 jam();
658 sendSubStopReq(signal);
660 DBUG_RETURN(found);
663 void
664 Suma::sendSubStopReq(Signal *signal, bool unlock){
665 static bool remove_lock = false;
666 jam();
667 DBUG_ENTER("Suma::sendSubStopReq");
669 SubscriberPtr subbPtr;
670 c_removeDataSubscribers.first(subbPtr);
671 if (subbPtr.isNull()){
672 jam();
673 #if 0
674 signal->theData[0] = failedApiNode;
675 signal->theData[1] = reference();
676 sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
677 #endif
678 c_failedApiNodes.clear();
680 remove_lock = false;
681 DBUG_VOID_RETURN;
684 if(remove_lock && !unlock) {
685 jam();
686 DBUG_VOID_RETURN;
688 remove_lock = true;
690 SubscriptionPtr subPtr;
691 c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
693 SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();
694 req->senderRef = reference();
695 req->senderData = subbPtr.i;
696 req->subscriberRef = subbPtr.p->m_senderRef;
697 req->subscriberData = subbPtr.p->m_senderData;
698 req->subscriptionId = subPtr.p->m_subscriptionId;
699 req->subscriptionKey = subPtr.p->m_subscriptionKey;
700 req->part = SubscriptionData::TableData;
702 sendSignal(SUMA_REF,GSN_SUB_STOP_REQ,signal,SubStopReq::SignalLength,JBB);
703 DBUG_VOID_RETURN;
706 void
707 Suma::execSUB_STOP_CONF(Signal* signal){
708 jamEntry();
709 DBUG_ENTER("Suma::execSUB_STOP_CONF");
710 ndbassert(signal->getNoOfSections() == 0);
711 sendSubStopReq(signal,true);
712 DBUG_VOID_RETURN;
715 void
716 Suma::execSUB_STOP_REF(Signal* signal){
717 jamEntry();
718 DBUG_ENTER("Suma::execSUB_STOP_REF");
719 ndbassert(signal->getNoOfSections() == 0);
721 SubStopRef * const ref = (SubStopRef*)signal->getDataPtr();
723 Uint32 senderData = ref->senderData;
724 Uint32 subscriptionId = ref->subscriptionId;
725 Uint32 subscriptionKey = ref->subscriptionKey;
726 Uint32 part = ref->part;
727 Uint32 subscriberData = ref->subscriberData;
728 Uint32 subscriberRef = ref->subscriberRef;
730 if(ref->errorCode != 1411){
731 ndbrequire(false);
734 SubStopReq * const req = (SubStopReq*)signal->getDataPtrSend();
735 req->senderRef = reference();
736 req->senderData = senderData;
737 req->subscriberRef = subscriberRef;
738 req->subscriberData = subscriberData;
739 req->subscriptionId = subscriptionId;
740 req->subscriptionKey = subscriptionKey;
741 req->part = part;
743 sendSignal(SUMA_REF,GSN_SUB_STOP_REQ,signal,SubStopReq::SignalLength,JBB);
745 DBUG_VOID_RETURN;
748 void
749 Suma::execNODE_FAILREP(Signal* signal){
750 jamEntry();
751 DBUG_ENTER("Suma::execNODE_FAILREP");
752 ndbassert(signal->getNoOfSections() == 0);
754 const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
755 NdbNodeBitmask failed; failed.assign(NdbNodeBitmask::Size, rep->theNodes);
757 if(failed.get(Restart.nodeId))
759 Restart.resetRestart(signal);
762 if (ERROR_INSERTED(13032))
764 Uint32 node = c_subscriber_nodes.find(0);
765 if (node != NodeBitmask::NotFound)
767 ndbout_c("Inserting API_FAILREQ node: %u", node);
768 signal->theData[0] = node;
769 EXECUTE_DIRECT(QMGR, GSN_API_FAILREQ, signal, 1);
773 signal->theData[0] = SumaContinueB::RESEND_BUCKET;
775 NdbNodeBitmask tmp;
776 tmp.assign(c_alive_nodes);
777 tmp.bitANDC(failed);
779 NdbNodeBitmask takeover_nodes;
781 if(c_nodes_in_nodegroup_mask.overlaps(failed))
783 for( Uint32 i = 0; i < c_no_of_buckets; i++)
785 if(m_active_buckets.get(i))
786 continue;
787 else if(m_switchover_buckets.get(i))
789 Uint32 state= c_buckets[i].m_state;
790 if((state & Bucket::BUCKET_HANDOVER) &&
791 failed.get(get_responsible_node(i)))
793 m_active_buckets.set(i);
794 m_switchover_buckets.clear(i);
795 ndbout_c("aborting handover");
797 else if(state & Bucket::BUCKET_STARTING)
799 progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR,
800 "Nodefailure during SUMA takeover");
803 else if(get_responsible_node(i, tmp) == getOwnNodeId())
805 start_resend(signal, i);
810 c_alive_nodes.assign(tmp);
812 DBUG_VOID_RETURN;
815 void
816 Suma::execINCL_NODEREQ(Signal* signal){
817 jamEntry();
819 const Uint32 senderRef = signal->theData[0];
820 const Uint32 nodeId = signal->theData[1];
822 ndbrequire(!c_alive_nodes.get(nodeId));
823 c_alive_nodes.set(nodeId);
825 signal->theData[0] = nodeId;
826 signal->theData[1] = reference();
827 sendSignal(senderRef, GSN_INCL_NODECONF, signal, 2, JBB);
830 void
831 Suma::execSIGNAL_DROPPED_REP(Signal* signal){
832 jamEntry();
833 ndbrequire(false);
836 /********************************************************************
838 * Dump state
842 static unsigned
843 count_subscribers(const DLList<Suma::Subscriber> &subs)
845 unsigned n= 0;
846 Suma::SubscriberPtr i_subbPtr;
847 subs.first(i_subbPtr);
848 while(!i_subbPtr.isNull()){
849 n++;
850 subs.next(i_subbPtr);
852 return n;
855 void
856 Suma::execDUMP_STATE_ORD(Signal* signal){
857 jamEntry();
859 Uint32 tCase = signal->theData[0];
860 #if 0
861 if(tCase >= 8000 && tCase <= 8003){
862 SubscriptionPtr subPtr;
863 c_subscriptions.getPtr(subPtr, g_subPtrI);
865 Ptr<SyncRecord> syncPtr;
866 c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);
868 if(tCase == 8000){
869 syncPtr.p->startMeta(signal);
872 if(tCase == 8001){
873 syncPtr.p->startScan(signal);
876 if(tCase == 8002){
877 syncPtr.p->startTrigger(signal);
880 if(tCase == 8003){
881 subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan;
882 LocalDataBuffer<15> attrs(c_dataBufferPool, syncPtr.p->m_attributeList);
883 Uint32 tab = 0;
884 Uint32 att[] = { 0, 1, 1 };
885 syncPtr.p->m_tableList.append(&tab, 1);
886 attrs.append(att, 3);
889 #endif
890 if(tCase == 8004){
891 infoEvent("Suma: c_subscriberPool size: %d free: %d",
892 c_subscriberPool.getSize(),
893 c_subscriberPool.getNoOfFree());
895 infoEvent("Suma: c_tablePool size: %d free: %d",
896 c_tablePool.getSize(),
897 c_tablePool.getNoOfFree());
899 infoEvent("Suma: c_subscriptionPool size: %d free: %d",
900 c_subscriptionPool.getSize(),
901 c_subscriptionPool.getNoOfFree());
903 infoEvent("Suma: c_syncPool size: %d free: %d",
904 c_syncPool.getSize(),
905 c_syncPool.getNoOfFree());
907 infoEvent("Suma: c_dataBufferPool size: %d free: %d",
908 c_dataBufferPool.getSize(),
909 c_dataBufferPool.getNoOfFree());
911 infoEvent("Suma: c_metaSubscribers count: %d",
912 count_subscribers(c_metaSubscribers));
913 #if 0
914 infoEvent("Suma: c_dataSubscribers count: %d",
915 count_subscribers(c_dataSubscribers));
916 infoEvent("Suma: c_prepDataSubscribers count: %d",
917 count_subscribers(c_prepDataSubscribers));
918 #endif
919 infoEvent("Suma: c_removeDataSubscribers count: %d",
920 count_subscribers(c_removeDataSubscribers));
923 if(tCase == 8005)
925 for(Uint32 i = 0; i<c_no_of_buckets; i++)
927 Bucket* ptr= c_buckets + i;
928 infoEvent("Bucket %d %d%d-%x switch gci: %d max_acked_gci: %d max_gci: %d tail: %d head: %d",
930 m_active_buckets.get(i),
931 m_switchover_buckets.get(i),
932 ptr->m_state,
933 ptr->m_switchover_gci,
934 ptr->m_max_acked_gci,
935 ptr->m_buffer_head.m_max_gci,
936 ptr->m_buffer_tail,
937 ptr->m_buffer_head.m_page_id);
941 if (tCase == 8006)
943 SET_ERROR_INSERT_VALUE(13029);
946 if (tCase == 8007)
948 c_startup.m_restart_server_node_id = MAX_NDB_NODES + 1;
949 SET_ERROR_INSERT_VALUE(13029);
952 if (tCase == 8008)
954 CLEAR_ERROR_INSERT_VALUE;
957 if (tCase == 8010)
959 char buf1[255], buf2[255];
960 c_subscriber_nodes.getText(buf1);
961 c_connected_nodes.getText(buf2);
962 infoEvent("c_subscriber_nodes: %s", buf1);
963 infoEvent("c_connected_nodes: %s", buf2);
966 if (tCase == 8009)
968 if (ERROR_INSERTED(13030))
970 CLEAR_ERROR_INSERT_VALUE;
971 sendSTTORRY(signal);
973 else
975 SET_ERROR_INSERT_VALUE(13030);
977 return;
980 if (tCase == 8011)
982 jam();
983 Uint32 bucket = signal->theData[1];
984 KeyTable<Table>::Iterator it;
985 if (signal->getLength() == 1)
987 jam();
988 bucket = 0;
989 infoEvent("-- Starting dump of subscribers --");
992 c_tables.next(bucket, it);
993 const Uint32 RT_BREAK = 16;
994 for(Uint32 i = 0; i<RT_BREAK || it.bucket == bucket; i++)
996 jam();
997 if(it.curr.i == RNIL)
999 jam();
1000 infoEvent("-- Ending dump of subscribers --");
1001 return;
1004 infoEvent("Table: %u ver: %u #n: %u (ref,data,subscritopn)",
1005 it.curr.p->m_tableId,
1006 it.curr.p->m_schemaVersion,
1007 it.curr.p->n_subscribers);
1009 Ptr<Subscriber> ptr;
1010 LocalDLList<Subscriber> list(c_subscriberPool, it.curr.p->c_subscribers);
1011 for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
1013 jam();
1014 infoEvent(" [ %x %u %u ]",
1015 ptr.p->m_senderRef,
1016 ptr.p->m_senderData,
1017 ptr.p->m_subPtrI);
1019 c_tables.next(it);
1022 signal->theData[0] = tCase;
1023 signal->theData[1] = it.bucket;
1024 sendSignalWithDelay(reference(), GSN_DUMP_STATE_ORD, signal, 100, 2);
1025 return;
1029 /*************************************************************
1031 * Creation of subscription id's
1033 ************************************************************/
1035 void
1036 Suma::execCREATE_SUBID_REQ(Signal* signal)
1038 jamEntry();
1039 DBUG_ENTER("Suma::execCREATE_SUBID_REQ");
1040 ndbassert(signal->getNoOfSections() == 0);
1041 CRASH_INSERTION(13001);
1043 CreateSubscriptionIdReq const * req =
1044 (CreateSubscriptionIdReq*)signal->getDataPtr();
1045 SubscriberPtr subbPtr;
1046 if(!c_subscriberPool.seize(subbPtr)){
1047 jam();
1048 sendSubIdRef(signal, req->senderRef, req->senderData, 1412);
1049 DBUG_VOID_RETURN;
1051 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
1052 c_subscriberPool.getSize(),
1053 c_subscriberPool.getNoOfFree()));
1055 subbPtr.p->m_senderRef = req->senderRef;
1056 subbPtr.p->m_senderData = req->senderData;
1058 UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend();
1059 utilReq->senderData = subbPtr.i;
1060 utilReq->sequenceId = SUMA_SEQUENCE;
1061 utilReq->requestType = UtilSequenceReq::NextVal;
1062 sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
1063 signal, UtilSequenceReq::SignalLength, JBB);
1065 DBUG_VOID_RETURN;
1068 void
1069 Suma::execUTIL_SEQUENCE_CONF(Signal* signal)
1071 jamEntry();
1072 DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF");
1073 ndbassert(signal->getNoOfSections() == 0);
1074 CRASH_INSERTION(13002);
1076 UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr();
1077 if(conf->requestType == UtilSequenceReq::Create) {
1078 jam();
1079 createSequenceReply(signal, conf, NULL);
1080 DBUG_VOID_RETURN;
1083 Uint64 subId;
1084 memcpy(&subId,conf->sequenceValue,8);
1085 SubscriberPtr subbPtr;
1086 c_subscriberPool.getPtr(subbPtr,conf->senderData);
1088 CreateSubscriptionIdConf * subconf = (CreateSubscriptionIdConf*)conf;
1089 subconf->senderRef = reference();
1090 subconf->senderData = subbPtr.p->m_senderData;
1091 subconf->subscriptionId = (Uint32)subId;
1092 subconf->subscriptionKey =(getOwnNodeId() << 16) | (Uint32)(subId & 0xFFFF);
1094 sendSignal(subbPtr.p->m_senderRef, GSN_CREATE_SUBID_CONF, signal,
1095 CreateSubscriptionIdConf::SignalLength, JBB);
1097 c_subscriberPool.release(subbPtr);
1098 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
1099 c_subscriberPool.getSize(),
1100 c_subscriberPool.getNoOfFree()));
1101 DBUG_VOID_RETURN;
1104 void
1105 Suma::execUTIL_SEQUENCE_REF(Signal* signal)
1107 jamEntry();
1108 DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF");
1109 ndbassert(signal->getNoOfSections() == 0);
1110 UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr();
1111 Uint32 err= ref->errorCode;
1113 if(ref->requestType == UtilSequenceReq::Create) {
1114 jam();
1115 createSequenceReply(signal, NULL, ref);
1116 DBUG_VOID_RETURN;
1119 Uint32 subData = ref->senderData;
1121 SubscriberPtr subbPtr;
1122 c_subscriberPool.getPtr(subbPtr,subData);
1123 sendSubIdRef(signal, subbPtr.p->m_senderRef, subbPtr.p->m_senderData, err);
1124 c_subscriberPool.release(subbPtr);
1125 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
1126 c_subscriberPool.getSize(),
1127 c_subscriberPool.getNoOfFree()));
1128 DBUG_VOID_RETURN;
1129 }//execUTIL_SEQUENCE_REF()
1132 void
1133 Suma::sendSubIdRef(Signal* signal,
1134 Uint32 senderRef, Uint32 senderData, Uint32 errCode)
1136 jam();
1137 DBUG_ENTER("Suma::sendSubIdRef");
1138 CreateSubscriptionIdRef * ref =
1139 (CreateSubscriptionIdRef *)signal->getDataPtrSend();
1141 ref->senderRef = reference();
1142 ref->senderData = senderData;
1143 ref->errorCode = errCode;
1144 sendSignal(senderRef,
1145 GSN_CREATE_SUBID_REF,
1146 signal,
1147 CreateSubscriptionIdRef::SignalLength,
1148 JBB);
1150 releaseSections(signal);
1151 DBUG_VOID_RETURN;
1154 /**********************************************************
1155 * Suma participant interface
1157 * Creation of subscriptions
1160 void
1161 Suma::addTableId(Uint32 tableId,
1162 SubscriptionPtr subPtr, SyncRecord *psyncRec)
1164 DBUG_ENTER("Suma::addTableId");
1165 DBUG_PRINT("enter",("tableId: %u subPtr.i: %u", tableId, subPtr.i));
1166 subPtr.p->m_tableId= tableId;
1167 if(psyncRec != NULL)
1168 psyncRec->m_tableList.append(&tableId, 1);
1169 DBUG_VOID_RETURN;
1172 void
1173 Suma::execSUB_CREATE_REQ(Signal* signal)
1175 jamEntry();
1176 DBUG_ENTER("Suma::execSUB_CREATE_REQ");
1177 ndbassert(signal->getNoOfSections() == 0);
1178 CRASH_INSERTION(13003);
1180 const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr();
1182 const Uint32 subRef = req.senderRef;
1183 const Uint32 subData = req.senderData;
1184 const Uint32 subId = req.subscriptionId;
1185 const Uint32 subKey = req.subscriptionKey;
1186 const Uint32 type = req.subscriptionType & SubCreateReq::RemoveFlags;
1187 const Uint32 flags = req.subscriptionType & SubCreateReq::GetFlags;
1188 const bool addTableFlag = (flags & SubCreateReq::AddTableFlag) != 0;
1189 const bool restartFlag = (flags & SubCreateReq::RestartFlag) != 0;
1190 const Uint32 reportAll = (flags & SubCreateReq::ReportAll) ?
1191 Subscription::REPORT_ALL : 0;
1192 const Uint32 reportSubscribe = (flags & SubCreateReq::ReportSubscribe) ?
1193 Subscription::REPORT_SUBSCRIBE : 0;
1194 const Uint32 tableId = req.tableId;
1195 Subscription::State state = (Subscription::State) req.state;
1196 if (signal->getLength() != SubCreateReq::SignalLength2)
1199 api or restarted by older version
1200 if restarted by old version, do the best we can
1202 state = Subscription::DEFINED;
1205 Subscription key;
1206 key.m_subscriptionId = subId;
1207 key.m_subscriptionKey = subKey;
1209 DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
1210 key.m_subscriptionId, key.m_subscriptionKey));
1212 SubscriptionPtr subPtr;
1214 if (addTableFlag) {
1215 ndbrequire(restartFlag); //TODO remove this
1217 if(!c_subscriptions.find(subPtr, key)) {
1218 jam();
1219 sendSubCreateRef(signal, 1407);
1220 DBUG_VOID_RETURN;
1222 jam();
1223 if (restartFlag)
1225 ndbrequire(type != SubCreateReq::SingleTableScan);
1226 ndbrequire(req.tableId != subPtr.p->m_tableId);
1227 ndbrequire(type != SubCreateReq::TableEvent);
1228 addTableId(req.tableId, subPtr, 0);
1230 } else {
1231 if (c_startup.m_restart_server_node_id &&
1232 subRef != calcSumaBlockRef(c_startup.m_restart_server_node_id))
1235 * only allow "restart_server" Suma's to come through
1236 * for restart purposes
1238 jam();
1239 sendSubCreateRef(signal, 1415);
1240 DBUG_VOID_RETURN;
1242 // Check that id/key is unique
1243 if(c_subscriptions.find(subPtr, key)) {
1244 jam();
1245 sendSubCreateRef(signal, 1415);
1246 DBUG_VOID_RETURN;
1248 if(!c_subscriptions.seize(subPtr)) {
1249 jam();
1250 sendSubCreateRef(signal, 1412);
1251 DBUG_VOID_RETURN;
1253 DBUG_PRINT("info",("c_subscriptionPool size: %d free: %d",
1254 c_subscriptionPool.getSize(),
1255 c_subscriptionPool.getNoOfFree()));
1256 jam();
1257 subPtr.p->m_senderRef = subRef;
1258 subPtr.p->m_senderData = subData;
1259 subPtr.p->m_subscriptionId = subId;
1260 subPtr.p->m_subscriptionKey = subKey;
1261 subPtr.p->m_subscriptionType = type;
1262 subPtr.p->m_options = reportSubscribe | reportAll;
1263 subPtr.p->m_tableId = tableId;
1264 subPtr.p->m_table_ptrI = RNIL;
1265 subPtr.p->m_state = state;
1266 subPtr.p->n_subscribers = 0;
1267 subPtr.p->m_current_sync_ptrI = RNIL;
1269 fprintf(stderr, "table %d options %x\n", subPtr.p->m_tableId, subPtr.p->m_options);
1270 DBUG_PRINT("info",("Added: key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
1271 key.m_subscriptionId, key.m_subscriptionKey));
1273 c_subscriptions.add(subPtr);
1276 SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
1277 conf->senderRef = reference();
1278 conf->senderData = subPtr.p->m_senderData;
1279 sendSignal(subRef, GSN_SUB_CREATE_CONF, signal, SubCreateConf::SignalLength, JBB);
1280 DBUG_VOID_RETURN;
1283 void
1284 Suma::sendSubCreateRef(Signal* signal, Uint32 errCode)
1286 jam();
1287 SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend();
1288 ref->errorCode = errCode;
1289 sendSignal(signal->getSendersBlockRef(), GSN_SUB_CREATE_REF, signal,
1290 SubCreateRef::SignalLength, JBB);
1291 return;
1294 /**********************************************************
1296 * Setting upp trigger for subscription
1300 void
1301 Suma::execSUB_SYNC_REQ(Signal* signal)
1303 jamEntry();
1304 DBUG_ENTER("Suma::execSUB_SYNC_REQ");
1305 ndbassert(signal->getNoOfSections() <= 1);
1306 CRASH_INSERTION(13004);
1308 SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr();
1310 SubscriptionPtr subPtr;
1311 Subscription key;
1312 key.m_subscriptionId = req->subscriptionId;
1313 key.m_subscriptionKey = req->subscriptionKey;
1315 DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
1316 key.m_subscriptionId, key.m_subscriptionKey));
1318 if(!c_subscriptions.find(subPtr, key))
1320 jam();
1321 DBUG_PRINT("info",("Not found"));
1322 sendSubSyncRef(signal, 1407);
1323 DBUG_VOID_RETURN;
1326 bool ok = false;
1327 SubscriptionData::Part part = (SubscriptionData::Part)req->part;
1329 Ptr<SyncRecord> syncPtr;
1330 if(!c_syncPool.seize(syncPtr))
1332 jam();
1333 sendSubSyncRef(signal, 1416);
1334 DBUG_VOID_RETURN;
1336 DBUG_PRINT("info",("c_syncPool size: %d free: %d",
1337 c_syncPool.getSize(),
1338 c_syncPool.getNoOfFree()));
1340 syncPtr.p->m_senderRef = req->senderRef;
1341 syncPtr.p->m_senderData = req->senderData;
1342 syncPtr.p->m_subscriptionPtrI = subPtr.i;
1343 syncPtr.p->ptrI = syncPtr.i;
1344 syncPtr.p->m_error = 0;
1346 subPtr.p->m_current_sync_ptrI = syncPtr.i;
1349 jam();
1350 syncPtr.p->m_tableList.append(&subPtr.p->m_tableId, 1);
1351 if(signal->getNoOfSections() > 0){
1352 SegmentedSectionPtr ptr(0,0,0);
1353 signal->getSection(ptr, SubSyncReq::ATTRIBUTE_LIST);
1354 LocalDataBuffer<15> attrBuf(c_dataBufferPool,syncPtr.p->m_attributeList);
1355 append(attrBuf, ptr, getSectionSegmentPool());
1356 releaseSections(signal);
1360 TablePtr tabPtr;
1361 initTable(signal,subPtr.p->m_tableId,tabPtr,syncPtr);
1362 tabPtr.p->n_subscribers++;
1363 if (subPtr.p->m_options & Subscription::REPORT_ALL)
1364 tabPtr.p->m_reportAll = true;
1365 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
1366 tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
1367 DBUG_VOID_RETURN;
1369 switch(part){
1370 case SubscriptionData::MetaData:
1371 ndbrequire(false);
1372 #if 0
1373 ok = true;
1374 jam();
1375 if (subPtr.p->m_subscriptionType == SubCreateReq::DatabaseSnapshot) {
1376 TableList::DataBufferIterator it;
1377 syncPtr.p->m_tableList.first(it);
1378 if(it.isNull()) {
1380 * Get all tables from dict
1382 ListTablesReq * req = (ListTablesReq*)signal->getDataPtrSend();
1383 req->senderRef = reference();
1384 req->senderData = syncPtr.i;
1385 req->requestData = 0;
1387 * @todo: accomodate scan of index tables?
1389 req->setTableType(DictTabInfo::UserTable);
1391 sendSignal(DBDICT_REF, GSN_LIST_TABLES_REQ, signal,
1392 ListTablesReq::SignalLength, JBB);
1393 break;
1397 syncPtr.p->startMeta(signal);
1398 #endif
1399 break;
1400 case SubscriptionData::TableData: {
1401 ok = true;
1402 jam();
1403 syncPtr.p->startScan(signal);
1404 break;
1407 ndbrequire(ok);
1408 DBUG_VOID_RETURN;
1411 void
1412 Suma::sendSubSyncRef(Signal* signal, Uint32 errCode){
1413 jam();
1414 SubSyncRef * ref= (SubSyncRef *)signal->getDataPtrSend();
1415 ref->errorCode = errCode;
1416 releaseSections(signal);
1417 sendSignal(signal->getSendersBlockRef(),
1418 GSN_SUB_SYNC_REF,
1419 signal,
1420 SubSyncRef::SignalLength,
1421 JBB);
1422 return;
/**********************************************************
 * Dict interface
 */

#if 0
/**
 * Disabled: forward LIST_TABLES_CONF to the record addressed by
 * conf->senderData.
 */
void
Suma::execLIST_TABLES_CONF(Signal* signal){
  jamEntry();
  CRASH_INSERTION(13005);

  ListTablesConf* const conf = (ListTablesConf*)signal->getDataPtr();
  SyncRecord* const syncRec = c_syncPool.getPtr(conf->senderData);
  syncRec->runLIST_TABLES_CONF(signal);
}
#endif
/*************************************************************************
 *
 * Disabled code: handling of LIST_TABLES_CONF
 *
 */
#if 0
/**
 * Disabled: add every table id carried by one LIST_TABLES_CONF to the
 * subscription, and start the meta phase once the last (not completely
 * filled) signal has been seen.
 */
void
Suma::Table::runLIST_TABLES_CONF(Signal* signal){
  jam();

  ListTablesConf * const conf = (ListTablesConf*)signal->getDataPtr();
  const Uint32 len = signal->length() - ListTablesConf::HeaderLength;

  SubscriptionPtr subPtr;
  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);

  for (unsigned i = 0; i < len; i++) {
    subPtr.p->m_maxTables++;
    suma.addTableId(ListTablesConf::getTableId(conf->tableData[i]),
                    subPtr, this);
  }

  //  for (unsigned i = 0; i < len; i++)
  //    conf->tableData[i] = ListTablesConf::getTableId(conf->tableData[i]);
  //  m_tableList.append(&conf->tableData[0], len);

#if 0
  TableList::DataBufferIterator it;
  int i = 0;
  for(m_tableList.first(it);!it.isNull();m_tableList.next(it)) {
    ndbout_c("%u listtableconf tableid %d", i++, *it.data);
  }
#endif

  if(len == ListTablesConf::DataLength){
    jam();
    // we expect more LIST_TABLE_CONF
    return;
  }

#if 0
  subPtr.p->m_currentTable = 0;
  subPtr.p->m_maxTables    = 0;

  TableList::DataBufferIterator it;
  for(m_tableList.first(it); !it.isNull(); m_tableList.next(it)) {
    subPtr.p->m_maxTables++;
    suma.addTableId(*it.data, subPtr, NULL);
#ifdef NODEFAIL_DEBUG
    ndbout_c(" listtableconf tableid %d",*it.data);
#endif
  }
#endif

  startMeta(signal);
}
#endif
1498 int
1499 Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr,
1500 SubscriberPtr subbPtr)
1502 DBUG_ENTER("Suma::initTable SubscriberPtr");
1503 DBUG_PRINT("enter",("tableId: %d", tableId));
1505 int r= initTable(signal,tableId,tabPtr);
1508 LocalDLList<Subscriber> subscribers(c_subscriberPool,
1509 tabPtr.p->c_subscribers);
1510 subscribers.add(subbPtr);
1513 DBUG_PRINT("info",("added subscriber: %i", subbPtr.i));
1515 if (r)
1517 jam();
1518 // we have to wait getting tab info
1519 DBUG_RETURN(1);
1522 if (tabPtr.p->setupTrigger(signal, *this))
1524 jam();
1525 // we have to wait for triggers to be setup
1526 DBUG_RETURN(1);
1529 int ret = completeOneSubscriber(signal, tabPtr, subbPtr);
1530 if (ret == -1)
1532 jam();
1533 LocalDLList<Subscriber> subscribers(c_subscriberPool,
1534 tabPtr.p->c_subscribers);
1535 subscribers.release(subbPtr);
1537 completeInitTable(signal, tabPtr);
1538 DBUG_RETURN(0);
1541 int
1542 Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr,
1543 Ptr<SyncRecord> syncPtr)
1545 jam();
1546 DBUG_ENTER("Suma::initTable Ptr<SyncRecord>");
1547 DBUG_PRINT("enter",("tableId: %d", tableId));
1549 int r= initTable(signal,tableId,tabPtr);
1552 LocalDLList<SyncRecord> syncRecords(c_syncPool,tabPtr.p->c_syncRecords);
1553 syncRecords.add(syncPtr);
1556 if (r)
1558 // we have to wait getting tab info
1559 DBUG_RETURN(1);
1561 completeInitTable(signal, tabPtr);
1562 DBUG_RETURN(0);
1566 Suma::initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr)
1568 jam();
1569 DBUG_ENTER("Suma::initTable");
1571 if (!c_tables.find(tabPtr, tableId) ||
1572 tabPtr.p->m_state == Table::DROPPED ||
1573 tabPtr.p->m_state == Table::ALTERED)
1575 // table not being prepared
1576 // seize a new table, initialize and add to c_tables
1577 ndbrequire(c_tablePool.seize(tabPtr));
1578 DBUG_PRINT("info",("c_tablePool size: %d free: %d",
1579 c_tablePool.getSize(),
1580 c_tablePool.getNoOfFree()));
1581 new (tabPtr.p) Table;
1583 tabPtr.p->m_tableId= tableId;
1584 tabPtr.p->m_ptrI= tabPtr.i;
1585 tabPtr.p->n_subscribers = 0;
1586 DBUG_PRINT("info",("Suma::Table[%u,i=%u]::n_subscribers: %u",
1587 tabPtr.p->m_tableId, tabPtr.i, tabPtr.p->n_subscribers));
1589 tabPtr.p->m_reportAll = false;
1591 tabPtr.p->m_error = 0;
1592 tabPtr.p->m_schemaVersion = RNIL;
1593 tabPtr.p->m_state = Table::DEFINING;
1594 tabPtr.p->m_drop_subbPtr.p = 0;
1595 for (int j= 0; j < 3; j++)
1597 tabPtr.p->m_hasTriggerDefined[j] = 0;
1598 tabPtr.p->m_hasOutstandingTriggerReq[j] = 0;
1599 tabPtr.p->m_triggerIds[j] = ILLEGAL_TRIGGER_ID;
1602 c_tables.add(tabPtr);
1604 GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
1605 req->senderRef = reference();
1606 req->senderData = tabPtr.i;
1607 req->requestType =
1608 GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
1609 req->tableId = tableId;
1611 DBUG_PRINT("info",("GET_TABINFOREQ id %d", req->tableId));
1613 if (ERROR_INSERTED(13031))
1615 jam();
1616 CLEAR_ERROR_INSERT_VALUE;
1617 GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtrSend();
1618 ref->tableId = tableId;
1619 ref->senderData = tabPtr.i;
1620 ref->errorCode = GetTabInfoRef::TableNotDefined;
1621 sendSignal(reference(), GSN_GET_TABINFOREF, signal,
1622 GetTabInfoRef::SignalLength, JBB);
1623 DBUG_RETURN(1);
1626 sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
1627 GetTabInfoReq::SignalLength, JBB);
1628 DBUG_RETURN(1);
1630 if (tabPtr.p->m_state == Table::DEFINING)
1632 DBUG_RETURN(1);
1634 // ToDo should be a ref signal instead
1635 ndbrequire(tabPtr.p->m_state == Table::DEFINED);
1636 DBUG_RETURN(0);
1640 Suma::completeOneSubscriber(Signal *signal, TablePtr tabPtr, SubscriberPtr subbPtr)
1642 jam();
1643 DBUG_ENTER("Suma::completeOneSubscriber");
1645 if (tabPtr.p->m_error &&
1646 (c_startup.m_restart_server_node_id == 0 ||
1647 tabPtr.p->m_state != Table::DROPPED))
1649 jam();
1650 sendSubStartRef(signal,subbPtr,tabPtr.p->m_error,
1651 SubscriptionData::TableData);
1652 tabPtr.p->n_subscribers--;
1653 DBUG_RETURN(-1);
1655 else
1657 jam();
1658 SubscriptionPtr subPtr;
1659 c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
1660 subPtr.p->m_table_ptrI= tabPtr.i;
1661 sendSubStartComplete(signal,subbPtr, m_last_complete_gci + 3,
1662 SubscriptionData::TableData);
1664 DBUG_RETURN(0);
1667 void
1668 Suma::completeAllSubscribers(Signal *signal, TablePtr tabPtr)
1670 jam();
1671 DBUG_ENTER("Suma::completeAllSubscribers");
1672 // handle all subscribers
1674 LocalDLList<Subscriber> subscribers(c_subscriberPool,
1675 tabPtr.p->c_subscribers);
1676 SubscriberPtr subbPtr;
1677 for(subscribers.first(subbPtr); !subbPtr.isNull();)
1679 jam();
1680 Ptr<Subscriber> tmp = subbPtr;
1681 subscribers.next(subbPtr);
1682 int ret = completeOneSubscriber(signal, tabPtr, tmp);
1683 if (ret == -1)
1685 jam();
1686 subscribers.release(tmp);
1690 DBUG_VOID_RETURN;
1693 void
1694 Suma::completeInitTable(Signal *signal, TablePtr tabPtr)
1696 jam();
1697 DBUG_ENTER("Suma::completeInitTable");
1699 // handle all syncRecords
1700 while (!tabPtr.p->c_syncRecords.isEmpty())
1702 Ptr<SyncRecord> syncPtr;
1704 LocalDLList<SyncRecord> syncRecords(c_syncPool,
1705 tabPtr.p->c_syncRecords);
1706 syncRecords.first(syncPtr);
1707 syncRecords.remove(syncPtr);
1709 syncPtr.p->ptrI = syncPtr.i;
1710 if (tabPtr.p->m_error == 0)
1712 jam();
1713 syncPtr.p->startScan(signal);
1715 else
1717 jam();
1718 syncPtr.p->completeScan(signal, tabPtr.p->m_error);
1719 tabPtr.p->n_subscribers--;
1723 if (tabPtr.p->m_error)
1725 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
1726 tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
1727 tabPtr.p->checkRelease(*this);
1729 else
1731 tabPtr.p->m_state = Table::DEFINED;
1734 DBUG_VOID_RETURN;
1738 void
1739 Suma::execGET_TABINFOREF(Signal* signal){
1740 jamEntry();
1741 GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtr();
1742 Uint32 tableId = ref->tableId;
1743 Uint32 senderData = ref->senderData;
1744 GetTabInfoRef::ErrorCode errorCode =
1745 (GetTabInfoRef::ErrorCode) ref->errorCode;
1746 int do_resend_request = 0;
1747 TablePtr tabPtr;
1748 c_tablePool.getPtr(tabPtr, senderData);
1749 switch (errorCode)
1751 case GetTabInfoRef::TableNotDefined:
1752 // wrong state
1753 break;
1754 case GetTabInfoRef::InvalidTableId:
1755 // no such table
1756 break;
1757 case GetTabInfoRef::Busy:
1758 do_resend_request = 1;
1759 break;
1760 case GetTabInfoRef::TableNameTooLong:
1761 ndbrequire(false);
1762 break;
1763 case GetTabInfoRef::NoFetchByName:
1764 break;
1766 if (do_resend_request)
1768 GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
1769 req->senderRef = reference();
1770 req->senderData = senderData;
1771 req->requestType =
1772 GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
1773 req->tableId = tableId;
1774 sendSignalWithDelay(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
1775 30, GetTabInfoReq::SignalLength);
1776 return;
1778 tabPtr.p->m_state = Table::DROPPED;
1779 tabPtr.p->m_error = errorCode;
1780 completeAllSubscribers(signal, tabPtr);
1781 completeInitTable(signal, tabPtr);
1784 void
1785 Suma::execGET_TABINFO_CONF(Signal* signal){
1786 jamEntry();
1788 CRASH_INSERTION(13006);
1790 if(!assembleFragments(signal)){
1791 return;
1794 GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr();
1795 Uint32 tableId = conf->tableId;
1796 TablePtr tabPtr;
1797 c_tablePool.getPtr(tabPtr, conf->senderData);
1798 SegmentedSectionPtr ptr(0,0,0);
1799 signal->getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
1800 ndbrequire(tabPtr.p->parseTable(ptr, *this));
1801 releaseSections(signal);
1803 * We need to gather fragment info
1805 jam();
1806 DihFragCountReq* req = (DihFragCountReq*)signal->getDataPtrSend();
1807 req->m_connectionData = RNIL;
1808 req->m_tableRef = tableId;
1809 req->m_senderData = tabPtr.i;
1810 sendSignal(DBDIH_REF, GSN_DI_FCOUNTREQ, signal,
1811 DihFragCountReq::SignalLength, JBB);
1814 bool
1815 Suma::Table::parseTable(SegmentedSectionPtr ptr,
1816 Suma &suma)
1818 DBUG_ENTER("Suma::Table::parseTable");
1820 SimplePropertiesSectionReader it(ptr, suma.getSectionSegmentPool());
1822 SimpleProperties::UnpackStatus s;
1823 DictTabInfo::Table tableDesc; tableDesc.init();
1824 s = SimpleProperties::unpack(it, &tableDesc,
1825 DictTabInfo::TableMapping,
1826 DictTabInfo::TableMappingSize,
1827 true, true);
1829 jam();
1830 suma.suma_ndbrequire(s == SimpleProperties::Break);
1832 #if 0
1833 ToDo handle this
1834 if(m_schemaVersion != tableDesc.TableVersion){
1835 jam();
1837 release(* this);
1839 // oops wrong schema version in stored tabledesc
1840 // we need to find all subscriptions with old table desc
1841 // and all subscribers to this
1842 // hopefully none
1843 c_tables.release(tabPtr);
1844 DBUG_PRINT("info",("c_tablePool size: %d free: %d",
1845 suma.c_tablePool.getSize(),
1846 suma.c_tablePool.getNoOfFree()));
1847 tabPtr.setNull();
1848 DLHashTable<Suma::Subscription>::Iterator i_subPtr;
1849 c_subscriptions.first(i_subPtr);
1850 SubscriptionPtr subPtr;
1851 for(;!i_subPtr.isNull();c_subscriptions.next(i_subPtr)){
1852 jam();
1853 c_subscriptions.getPtr(subPtr, i_subPtr.curr.i);
1854 SyncRecord* tmp = c_syncPool.getPtr(subPtr.p->m_syncPtrI);
1855 if (tmp == syncPtr_p) {
1856 jam();
1857 continue;
1859 if (subPtr.p->m_tables.get(tableId)) {
1860 jam();
1861 subPtr.p->m_tables.clear(tableId); // remove this old table reference
1862 TableList::DataBufferIterator it;
1863 for(tmp->m_tableList.first(it);!it.isNull();tmp->m_tableList.next(it)) {
1864 jam();
1865 if (*it.data == tableId){
1866 jam();
1867 Uint32 *pdata = it.data;
1868 tmp->m_tableList.next(it);
1869 for(;!it.isNull();tmp->m_tableList.next(it)) {
1870 jam();
1871 *pdata = *it.data;
1872 pdata = it.data;
1874 *pdata = RNIL; // todo remove this last item...
1875 break;
1881 #endif
1883 if(m_attributes.getSize() != 0){
1884 jam();
1885 DBUG_RETURN(true);
1889 * Initialize table object
1891 Uint32 noAttribs = tableDesc.NoOfAttributes;
1892 Uint32 notFixed = (tableDesc.NoOfNullable+tableDesc.NoOfVariable);
1893 m_schemaVersion = tableDesc.TableVersion;
1895 // The attribute buffer
1896 LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes);
1898 // Temporary buffer
1899 DataBuffer<15> theRest(suma.c_dataBufferPool);
1901 if(!attrBuf.seize(noAttribs)){
1902 jam();
1903 suma.suma_ndbrequire(false);
1904 DBUG_RETURN(false);
1907 if(!theRest.seize(notFixed)){
1908 jam();
1909 suma.suma_ndbrequire(false);
1910 DBUG_RETURN(false);
1913 DataBuffer<15>::DataBufferIterator attrIt; // Fixed not nullable
1914 DataBuffer<15>::DataBufferIterator restIt; // variable + nullable
1915 attrBuf.first(attrIt);
1916 theRest.first(restIt);
1918 for(Uint32 i = 0; i < noAttribs; i++) {
1919 DictTabInfo::Attribute attrDesc; attrDesc.init();
1920 s = SimpleProperties::unpack(it, &attrDesc,
1921 DictTabInfo::AttributeMapping,
1922 DictTabInfo::AttributeMappingSize,
1923 true, true);
1924 jam();
1925 suma.suma_ndbrequire(s == SimpleProperties::Break);
1927 if (!attrDesc.AttributeNullableFlag
1928 /* && !attrDesc.AttributeVariableFlag */) {
1929 jam();
1930 * attrIt.data = attrDesc.AttributeId;
1931 attrBuf.next(attrIt);
1932 } else {
1933 jam();
1934 * restIt.data = attrDesc.AttributeId;
1935 theRest.next(restIt);
1938 // Move to next attribute
1939 it.next();
1943 * Put the rest in end of attrBuf
1945 theRest.first(restIt);
1946 for(; !restIt.isNull(); theRest.next(restIt)){
1947 * attrIt.data = * restIt.data;
1948 attrBuf.next(attrIt);
1951 theRest.release();
1953 DBUG_RETURN(true);
1956 void
1957 Suma::execDI_FCOUNTREF(Signal* signal)
1959 jamEntry();
1960 DBUG_ENTER("Suma::execDI_FCOUNTREF");
1961 DihFragCountRef * const ref = (DihFragCountRef*)signal->getDataPtr();
1962 switch ((DihFragCountRef::ErrorCode) ref->m_error)
1964 case DihFragCountRef::ErroneousTableState:
1965 jam();
1966 if (ref->m_tableStatus == Dbdih::TabRecord::TS_CREATING)
1968 const Uint32 tableId = ref->m_senderData;
1969 const Uint32 tabPtr_i = ref->m_tableRef;
1970 DihFragCountReq * const req = (DihFragCountReq*)signal->getDataPtrSend();
1972 req->m_connectionData = RNIL;
1973 req->m_tableRef = tabPtr_i;
1974 req->m_senderData = tableId;
1975 sendSignalWithDelay(DBDIH_REF, GSN_DI_FCOUNTREQ, signal,
1976 DihFragCountReq::SignalLength,
1977 DihFragCountReq::RetryInterval);
1978 DBUG_VOID_RETURN;
1980 ndbrequire(false);
1981 default:
1982 ndbrequire(false);
1985 DBUG_VOID_RETURN;
1988 void
1989 Suma::execDI_FCOUNTCONF(Signal* signal)
1991 jamEntry();
1992 DBUG_ENTER("Suma::execDI_FCOUNTCONF");
1993 ndbassert(signal->getNoOfSections() == 0);
1994 DihFragCountConf * const conf = (DihFragCountConf*)signal->getDataPtr();
1995 const Uint32 userPtr = conf->m_connectionData;
1996 const Uint32 fragCount = conf->m_fragmentCount;
1997 const Uint32 tableId = conf->m_tableRef;
1999 ndbrequire(userPtr == RNIL && signal->length() == 5);
2001 TablePtr tabPtr;
2002 tabPtr.i= conf->m_senderData;
2003 ndbrequire((tabPtr.p= c_tablePool.getPtr(tabPtr.i)) != 0);
2004 ndbrequire(tabPtr.p->m_tableId == tableId);
2006 LocalDataBuffer<15> fragBuf(c_dataBufferPool, tabPtr.p->m_fragments);
2007 ndbrequire(fragBuf.getSize() == 0);
2009 tabPtr.p->m_fragCount = fragCount;
2011 signal->theData[0] = RNIL;
2012 signal->theData[1] = tabPtr.i;
2013 signal->theData[2] = tableId;
2014 signal->theData[3] = 0; // Frag no
2015 sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);
2017 DBUG_VOID_RETURN;
2020 void
2021 Suma::execDIGETPRIMCONF(Signal* signal){
2022 jamEntry();
2023 DBUG_ENTER("Suma::execDIGETPRIMCONF");
2024 ndbassert(signal->getNoOfSections() == 0);
2026 const Uint32 userPtr = signal->theData[0];
2027 const Uint32 nodeCount = signal->theData[6];
2028 const Uint32 tableId = signal->theData[7];
2029 const Uint32 fragNo = signal->theData[8];
2031 ndbrequire(userPtr == RNIL && signal->length() == 9);
2032 ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS);
2034 TablePtr tabPtr;
2035 tabPtr.i= signal->theData[1];
2036 ndbrequire((tabPtr.p= c_tablePool.getPtr(tabPtr.i)) != 0);
2037 ndbrequire(tabPtr.p->m_tableId == tableId);
2040 LocalDataBuffer<15> fragBuf(c_dataBufferPool,tabPtr.p->m_fragments);
2043 * Add primary node for fragment to list
2045 FragmentDescriptor fd;
2046 fd.m_fragDesc.m_nodeId = signal->theData[2];
2047 fd.m_fragDesc.m_fragmentNo = fragNo;
2048 signal->theData[2] = fd.m_dummy;
2049 fragBuf.append(&signal->theData[2], 1);
2052 const Uint32 nextFrag = fragNo + 1;
2053 if(nextFrag == tabPtr.p->m_fragCount)
2056 * Complete frag info for table
2057 * table is not up to date
2060 if (tabPtr.p->c_subscribers.isEmpty())
2062 completeInitTable(signal,tabPtr);
2063 DBUG_VOID_RETURN;
2065 tabPtr.p->setupTrigger(signal, *this);
2066 DBUG_VOID_RETURN;
2068 signal->theData[0] = RNIL;
2069 signal->theData[1] = tabPtr.i;
2070 signal->theData[2] = tableId;
2071 signal->theData[3] = nextFrag; // Frag no
2072 sendSignal(DBDIH_REF, GSN_DIGETPRIMREQ, signal, 4, JBB);
2074 DBUG_VOID_RETURN;
#if 0
/**
 * Disabled: report the outcome of the meta phase to the requester of the
 * subscription, as SUB_SYNC_REF (when m_error is set) or SUB_SYNC_CONF.
 */
void
Suma::SyncRecord::completeTableInit(Signal* signal)
{
  jam();
  SubscriptionPtr subPtr;
  suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);

#if PRINT_ONLY
  ndbout_c("GSN_SUB_SYNC_CONF (meta)");
#else

  suma.releaseSections(signal);

  if (!m_error) {
    SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
    conf->senderRef = suma.reference();
    conf->senderData = subPtr.p->m_senderData;
    suma.sendSignal(subPtr.p->m_senderRef, GSN_SUB_SYNC_CONF, signal,
		    SubSyncConf::SignalLength, JBB);
  } else {
    SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend();
    ref->senderRef = suma.reference();
    ref->senderData = subPtr.p->m_senderData;
    ref->errorCode = SubSyncRef::Undefined;
    suma.sendSignal(subPtr.p->m_senderRef, GSN_SUB_SYNC_REF, signal,
		    SubSyncRef::SignalLength, JBB);
  }
#endif
}
#endif
2109 /**********************************************************
2111 * Scan interface
2115 void
2116 Suma::SyncRecord::startScan(Signal* signal)
2118 jam();
2119 DBUG_ENTER("Suma::SyncRecord::startScan");
2122 * Get fraginfo
2124 m_currentTable = 0;
2125 m_currentFragment = 0;
2126 nextScan(signal);
2127 DBUG_VOID_RETURN;
2130 bool
2131 Suma::SyncRecord::getNextFragment(TablePtr * tab,
2132 FragmentDescriptor * fd)
2134 jam();
2135 SubscriptionPtr subPtr;
2136 suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
2137 TableList::DataBufferIterator tabIt;
2138 DataBuffer<15>::DataBufferIterator fragIt;
2140 m_tableList.position(tabIt, m_currentTable);
2141 for(; !tabIt.curr.isNull(); m_tableList.next(tabIt), m_currentTable++)
2143 TablePtr tabPtr;
2144 ndbrequire(suma.c_tables.find(tabPtr, * tabIt.data));
2145 LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, tabPtr.p->m_fragments);
2147 fragBuf.position(fragIt, m_currentFragment);
2148 for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++)
2150 FragmentDescriptor tmp;
2151 tmp.m_dummy = * fragIt.data;
2152 if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){
2153 * fd = tmp;
2154 * tab = tabPtr;
2155 return true;
2158 m_currentFragment = 0;
2160 tabPtr.p->n_subscribers--;
2161 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
2162 tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
2163 tabPtr.p->checkRelease(suma);
2165 return false;
2168 void
2169 Suma::SyncRecord::nextScan(Signal* signal)
2171 jam();
2172 DBUG_ENTER("Suma::SyncRecord::nextScan");
2173 TablePtr tabPtr;
2174 FragmentDescriptor fd;
2175 SubscriptionPtr subPtr;
2176 if(!getNextFragment(&tabPtr, &fd)){
2177 jam();
2178 completeScan(signal);
2179 DBUG_VOID_RETURN;
2181 suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
2183 DataBuffer<15>::Head head = m_attributeList;
2184 if(head.getSize() == 0){
2185 head = tabPtr.p->m_attributes;
2187 LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, head);
2189 ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend();
2190 const Uint32 parallelism = 16;
2191 const Uint32 attrLen = 5 + attrBuf.getSize();
2193 req->senderData = ptrI;
2194 req->resultRef = suma.reference();
2195 req->tableId = tabPtr.p->m_tableId;
2196 req->requestInfo = 0;
2197 req->savePointId = 0;
2198 ScanFragReq::setLockMode(req->requestInfo, 0);
2199 ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
2200 ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
2201 ScanFragReq::setAttrLen(req->requestInfo, attrLen);
2202 req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;
2203 req->schemaVersion = tabPtr.p->m_schemaVersion;
2204 req->transId1 = 0;
2205 req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
2206 req->clientOpPtr = (ptrI << 16);
2207 req->batch_size_rows= parallelism;
2208 req->batch_size_bytes= 0;
2209 suma.sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal,
2210 ScanFragReq::SignalLength, JBB);
2212 signal->theData[0] = ptrI;
2213 signal->theData[1] = 0;
2214 signal->theData[2] = (SUMA << 20) + (suma.getOwnNodeId() << 8);
2216 // Return all
2217 signal->theData[3] = attrBuf.getSize();
2218 signal->theData[4] = 0;
2219 signal->theData[5] = 0;
2220 signal->theData[6] = 0;
2221 signal->theData[7] = 0;
2223 Uint32 dataPos = 8;
2224 DataBuffer<15>::DataBufferIterator it;
2225 for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){
2226 AttributeHeader::init(&signal->theData[dataPos++], * it.data, 0);
2227 if(dataPos == 25){
2228 suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, 25, JBB);
2229 dataPos = 3;
2232 if(dataPos != 3){
2233 suma.sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, dataPos, JBB);
2236 m_currentTableId = tabPtr.p->m_tableId;
2237 m_currentNoOfAttributes = attrBuf.getSize();
2239 DBUG_VOID_RETURN;
2243 void
2244 Suma::execSCAN_FRAGREF(Signal* signal){
2245 jamEntry();
2247 // ScanFragRef * const ref = (ScanFragRef*)signal->getDataPtr();
2248 ndbrequire(false);
2251 void
2252 Suma::execSCAN_FRAGCONF(Signal* signal){
2253 jamEntry();
2254 DBUG_ENTER("Suma::execSCAN_FRAGCONF");
2255 ndbassert(signal->getNoOfSections() == 0);
2256 CRASH_INSERTION(13011);
2258 ScanFragConf * const conf = (ScanFragConf*)signal->getDataPtr();
2260 const Uint32 completed = conf->fragmentCompleted;
2261 const Uint32 senderData = conf->senderData;
2262 const Uint32 completedOps = conf->completedOps;
2264 Ptr<SyncRecord> syncPtr;
2265 c_syncPool.getPtr(syncPtr, senderData);
2267 if(completed != 2){
2268 jam();
2270 #if PRINT_ONLY
2271 SubSyncContinueConf * const conf =
2272 (SubSyncContinueConf*)signal->getDataPtrSend();
2273 conf->subscriptionId = subPtr.p->m_subscriptionId;
2274 conf->subscriptionKey = subPtr.p->m_subscriptionKey;
2275 execSUB_SYNC_CONTINUE_CONF(signal);
2276 #else
2277 SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend();
2278 req->subscriberData = syncPtr.p->m_senderData;
2279 req->noOfRowsSent = completedOps;
2280 sendSignal(syncPtr.p->m_senderRef, GSN_SUB_SYNC_CONTINUE_REQ, signal,
2281 SubSyncContinueReq::SignalLength, JBB);
2282 #endif
2283 DBUG_VOID_RETURN;
2286 ndbrequire(completedOps == 0);
2288 syncPtr.p->m_currentFragment++;
2289 syncPtr.p->nextScan(signal);
2290 DBUG_VOID_RETURN;
2293 void
2294 Suma::execSUB_SYNC_CONTINUE_CONF(Signal* signal){
2295 jamEntry();
2296 ndbassert(signal->getNoOfSections() == 0);
2298 CRASH_INSERTION(13012);
2300 SubSyncContinueConf * const conf =
2301 (SubSyncContinueConf*)signal->getDataPtr();
2303 SubscriptionPtr subPtr;
2304 Subscription key;
2305 key.m_subscriptionId = conf->subscriptionId;
2306 key.m_subscriptionKey = conf->subscriptionKey;
2308 ndbrequire(c_subscriptions.find(subPtr, key));
2310 ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend();
2311 req->senderData = subPtr.p->m_current_sync_ptrI;
2312 req->closeFlag = 0;
2313 req->transId1 = 0;
2314 req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8);
2315 req->batch_size_rows = 16;
2316 req->batch_size_bytes = 0;
2317 sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal,
2318 ScanFragNextReq::SignalLength, JBB);
2321 void
2322 Suma::SyncRecord::completeScan(Signal* signal, int error)
2324 jam();
2325 DBUG_ENTER("Suma::SyncRecord::completeScan");
2326 // m_tableList.release();
2328 #if PRINT_ONLY
2329 ndbout_c("GSN_SUB_SYNC_CONF (data)");
2330 #else
2331 if (error == 0)
2333 SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
2334 conf->senderRef = suma.reference();
2335 conf->senderData = m_senderData;
2336 suma.sendSignal(m_senderRef, GSN_SUB_SYNC_CONF, signal,
2337 SubSyncConf::SignalLength, JBB);
2339 else
2341 SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend();
2342 ref->senderRef = suma.reference();
2343 ref->senderData = m_senderData;
2344 suma.sendSignal(m_senderRef, GSN_SUB_SYNC_REF, signal,
2345 SubSyncRef::SignalLength, JBB);
2347 #endif
2349 release();
2351 Ptr<Subscription> subPtr;
2352 suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
2353 ndbrequire(subPtr.p->m_current_sync_ptrI == ptrI);
2354 subPtr.p->m_current_sync_ptrI = RNIL;
2356 suma.c_syncPool.release(ptrI);
2357 DBUG_PRINT("info",("c_syncPool size: %d free: %d",
2358 suma.c_syncPool.getSize(),
2359 suma.c_syncPool.getNoOfFree()));
2360 DBUG_VOID_RETURN;
2363 void
2364 Suma::execSCAN_HBREP(Signal* signal){
2365 jamEntry();
2366 #if 0
2367 ndbout << "execSCAN_HBREP" << endl << hex;
2368 for(int i = 0; i<signal->length(); i++){
2369 ndbout << signal->theData[i] << " ";
2370 if(((i + 1) % 8) == 0)
2371 ndbout << endl << hex;
2373 ndbout << endl;
2374 #endif
2377 /**********************************************************
2379 * Suma participant interface
2381 * Creation of subscriber
2385 void
2386 Suma::execSUB_START_REQ(Signal* signal){
2387 jamEntry();
2388 ndbassert(signal->getNoOfSections() == 0);
2389 DBUG_ENTER("Suma::execSUB_START_REQ");
2390 SubStartReq * const req = (SubStartReq*)signal->getDataPtr();
2392 CRASH_INSERTION(13013);
2393 Uint32 senderRef = req->senderRef;
2394 Uint32 senderData = req->senderData;
2395 Uint32 subscriberData = req->subscriberData;
2396 Uint32 subscriberRef = req->subscriberRef;
2397 SubscriptionData::Part part = (SubscriptionData::Part)req->part;
2399 Subscription key;
2400 key.m_subscriptionId = req->subscriptionId;
2401 key.m_subscriptionKey = req->subscriptionKey;
2403 if (c_startup.m_restart_server_node_id &&
2404 senderRef != calcSumaBlockRef(c_startup.m_restart_server_node_id))
2407 * only allow "restart_server" Suma's to come through
2408 * for restart purposes
2410 jam();
2411 Uint32 err = c_startup.m_restart_server_node_id != RNIL ? 1405 :
2412 SubStartRef::NF_FakeErrorREF;
2414 sendSubStartRef(signal, err);
2415 DBUG_VOID_RETURN;
2418 SubscriptionPtr subPtr;
2419 if(!c_subscriptions.find(subPtr, key)){
2420 jam();
2421 sendSubStartRef(signal, 1407);
2422 DBUG_VOID_RETURN;
2425 if (subPtr.p->m_state == Subscription::LOCKED) {
2426 jam();
2427 DBUG_PRINT("info",("Locked"));
2428 sendSubStartRef(signal, 1411);
2429 DBUG_VOID_RETURN;
2432 if (subPtr.p->m_state == Subscription::DROPPED &&
2433 c_startup.m_restart_server_node_id == 0) {
2434 jam();
2435 DBUG_PRINT("info",("Dropped"));
2436 sendSubStartRef(signal, 1418);
2437 DBUG_VOID_RETURN;
2440 ndbrequire(subPtr.p->m_state == Subscription::DEFINED ||
2441 c_startup.m_restart_server_node_id);
2443 SubscriberPtr subbPtr;
2444 if(!c_subscriberPool.seize(subbPtr)){
2445 jam();
2446 sendSubStartRef(signal, 1412);
2447 DBUG_VOID_RETURN;
2450 if (c_startup.m_restart_server_node_id == 0 &&
2451 !c_connected_nodes.get(refToNode(subscriberRef)))
2454 jam();
2455 c_subscriberPool.release(subbPtr);
2456 sendSubStartRef(signal, SubStartRef::PartiallyConnected);
2457 DBUG_VOID_RETURN;
2460 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
2461 c_subscriberPool.getSize(),
2462 c_subscriberPool.getNoOfFree()));
2464 c_subscriber_nodes.set(refToNode(subscriberRef));
2466 // setup subscription record
2467 if (subPtr.p->m_state == Subscription::DEFINED)
2468 subPtr.p->m_state = Subscription::LOCKED;
2469 // store these here for later use
2470 subPtr.p->m_senderRef = senderRef;
2471 subPtr.p->m_senderData = senderData;
2473 // setup subscriber record
2474 subbPtr.p->m_senderRef = subscriberRef;
2475 subbPtr.p->m_senderData = subscriberData;
2476 subbPtr.p->m_subPtrI= subPtr.i;
2478 DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
2479 "tableId: %u id: %u key: %u",
2480 subbPtr.i, subbPtr.p->m_senderRef, subbPtr.p->m_senderData,
2481 subPtr.i, subPtr.p->m_senderRef, subPtr.p->m_senderData,
2482 subPtr.p->m_tableId,
2483 subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
2485 TablePtr tabPtr;
2486 switch(part){
2487 case SubscriptionData::MetaData:
2488 jam();
2489 c_metaSubscribers.add(subbPtr);
2490 sendSubStartComplete(signal, subbPtr, 0, part);
2491 DBUG_VOID_RETURN;
2492 case SubscriptionData::TableData:
2493 jam();
2494 initTable(signal,subPtr.p->m_tableId,tabPtr,subbPtr);
2495 tabPtr.p->n_subscribers++;
2496 if (subPtr.p->m_options & Subscription::REPORT_ALL)
2497 tabPtr.p->m_reportAll = true;
2498 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
2499 tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
2500 DBUG_VOID_RETURN;
2502 ndbrequire(false);
2505 void
2506 Suma::sendSubStartComplete(Signal* signal,
2507 SubscriberPtr subbPtr,
2508 Uint32 firstGCI,
2509 SubscriptionData::Part part)
2511 jam();
2512 DBUG_ENTER("Suma::sendSubStartComplete");
2514 SubscriptionPtr subPtr;
2515 c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
2516 ndbrequire(subPtr.p->m_state == Subscription::LOCKED ||
2517 (subPtr.p->m_state == Subscription::DROPPED &&
2518 c_startup.m_restart_server_node_id));
2519 if (subPtr.p->m_state == Subscription::LOCKED)
2521 jam();
2522 subPtr.p->m_state = Subscription::DEFINED;
2524 subPtr.p->n_subscribers++;
2526 DBUG_PRINT("info",("subscriber: %u[%u,%u] subscription: %u[%u,%u] "
2527 "tableId: %u[i=%u] id: %u key: %u",
2528 subbPtr.i, subbPtr.p->m_senderRef, subbPtr.p->m_senderData,
2529 subPtr.i, subPtr.p->m_senderRef, subPtr.p->m_senderData,
2530 subPtr.p->m_tableId, subPtr.p->m_table_ptrI,
2531 subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
2533 SubStartConf * const conf = (SubStartConf*)signal->getDataPtrSend();
2535 conf->senderRef = reference();
2536 conf->senderData = subPtr.p->m_senderData;
2537 conf->subscriptionId = subPtr.p->m_subscriptionId;
2538 conf->subscriptionKey = subPtr.p->m_subscriptionKey;
2539 conf->firstGCI = firstGCI;
2540 conf->part = (Uint32) part;
2542 DBUG_PRINT("info",("subscriber: %u id: %u key: %u", subbPtr.i,
2543 subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
2544 sendSignal(subPtr.p->m_senderRef, GSN_SUB_START_CONF, signal,
2545 SubStartConf::SignalLength, JBB);
2547 reportAllSubscribers(signal, NdbDictionary::Event::_TE_SUBSCRIBE,
2548 subPtr, subbPtr);
2550 DBUG_VOID_RETURN;
2553 void
2554 Suma::sendSubStartRef(Signal* signal, Uint32 errCode)
2556 jam();
2557 SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
2558 ref->senderRef = reference();
2559 ref->errorCode = errCode;
2560 releaseSections(signal);
2561 sendSignal(signal->getSendersBlockRef(), GSN_SUB_START_REF, signal,
2562 SubStartRef::SignalLength, JBB);
2564 void
2565 Suma::sendSubStartRef(Signal* signal,
2566 SubscriberPtr subbPtr, Uint32 error,
2567 SubscriptionData::Part part)
2569 jam();
2571 SubscriptionPtr subPtr;
2572 c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
2574 ndbrequire(subPtr.p->m_state == Subscription::LOCKED ||
2575 (subPtr.p->m_state == Subscription::DROPPED &&
2576 c_startup.m_restart_server_node_id));
2577 if (subPtr.p->m_state == Subscription::LOCKED)
2579 jam();
2580 subPtr.p->m_state = Subscription::DEFINED;
2583 SubStartRef * ref= (SubStartRef *)signal->getDataPtrSend();
2584 ref->senderRef = reference();
2585 ref->senderData = subPtr.p->m_senderData;
2586 ref->subscriptionId = subPtr.p->m_subscriptionId;
2587 ref->subscriptionKey = subPtr.p->m_subscriptionKey;
2588 ref->part = (Uint32) part;
2589 ref->errorCode = error;
2591 sendSignal(subPtr.p->m_senderRef, GSN_SUB_START_REF, signal,
2592 SubStartRef::SignalLength, JBB);
/**********************************************************
 * Suma participant interface
 *
 * Stopping and removing of subscriber
 *
 */
2602 void
2603 Suma::execSUB_STOP_REQ(Signal* signal){
2604 jamEntry();
2605 ndbassert(signal->getNoOfSections() == 0);
2606 DBUG_ENTER("Suma::execSUB_STOP_REQ");
2608 CRASH_INSERTION(13019);
2610 SubStopReq * const req = (SubStopReq*)signal->getDataPtr();
2611 Uint32 senderRef = req->senderRef;
2612 Uint32 senderData = req->senderData;
2613 Uint32 subscriberRef = req->subscriberRef;
2614 Uint32 subscriberData = req->subscriberData;
2615 SubscriptionPtr subPtr;
2616 Subscription key;
2617 key.m_subscriptionId = req->subscriptionId;
2618 key.m_subscriptionKey = req->subscriptionKey;
2619 Uint32 part = req->part;
2621 if (key.m_subscriptionKey == 0 &&
2622 key.m_subscriptionId == 0 &&
2623 subscriberData == 0)
2625 SubStopConf* conf = (SubStopConf*)signal->getDataPtrSend();
2627 conf->senderRef = reference();
2628 conf->senderData = senderData;
2629 conf->subscriptionId = key.m_subscriptionId;
2630 conf->subscriptionKey = key.m_subscriptionKey;
2631 conf->subscriberData = subscriberData;
2633 sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
2634 SubStopConf::SignalLength, JBB);
2636 removeSubscribersOnNode(signal, refToNode(senderRef));
2637 DBUG_VOID_RETURN;
2640 if (c_startup.m_restart_server_node_id &&
2641 senderRef != calcSumaBlockRef(c_startup.m_restart_server_node_id))
2644 * only allow "restart_server" Suma's to come through
2645 * for restart purposes
2647 jam();
2648 Uint32 err = c_startup.m_restart_server_node_id != RNIL ? 1405 :
2649 SubStopRef::NF_FakeErrorREF;
2651 sendSubStopRef(signal, err);
2652 DBUG_VOID_RETURN;
2655 if(!c_subscriptions.find(subPtr, key)){
2656 jam();
2657 DBUG_PRINT("error", ("not found"));
2658 sendSubStopRef(signal, 1407);
2659 DBUG_VOID_RETURN;
2662 if (subPtr.p->m_state == Subscription::LOCKED) {
2663 jam();
2664 DBUG_PRINT("error", ("locked"));
2665 sendSubStopRef(signal, 1411);
2666 DBUG_VOID_RETURN;
2669 ndbrequire(part == SubscriptionData::TableData);
2671 TablePtr tabPtr;
2672 tabPtr.i = subPtr.p->m_table_ptrI;
2673 if (tabPtr.i == RNIL ||
2674 !(tabPtr.p = c_tables.getPtr(tabPtr.i)) ||
2675 tabPtr.p->m_tableId != subPtr.p->m_tableId)
2677 jam();
2678 DBUG_PRINT("error", ("no such table id %u[i=%u]",
2679 subPtr.p->m_tableId, subPtr.p->m_table_ptrI));
2680 sendSubStopRef(signal, 1417);
2681 DBUG_VOID_RETURN;
2684 if (tabPtr.p->m_drop_subbPtr.p != 0) {
2685 jam();
2686 DBUG_PRINT("error", ("table locked"));
2687 sendSubStopRef(signal, 1420);
2688 DBUG_VOID_RETURN;
2691 DBUG_PRINT("info",("subscription: %u tableId: %u[i=%u] id: %u key: %u",
2692 subPtr.i, subPtr.p->m_tableId, tabPtr.i,
2693 subPtr.p->m_subscriptionId,subPtr.p->m_subscriptionKey));
2695 SubscriberPtr subbPtr;
2696 if (senderRef == reference()){
2697 jam();
2698 c_subscriberPool.getPtr(subbPtr, senderData);
2699 ndbrequire(subbPtr.p->m_subPtrI == subPtr.i &&
2700 subbPtr.p->m_senderRef == subscriberRef &&
2701 subbPtr.p->m_senderData == subscriberData);
2702 c_removeDataSubscribers.remove(subbPtr);
2704 else
2706 jam();
2707 LocalDLList<Subscriber>
2708 subscribers(c_subscriberPool,tabPtr.p->c_subscribers);
2710 DBUG_PRINT("info",("search: subscription: %u, ref: %u, data: %d",
2711 subPtr.i, subscriberRef, subscriberData));
2712 for (subscribers.first(subbPtr);!subbPtr.isNull();subscribers.next(subbPtr))
2714 jam();
2715 DBUG_PRINT("info",
2716 ("search: subscription: %u, ref: %u, data: %u, subscriber %u",
2717 subbPtr.p->m_subPtrI, subbPtr.p->m_senderRef,
2718 subbPtr.p->m_senderData, subbPtr.i));
2719 if (subbPtr.p->m_subPtrI == subPtr.i &&
2720 subbPtr.p->m_senderRef == subscriberRef &&
2721 subbPtr.p->m_senderData == subscriberData)
2723 jam();
2724 DBUG_PRINT("info",("found"));
2725 break;
2729 * If we didn't find anyone, send ref
2731 if (subbPtr.isNull()) {
2732 jam();
2733 DBUG_PRINT("error", ("subscriber not found"));
2734 sendSubStopRef(signal, 1407);
2735 DBUG_VOID_RETURN;
2737 subscribers.remove(subbPtr);
2740 subPtr.p->m_senderRef = senderRef; // store ref to requestor
2741 subPtr.p->m_senderData = senderData; // store ref to requestor
2743 tabPtr.p->m_drop_subbPtr = subbPtr;
2745 if (subPtr.p->m_state == Subscription::DEFINED)
2747 jam();
2748 subPtr.p->m_state = Subscription::LOCKED;
2751 if (tabPtr.p->m_state == Table::DROPPED)
2752 // not ALTERED here since trigger must be removed
2754 jam();
2755 tabPtr.p->n_subscribers--;
2756 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
2757 tabPtr.p->m_tableId, tabPtr.p->n_subscribers));
2758 tabPtr.p->checkRelease(*this);
2759 sendSubStopComplete(signal, tabPtr.p->m_drop_subbPtr);
2760 tabPtr.p->m_drop_subbPtr.p = 0;
2762 else
2764 jam();
2765 tabPtr.p->dropTrigger(signal,*this);
2767 DBUG_VOID_RETURN;
2770 void
2771 Suma::sendSubStopComplete(Signal* signal, SubscriberPtr subbPtr)
2773 jam();
2774 DBUG_ENTER("Suma::sendSubStopComplete");
2775 CRASH_INSERTION(13020);
2777 DBUG_PRINT("info",("removed subscriber: %i", subbPtr.i));
2779 SubscriptionPtr subPtr;
2780 c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
2782 Uint32 senderRef= subPtr.p->m_senderRef;
2783 Uint32 senderData= subPtr.p->m_senderData;
2785 subPtr.p->n_subscribers--;
2786 ndbassert( subPtr.p->m_state == Subscription::LOCKED ||
2787 subPtr.p->m_state == Subscription::DROPPED );
2788 if ( subPtr.p->m_state == Subscription::LOCKED )
2790 jam();
2791 subPtr.p->m_state = Subscription::DEFINED;
2792 if (subPtr.p->n_subscribers == 0)
2794 jam();
2795 #if 1
2796 subPtr.p->m_table_ptrI = RNIL;
2797 #else
2798 TablePtr tabPtr;
2799 tabPtr.i = subPtr.p->m_table_ptrI;
2800 if ((tabPtr.p= c_tablePool.getPtr(tabPtr.i)) &&
2801 (tabPtr.p->m_state == Table::DROPPED ||
2802 tabPtr.p->m_state == Table::ALTERED) &&
2803 false)
2805 // last subscriber, and table is dropped
2806 // safe to drop subscription
2807 c_subscriptions.release(subPtr);
2808 DBUG_PRINT("info",("c_subscriptionPool size: %d free: %d",
2809 c_subscriptionPool.getSize(),
2810 c_subscriptionPool.getNoOfFree()));
2812 else
2814 subPtr.p->m_table_ptrI = RNIL;
2816 ndbassert(tabPtr.p != 0);
2817 #endif
2820 else if ( subPtr.p->n_subscribers == 0 )
2822 // subscription is marked to be removed
2823 // and there are no subscribers left
2824 jam();
2825 ndbassert(subPtr.p->m_state == Subscription::DROPPED);
2826 completeSubRemove(subPtr);
2829 // let subscriber know that subscrber is stopped
2831 SubTableData * data = (SubTableData*)signal->getDataPtrSend();
2832 data->gci = m_last_complete_gci + 1; // XXX ???
2833 data->tableId = 0;
2834 data->requestInfo = 0;
2835 SubTableData::setOperation(data->requestInfo,
2836 NdbDictionary::Event::_TE_STOP);
2837 SubTableData::setNdbdNodeId(data->requestInfo,
2838 getOwnNodeId());
2839 data->senderData = subbPtr.p->m_senderData;
2840 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
2841 SubTableData::SignalLength, JBB);
2844 SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
2846 conf->senderRef= reference();
2847 conf->senderData= senderData;
2849 sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
2850 SubStopConf::SignalLength, JBB);
2852 c_subscriberPool.release(subbPtr);
2853 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
2854 c_subscriberPool.getSize(),
2855 c_subscriberPool.getNoOfFree()));
2857 reportAllSubscribers(signal, NdbDictionary::Event::_TE_UNSUBSCRIBE,
2858 subPtr, subbPtr);
2860 DBUG_VOID_RETURN;
2863 // report new started subscriber to all other subscribers
2864 void
2865 Suma::reportAllSubscribers(Signal *signal,
2866 NdbDictionary::Event::_TableEvent table_event,
2867 SubscriptionPtr subPtr,
2868 SubscriberPtr subbPtr)
2870 SubTableData * data = (SubTableData*)signal->getDataPtrSend();
2872 if (table_event == NdbDictionary::Event::_TE_SUBSCRIBE &&
2873 !c_startup.m_restart_server_node_id)
2875 data->gci = m_last_complete_gci + 1;
2876 data->tableId = subPtr.p->m_tableId;
2877 data->requestInfo = 0;
2878 SubTableData::setOperation(data->requestInfo,
2879 NdbDictionary::Event::_TE_ACTIVE);
2880 SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
2881 SubTableData::setReqNodeId(data->requestInfo,
2882 refToNode(subbPtr.p->m_senderRef));
2883 data->changeMask = 0;
2884 data->totalLen = 0;
2885 data->senderData = subbPtr.p->m_senderData;
2886 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
2887 SubTableData::SignalLength, JBB);
2890 if (!(subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE))
2892 return;
2894 if (subPtr.p->n_subscribers == 0)
2896 ndbrequire(table_event != NdbDictionary::Event::_TE_SUBSCRIBE);
2897 return;
2900 //#ifdef VM_TRACE
2901 ndbout_c("reportAllSubscribers subPtr.i: %d subPtr.p->n_subscribers: %d",
2902 subPtr.i, subPtr.p->n_subscribers);
2903 //#endif
2904 data->gci = m_last_complete_gci + 1;
2905 data->tableId = subPtr.p->m_tableId;
2906 data->requestInfo = 0;
2907 SubTableData::setOperation(data->requestInfo, table_event);
2908 SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
2909 data->changeMask = 0;
2910 data->totalLen = 0;
2912 TablePtr tabPtr;
2913 c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
2914 LocalDLList<Subscriber> subbs(c_subscriberPool, tabPtr.p->c_subscribers);
2915 SubscriberPtr i_subbPtr;
2916 for(subbs.first(i_subbPtr); !i_subbPtr.isNull(); subbs.next(i_subbPtr))
2918 if (i_subbPtr.p->m_subPtrI == subPtr.i)
2920 SubTableData::setReqNodeId(data->requestInfo,
2921 refToNode(subbPtr.p->m_senderRef));
2922 data->senderData = i_subbPtr.p->m_senderData;
2923 sendSignal(i_subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
2924 SubTableData::SignalLength, JBB);
2925 //#ifdef VM_TRACE
2926 ndbout_c("sent %s(%d) to node %d, req_nodeid: %d senderData: %d",
2927 table_event == NdbDictionary::Event::_TE_SUBSCRIBE ?
2928 "SUBSCRIBE" : "UNSUBSCRIBE", (int) table_event,
2929 refToNode(i_subbPtr.p->m_senderRef),
2930 refToNode(subbPtr.p->m_senderRef), data->senderData
2932 //#endif
2933 if (i_subbPtr.i != subbPtr.i)
2935 SubTableData::setReqNodeId(data->requestInfo,
2936 refToNode(i_subbPtr.p->m_senderRef));
2938 data->senderData = subbPtr.p->m_senderData;
2939 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
2940 SubTableData::SignalLength, JBB);
2941 //#ifdef VM_TRACE
2942 ndbout_c("sent %s(%d) to node %d, req_nodeid: %d senderData: %d",
2943 table_event == NdbDictionary::Event::_TE_SUBSCRIBE ?
2944 "SUBSCRIBE" : "UNSUBSCRIBE", (int) table_event,
2945 refToNode(subbPtr.p->m_senderRef),
2946 refToNode(i_subbPtr.p->m_senderRef), data->senderData
2948 //#endif
2954 void
2955 Suma::sendSubStopRef(Signal* signal, Uint32 errCode)
2957 jam();
2958 DBUG_ENTER("Suma::sendSubStopRef");
2959 SubStopRef * ref = (SubStopRef *)signal->getDataPtrSend();
2960 ref->senderRef = reference();
2961 ref->errorCode = errCode;
2962 sendSignal(signal->getSendersBlockRef(),
2963 GSN_SUB_STOP_REF,
2964 signal,
2965 SubStopRef::SignalLength,
2966 JBB);
2967 DBUG_VOID_RETURN;
/**********************************************************
 *
 * Trigger admin interface
 *
 */
2977 Suma::Table::setupTrigger(Signal* signal,
2978 Suma &suma)
2980 jam();
2981 DBUG_ENTER("Suma::Table::setupTrigger");
2983 int ret= 0;
2985 AttributeMask attrMask;
2986 createAttributeMask(attrMask, suma);
2988 for(Uint32 j = 0; j<3; j++)
2990 Uint32 triggerId = (m_schemaVersion << 18) | (j << 16) | m_ptrI;
2991 if(m_hasTriggerDefined[j] == 0)
2993 suma.suma_ndbrequire(m_triggerIds[j] == ILLEGAL_TRIGGER_ID);
2994 DBUG_PRINT("info",("DEFINING trigger on table %u[%u]", m_tableId, j));
2995 CreateTrigReq * const req = (CreateTrigReq*)signal->getDataPtrSend();
2996 req->setUserRef(SUMA_REF);
2997 req->setConnectionPtr(m_ptrI);
2998 req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
2999 req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
3000 req->setMonitorReplicas(true);
3001 req->setMonitorAllAttributes(j == TriggerEvent::TE_DELETE);
3002 req->setReceiverRef(SUMA_REF);
3003 req->setTriggerId(triggerId);
3004 req->setTriggerEvent((TriggerEvent::Value)j);
3005 req->setTableId(m_tableId);
3006 req->setAttributeMask(attrMask);
3007 req->setReportAllMonitoredAttributes(m_reportAll);
3008 suma.sendSignal(DBTUP_REF, GSN_CREATE_TRIG_REQ,
3009 signal, CreateTrigReq::SignalLength, JBB);
3010 ret= 1;
3012 else
3014 m_hasTriggerDefined[j]++;
3015 DBUG_PRINT("info",("REFCOUNT trigger on table %u[%u] %u",
3016 m_tableId, j, m_hasTriggerDefined[j]));
3019 DBUG_RETURN(ret);
3022 void
3023 Suma::Table::createAttributeMask(AttributeMask& mask,
3024 Suma &suma)
3026 jam();
3027 mask.clear();
3028 DataBuffer<15>::DataBufferIterator it;
3029 LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes);
3030 for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it)){
3031 mask.set(* it.data);
3035 void
3036 Suma::execCREATE_TRIG_CONF(Signal* signal){
3037 jamEntry();
3038 DBUG_ENTER("Suma::execCREATE_TRIG_CONF");
3039 ndbassert(signal->getNoOfSections() == 0);
3040 CreateTrigConf * const conf = (CreateTrigConf*)signal->getDataPtr();
3041 const Uint32 triggerId = conf->getTriggerId();
3042 Uint32 type = (triggerId >> 16) & 0x3;
3043 Uint32 tableId = conf->getTableId();
3046 DBUG_PRINT("enter", ("type: %u tableId: %u[i=%u==%u]",
3047 type, tableId,conf->getConnectionPtr(),triggerId & 0xFFFF));
3049 TablePtr tabPtr;
3050 c_tables.getPtr(tabPtr, conf->getConnectionPtr());
3051 ndbrequire(tabPtr.p->m_tableId == tableId);
3052 ndbrequire(tabPtr.p->m_state == Table::DEFINING);
3054 ndbrequire(type < 3);
3055 tabPtr.p->m_triggerIds[type] = triggerId;
3056 ndbrequire(tabPtr.p->m_hasTriggerDefined[type] == 0);
3057 tabPtr.p->m_hasTriggerDefined[type] = 1;
3059 if (type == 2)
3061 completeAllSubscribers(signal, tabPtr);
3062 completeInitTable(signal,tabPtr);
3063 DBUG_VOID_RETURN;
3065 DBUG_VOID_RETURN;
3068 void
3069 Suma::execCREATE_TRIG_REF(Signal* signal){
3070 jamEntry();
3071 DBUG_ENTER("Suma::execCREATE_TRIG_REF");
3072 ndbassert(signal->getNoOfSections() == 0);
3073 CreateTrigRef * const ref = (CreateTrigRef*)signal->getDataPtr();
3074 const Uint32 triggerId = ref->getTriggerId();
3075 Uint32 type = (triggerId >> 16) & 0x3;
3076 Uint32 tableId = ref->getTableId();
3078 DBUG_PRINT("enter", ("type: %u tableId: %u[i=%u==%u]",
3079 type, tableId,ref->getConnectionPtr(),triggerId & 0xFFFF));
3081 TablePtr tabPtr;
3082 c_tables.getPtr(tabPtr, ref->getConnectionPtr());
3083 ndbrequire(tabPtr.p->m_tableId == tableId);
3084 ndbrequire(tabPtr.p->m_state == Table::DEFINING);
3086 tabPtr.p->m_error= ref->getErrorCode();
3088 ndbrequire(type < 3);
3090 if (type == 2)
3092 completeAllSubscribers(signal, tabPtr);
3093 completeInitTable(signal,tabPtr);
3094 DBUG_VOID_RETURN;
3097 DBUG_VOID_RETURN;
3100 void
3101 Suma::Table::dropTrigger(Signal* signal,Suma& suma)
3103 jam();
3104 DBUG_ENTER("Suma::dropTrigger");
3106 m_hasOutstandingTriggerReq[0] =
3107 m_hasOutstandingTriggerReq[1] =
3108 m_hasOutstandingTriggerReq[2] = 1;
3109 for(Uint32 j = 0; j<3; j++){
3110 jam();
3111 suma.suma_ndbrequire(m_triggerIds[j] != ILLEGAL_TRIGGER_ID);
3112 if(m_hasTriggerDefined[j] == 1) {
3113 jam();
3115 DropTrigReq * const req = (DropTrigReq*)signal->getDataPtrSend();
3116 req->setConnectionPtr(m_ptrI);
3117 req->setUserRef(SUMA_REF); // Sending to myself
3118 req->setRequestType(DropTrigReq::RT_USER);
3119 req->setTriggerType(TriggerType::SUBSCRIPTION_BEFORE);
3120 req->setTriggerActionTime(TriggerActionTime::TA_DETACHED);
3121 req->setIndexId(RNIL);
3123 req->setTableId(m_tableId);
3124 req->setTriggerId(m_triggerIds[j]);
3125 req->setTriggerEvent((TriggerEvent::Value)j);
3127 DBUG_PRINT("info",("DROPPING trigger %u = %u %u %u on table %u[%u]",
3128 m_triggerIds[j],
3129 TriggerType::SUBSCRIPTION_BEFORE,
3130 TriggerActionTime::TA_DETACHED,
3132 m_tableId, j));
3133 suma.sendSignal(DBTUP_REF, GSN_DROP_TRIG_REQ,
3134 signal, DropTrigReq::SignalLength, JBB);
3135 } else {
3136 jam();
3137 suma.suma_ndbrequire(m_hasTriggerDefined[j] > 1);
3138 runDropTrigger(signal,m_triggerIds[j],suma);
3141 DBUG_VOID_RETURN;
3144 void
3145 Suma::execDROP_TRIG_REF(Signal* signal){
3146 jamEntry();
3147 DBUG_ENTER("Suma::execDROP_TRIG_REF");
3148 ndbassert(signal->getNoOfSections() == 0);
3149 DropTrigRef * const ref = (DropTrigRef*)signal->getDataPtr();
3150 if (ref->getErrorCode() != DropTrigRef::TriggerNotFound)
3152 ndbrequire(false);
3154 TablePtr tabPtr;
3155 c_tables.getPtr(tabPtr, ref->getConnectionPtr());
3156 ndbrequire(ref->getTableId() == tabPtr.p->m_tableId);
3158 tabPtr.p->runDropTrigger(signal, ref->getTriggerId(), *this);
3159 DBUG_VOID_RETURN;
3162 void
3163 Suma::execDROP_TRIG_CONF(Signal* signal){
3164 jamEntry();
3165 DBUG_ENTER("Suma::execDROP_TRIG_CONF");
3166 ndbassert(signal->getNoOfSections() == 0);
3168 DropTrigConf * const conf = (DropTrigConf*)signal->getDataPtr();
3169 TablePtr tabPtr;
3170 c_tables.getPtr(tabPtr, conf->getConnectionPtr());
3171 ndbrequire(conf->getTableId() == tabPtr.p->m_tableId);
3173 tabPtr.p->runDropTrigger(signal, conf->getTriggerId(),*this);
3174 DBUG_VOID_RETURN;
3177 void
3178 Suma::Table::runDropTrigger(Signal* signal,
3179 Uint32 triggerId,
3180 Suma &suma)
3182 jam();
3183 Uint32 type = (triggerId >> 16) & 0x3;
3185 suma.suma_ndbrequire(type < 3);
3186 suma.suma_ndbrequire(m_triggerIds[type] == triggerId);
3187 suma.suma_ndbrequire(m_hasTriggerDefined[type] > 0);
3188 suma.suma_ndbrequire(m_hasOutstandingTriggerReq[type] == 1);
3189 m_hasTriggerDefined[type]--;
3190 m_hasOutstandingTriggerReq[type] = 0;
3191 if (m_hasTriggerDefined[type] == 0)
3193 jam();
3194 m_triggerIds[type] = ILLEGAL_TRIGGER_ID;
3196 if( m_hasOutstandingTriggerReq[0] ||
3197 m_hasOutstandingTriggerReq[1] ||
3198 m_hasOutstandingTriggerReq[2])
3200 // more to come
3201 jam();
3202 return;
3205 #if 0
3206 ndbout_c("trigger completed");
3207 #endif
3210 n_subscribers--;
3211 DBUG_PRINT("info",("Suma::Table[%u]::n_subscribers: %u",
3212 m_tableId, n_subscribers));
3213 checkRelease(suma);
3215 suma.sendSubStopComplete(signal, m_drop_subbPtr);
3216 m_drop_subbPtr.p = 0;
3219 void Suma::suma_ndbrequire(bool v) { ndbrequire(v); }
3221 void
3222 Suma::Table::checkRelease(Suma &suma)
3224 jam();
3225 DBUG_ENTER("Suma::Table::checkRelease");
3226 if (n_subscribers == 0)
3228 jam();
3229 suma.suma_ndbrequire(m_hasTriggerDefined[0] == 0);
3230 suma.suma_ndbrequire(m_hasTriggerDefined[1] == 0);
3231 suma.suma_ndbrequire(m_hasTriggerDefined[2] == 0);
3232 if (!c_subscribers.isEmpty())
3234 LocalDLList<Subscriber>
3235 subscribers(suma.c_subscriberPool,c_subscribers);
3236 SubscriberPtr subbPtr;
3237 for (subscribers.first(subbPtr);!subbPtr.isNull();
3238 subscribers.next(subbPtr))
3240 jam();
3241 DBUG_PRINT("info",("subscriber: %u", subbPtr.i));
3243 suma.suma_ndbrequire(false);
3245 if (!c_syncRecords.isEmpty())
3247 LocalDLList<SyncRecord>
3248 syncRecords(suma.c_syncPool,c_syncRecords);
3249 Ptr<SyncRecord> syncPtr;
3250 for (syncRecords.first(syncPtr);!syncPtr.isNull();
3251 syncRecords.next(syncPtr))
3253 jam();
3254 DBUG_PRINT("info",("syncRecord: %u", syncPtr.i));
3256 suma.suma_ndbrequire(false);
3258 release(suma);
3259 suma.c_tables.remove(m_ptrI);
3260 suma.c_tablePool.release(m_ptrI);
3261 DBUG_PRINT("info",("c_tablePool size: %d free: %d",
3262 suma.c_tablePool.getSize(),
3263 suma.c_tablePool.getNoOfFree()));
3265 else
3267 DBUG_PRINT("info",("n_subscribers: %d", n_subscribers));
3269 DBUG_VOID_RETURN;
/**********************************************************
 * Scan data interface
 *
 * Assumption: one execTRANSID_AI contains all attr info
 *
 */
3279 #define SUMA_BUF_SZ1 MAX_KEY_SIZE_IN_WORDS + MAX_TUPLE_SIZE_IN_WORDS
3280 #define SUMA_BUF_SZ MAX_ATTRIBUTES_IN_TABLE + SUMA_BUF_SZ1
3282 static Uint32 f_bufferLock = 0;
3283 static Uint32 f_buffer[SUMA_BUF_SZ];
3284 static Uint32 f_trigBufferSize = 0;
3285 static Uint32 b_bufferLock = 0;
3286 static Uint32 b_buffer[SUMA_BUF_SZ];
3287 static Uint32 b_trigBufferSize = 0;
3289 void
3290 Suma::execTRANSID_AI(Signal* signal)
3292 jamEntry();
3293 DBUG_ENTER("Suma::execTRANSID_AI");
3295 CRASH_INSERTION(13015);
3296 TransIdAI * const data = (TransIdAI*)signal->getDataPtr();
3297 const Uint32 opPtrI = data->connectPtr;
3298 const Uint32 length = signal->length() - 3;
3300 if(f_bufferLock == 0){
3301 f_bufferLock = opPtrI;
3302 } else {
3303 ndbrequire(f_bufferLock == opPtrI);
3306 Ptr<SyncRecord> syncPtr;
3307 c_syncPool.getPtr(syncPtr, (opPtrI >> 16));
3309 Uint32 sum = 0;
3310 Uint32 * dst = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
3311 Uint32 * headers = f_buffer;
3312 const Uint32 * src = &data->attrData[0];
3313 const Uint32 * const end = &src[length];
3315 const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes;
3316 for(Uint32 i = 0; i<attribs; i++){
3317 Uint32 tmp = * src++;
3318 * headers++ = tmp;
3319 Uint32 len = AttributeHeader::getDataSize(tmp);
3321 memcpy(dst, src, 4 * len);
3322 dst += len;
3323 src += len;
3324 sum += len;
3327 ndbrequire(src == end);
3330 * Send data to subscriber
3332 LinearSectionPtr ptr[3];
3333 ptr[0].p = f_buffer;
3334 ptr[0].sz = attribs;
3336 ptr[1].p = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
3337 ptr[1].sz = sum;
3339 SubscriptionPtr subPtr;
3340 c_subscriptions.getPtr(subPtr, syncPtr.p->m_subscriptionPtrI);
3343 * Initialize signal
3345 SubTableData * sdata = (SubTableData*)signal->getDataPtrSend();
3346 Uint32 ref = subPtr.p->m_senderRef;
3347 sdata->tableId = syncPtr.p->m_currentTableId;
3348 sdata->senderData = subPtr.p->m_senderData;
3349 sdata->requestInfo = 0;
3350 SubTableData::setOperation(sdata->requestInfo,
3351 NdbDictionary::Event::_TE_SCAN); // Scan
3352 sdata->gci = 0; // Undefined
3353 #if PRINT_ONLY
3354 ndbout_c("GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs, sum);
3355 #else
3356 sendSignal(ref,
3357 GSN_SUB_TABLE_DATA,
3358 signal,
3359 SubTableData::SignalLength, JBB,
3360 ptr, 2);
3361 #endif
3364 * Reset f_bufferLock
3366 f_bufferLock = 0;
3368 DBUG_VOID_RETURN;
/**********************************************************
 *
 * Trigger data interface
 *
 */
3377 void
3378 Suma::execTRIG_ATTRINFO(Signal* signal)
3380 jamEntry();
3381 DBUG_ENTER("Suma::execTRIG_ATTRINFO");
3383 CRASH_INSERTION(13016);
3384 TrigAttrInfo* const trg = (TrigAttrInfo*)signal->getDataPtr();
3385 const Uint32 trigId = trg->getTriggerId();
3387 const Uint32 dataLen = signal->length() - TrigAttrInfo::StaticLength;
3389 if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES){
3390 jam();
3392 ndbrequire(b_bufferLock == trigId);
3394 memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen);
3395 b_trigBufferSize += dataLen;
3397 // printf("before values %u %u %u\n",trigId, dataLen, b_trigBufferSize);
3398 } else {
3399 jam();
3401 if(f_bufferLock == 0){
3402 f_bufferLock = trigId;
3403 f_trigBufferSize = 0;
3404 b_bufferLock = trigId;
3405 b_trigBufferSize = 0;
3406 } else {
3407 ndbrequire(f_bufferLock == trigId);
3410 memcpy(f_buffer + f_trigBufferSize, trg->getData(), 4 * dataLen);
3411 f_trigBufferSize += dataLen;
3415 DBUG_VOID_RETURN;
3418 #ifdef NODEFAIL_DEBUG2
3419 static int theCounts[64] = {0};
3420 #endif
3422 Uint32
3423 Suma::get_responsible_node(Uint32 bucket) const
3425 // id will contain id to responsible suma or
3426 // RNIL if we don't have nodegroup info yet
3428 jam();
3429 Uint32 node;
3430 const Bucket* ptr= c_buckets + bucket;
3431 for(Uint32 i = 0; i<MAX_REPLICAS; i++)
3433 node= ptr->m_nodes[i];
3434 if(c_alive_nodes.get(node))
3436 break;
3441 #ifdef NODEFAIL_DEBUG2
3442 if(node != 0)
3444 theCounts[node]++;
3445 ndbout_c("Suma:responsible n=%u, D=%u, id = %u, count=%u",
3446 n,D, id, theCounts[node]);
3448 #endif
3449 return node;
3452 Uint32
3453 Suma::get_responsible_node(Uint32 bucket, const NdbNodeBitmask& mask) const
3455 jam();
3456 Uint32 node;
3457 const Bucket* ptr= c_buckets + bucket;
3458 for(Uint32 i = 0; i<MAX_REPLICAS; i++)
3460 node= ptr->m_nodes[i];
3461 if(mask.get(node))
3463 return node;
3467 return 0;
3470 bool
3471 Suma::check_switchover(Uint32 bucket, Uint32 gci)
3473 const Uint32 send_mask = (Bucket::BUCKET_STARTING | Bucket::BUCKET_TAKEOVER);
3474 bool send = c_buckets[bucket].m_state & send_mask;
3475 ndbassert(m_switchover_buckets.get(bucket));
3476 if(unlikely(gci >= c_buckets[bucket].m_switchover_gci))
3478 return send;
3480 return !send;
3483 static
3484 Uint32
3485 reformat(Signal* signal, LinearSectionPtr ptr[3],
3486 Uint32 * src_1, Uint32 sz_1,
3487 Uint32 * src_2, Uint32 sz_2)
3489 Uint32 noOfAttrs = 0, dataLen = 0;
3490 Uint32 * headers = signal->theData + 25;
3491 Uint32 * dst = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;
3493 ptr[0].p = headers;
3494 ptr[1].p = dst;
3496 while(sz_1 > 0){
3497 jam();
3498 Uint32 tmp = * src_1 ++;
3499 * headers ++ = tmp;
3500 Uint32 len = AttributeHeader::getDataSize(tmp);
3501 memcpy(dst, src_1, 4 * len);
3502 dst += len;
3503 src_1 += len;
3505 noOfAttrs++;
3506 dataLen += len;
3507 sz_1 -= (1 + len);
3509 assert(sz_1 == 0);
3511 ptr[0].sz = noOfAttrs;
3512 ptr[1].sz = dataLen;
3514 ptr[2].p = src_2;
3515 ptr[2].sz = sz_2;
3517 return sz_2 > 0 ? 3 : 2;
3520 void
3521 Suma::execFIRE_TRIG_ORD(Signal* signal)
3523 jamEntry();
3524 DBUG_ENTER("Suma::execFIRE_TRIG_ORD");
3525 ndbassert(signal->getNoOfSections() == 0);
3527 CRASH_INSERTION(13016);
3528 FireTrigOrd* const trg = (FireTrigOrd*)signal->getDataPtr();
3529 const Uint32 trigId = trg->getTriggerId();
3530 const Uint32 hashValue = trg->getHashValue();
3531 const Uint32 gci = trg->getGCI();
3532 const Uint32 event = trg->getTriggerEvent();
3533 const Uint32 any_value = trg->getAnyValue();
3534 TablePtr tabPtr;
3535 tabPtr.i = trigId & 0xFFFF;
3537 DBUG_PRINT("enter",("tabPtr.i=%u", tabPtr.i));
3538 ndbrequire(f_bufferLock == trigId);
3540 * Reset f_bufferLock
3542 f_bufferLock = 0;
3543 b_bufferLock = 0;
3545 ndbrequire((tabPtr.p = c_tablePool.getPtr(tabPtr.i)) != 0);
3546 Uint32 tableId = tabPtr.p->m_tableId;
3548 Uint32 bucket= hashValue % c_no_of_buckets;
3549 m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
3550 if(m_active_buckets.get(bucket) ||
3551 (m_switchover_buckets.get(bucket) && (check_switchover(bucket, gci))))
3553 m_max_sent_gci = (gci > m_max_sent_gci ? gci : m_max_sent_gci);
3554 Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords();
3555 ndbrequire(sz == f_trigBufferSize);
3557 LinearSectionPtr ptr[3];
3558 const Uint32 nptr= reformat(signal, ptr,
3559 f_buffer, sz, b_buffer, b_trigBufferSize);
3560 Uint32 ptrLen= 0;
3561 for(Uint32 i =0; i < nptr; i++)
3562 ptrLen+= ptr[i].sz;
3564 * Signal to subscriber(s)
3566 ndbrequire((tabPtr.p = c_tablePool.getPtr(tabPtr.i)) != 0);
3568 SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
3569 data->gci = gci;
3570 data->tableId = tableId;
3571 data->requestInfo = 0;
3572 SubTableData::setOperation(data->requestInfo, event);
3573 data->logType = 0;
3574 data->anyValue = any_value;
3575 data->totalLen = ptrLen;
3578 LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers);
3579 SubscriberPtr subbPtr;
3580 for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
3582 DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d",
3583 refToNode(subbPtr.p->m_senderRef)));
3584 data->senderData = subbPtr.p->m_senderData;
3585 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
3586 SubTableData::SignalLength, JBB, ptr, nptr);
3590 else
3592 const uint buffer_header_sz = 4;
3593 Uint32* dst;
3594 Uint32 sz = f_trigBufferSize + b_trigBufferSize + buffer_header_sz;
3595 if((dst = get_buffer_ptr(signal, bucket, gci, sz)))
3597 * dst++ = tableId;
3598 * dst++ = tabPtr.p->m_schemaVersion;
3599 * dst++ = (event << 16) | f_trigBufferSize;
3600 * dst++ = any_value;
3601 memcpy(dst, f_buffer, f_trigBufferSize << 2);
3602 dst += f_trigBufferSize;
3603 memcpy(dst, b_buffer, b_trigBufferSize << 2);
3607 DBUG_VOID_RETURN;
3610 void
3611 Suma::execSUB_GCP_COMPLETE_REP(Signal* signal)
3613 jamEntry();
3614 ndbassert(signal->getNoOfSections() == 0);
3616 SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();
3617 Uint32 gci = m_last_complete_gci = rep->gci;
3618 m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
3623 if(!m_switchover_buckets.isclear())
3625 NdbNodeBitmask takeover_nodes;
3626 NdbNodeBitmask handover_nodes;
3627 Uint32 i = m_switchover_buckets.find(0);
3628 for(; i != Bucket_mask::NotFound; i = m_switchover_buckets.find(i + 1))
3630 if(c_buckets[i].m_switchover_gci == gci)
3632 Uint32 state = c_buckets[i].m_state;
3633 m_switchover_buckets.clear(i);
3634 printf("switchover complete bucket %d state: %x", i, state);
3635 if(state & Bucket::BUCKET_STARTING)
3638 * NR case
3640 m_active_buckets.set(i);
3641 c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_STARTING;
3642 ndbout_c("starting");
3643 m_gcp_complete_rep_count = 1;
3645 else if(state & Bucket::BUCKET_TAKEOVER)
3648 * NF case
3650 Bucket* bucket= c_buckets + i;
3651 Page_pos pos= bucket->m_buffer_head;
3652 ndbrequire(pos.m_max_gci < gci);
3654 Buffer_page* page= (Buffer_page*)
3655 m_tup->c_page_pool.getPtr(pos.m_page_id);
3656 ndbout_c("takeover %d", pos.m_page_id);
3657 page->m_max_gci = pos.m_max_gci;
3658 page->m_words_used = pos.m_page_pos;
3659 page->m_next_page = RNIL;
3660 memset(&bucket->m_buffer_head, 0, sizeof(bucket->m_buffer_head));
3661 bucket->m_buffer_head.m_page_id = RNIL;
3662 bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS + 1;
3664 m_active_buckets.set(i);
3665 c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_TAKEOVER;
3666 takeover_nodes.set(c_buckets[i].m_switchover_node);
3668 else
3671 * NR, living node
3673 ndbrequire(state & Bucket::BUCKET_HANDOVER);
3674 c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_HANDOVER;
3675 handover_nodes.set(c_buckets[i].m_switchover_node);
3676 ndbout_c("handover");
3680 ndbassert(handover_nodes.count() == 0 ||
3681 m_gcp_complete_rep_count > handover_nodes.count());
3682 m_gcp_complete_rep_count -= handover_nodes.count();
3683 m_gcp_complete_rep_count += takeover_nodes.count();
3685 if(getNodeState().startLevel == NodeState::SL_STARTING &&
3686 m_switchover_buckets.isclear() &&
3687 c_startup.m_handover_nodes.isclear())
3689 sendSTTORRY(signal);
3693 if(ERROR_INSERTED(13010))
3695 CLEAR_ERROR_INSERT_VALUE;
3696 ndbout_c("Don't send GCP_COMPLETE_REP(%d)", gci);
3697 return;
3701 * Signal to subscribers
3703 rep->gci = gci;
3704 rep->senderRef = reference();
3705 rep->gcp_complete_rep_count = m_gcp_complete_rep_count;
3707 if(m_gcp_complete_rep_count && !c_subscriber_nodes.isclear())
3709 CRASH_INSERTION(13033);
3711 NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes);
3712 sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal,
3713 SubGcpCompleteRep::SignalLength, JBB);
3715 Ptr<Gcp_record> gcp;
3716 if(c_gcp_list.seize(gcp))
3718 gcp.p->m_gci = gci;
3719 gcp.p->m_subscribers = c_subscriber_nodes;
3724 * Add GCP COMPLETE REP to buffer
3726 for(Uint32 i = 0; i<c_no_of_buckets; i++)
3728 if(m_active_buckets.get(i))
3729 continue;
3731 if (!c_subscriber_nodes.isclear())
3733 //Uint32* dst;
3734 get_buffer_ptr(signal, i, gci, 0);
3738 if(gci == m_out_of_buffer_gci)
3740 infoEvent("Reenable event buffer");
3741 m_out_of_buffer_gci = 0;
3745 void
3746 Suma::execCREATE_TAB_CONF(Signal *signal)
3748 jamEntry();
3749 DBUG_ENTER("Suma::execCREATE_TAB_CONF");
3751 #if 0
3752 CreateTabConf * const conf = (CreateTabConf*)signal->getDataPtr();
3753 Uint32 tableId = conf->senderData;
3755 TablePtr tabPtr;
3756 initTable(signal,tableId,tabPtr);
3757 #endif
3758 DBUG_VOID_RETURN;
3761 void
3762 Suma::execDROP_TAB_CONF(Signal *signal)
3764 jamEntry();
3765 DBUG_ENTER("Suma::execDROP_TAB_CONF");
3766 ndbassert(signal->getNoOfSections() == 0);
3768 DropTabConf * const conf = (DropTabConf*)signal->getDataPtr();
3769 Uint32 senderRef= conf->senderRef;
3770 Uint32 tableId= conf->tableId;
3772 TablePtr tabPtr;
3773 if (!c_tables.find(tabPtr, tableId) ||
3774 tabPtr.p->m_state == Table::DROPPED ||
3775 tabPtr.p->m_state == Table::ALTERED)
3777 DBUG_VOID_RETURN;
3780 DBUG_PRINT("info",("drop table id: %d[i=%u]", tableId, tabPtr.i));
3782 tabPtr.p->m_state = Table::DROPPED;
3783 for (int j= 0; j < 3; j++)
3785 if (!tabPtr.p->m_hasOutstandingTriggerReq[j])
3787 tabPtr.p->m_hasTriggerDefined[j] = 0;
3788 tabPtr.p->m_hasOutstandingTriggerReq[j] = 0;
3789 tabPtr.p->m_triggerIds[j] = ILLEGAL_TRIGGER_ID;
3791 else
3792 tabPtr.p->m_hasTriggerDefined[j] = 1;
3794 if (senderRef == 0)
3796 DBUG_VOID_RETURN;
3798 // dict coordinator sends info to API
3800 SubTableData * data = (SubTableData*)signal->getDataPtrSend();
3801 data->gci = m_last_complete_gci+1;
3802 data->tableId = tableId;
3803 data->requestInfo = 0;
3804 SubTableData::setOperation(data->requestInfo,NdbDictionary::Event::_TE_DROP);
3805 SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
3808 LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers);
3809 SubscriberPtr subbPtr;
3810 for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr))
3812 jam();
3814 * get subscription ptr for this subscriber
3816 SubscriptionPtr subPtr;
3817 c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
3818 if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) {
3819 jam();
3820 continue;
3821 //continue in for-loop if the table is not part of
3822 //the subscription. Otherwise, send data to subscriber.
3824 data->senderData= subbPtr.p->m_senderData;
3825 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
3826 SubTableData::SignalLength, JBB);
3827 DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i));
3830 DBUG_VOID_RETURN;
3833 static Uint32 b_dti_buf[MAX_WORDS_META_FILE];
3835 void
3836 Suma::execALTER_TAB_REQ(Signal *signal)
3838 jamEntry();
3839 DBUG_ENTER("Suma::execALTER_TAB_REQ");
3840 ndbassert(signal->getNoOfSections() == 1);
3842 AlterTabReq * const req = (AlterTabReq*)signal->getDataPtr();
3843 Uint32 senderRef= req->senderRef;
3844 Uint32 tableId= req->tableId;
3845 Uint32 changeMask= req->changeMask;
3846 TablePtr tabPtr;
3847 if (!c_tables.find(tabPtr, tableId) ||
3848 tabPtr.p->m_state == Table::DROPPED ||
3849 tabPtr.p->m_state == Table::ALTERED)
3851 DBUG_VOID_RETURN;
3854 DBUG_PRINT("info",("alter table id: %d[i=%u]", tableId, tabPtr.i));
3855 Table::State old_state = tabPtr.p->m_state;
3856 tabPtr.p->m_state = Table::ALTERED;
3857 // triggers must be removed, waiting for sub stop req for that
3859 if (senderRef == 0)
3861 DBUG_VOID_RETURN;
3863 // dict coordinator sends info to API
3865 // Copy DICT_TAB_INFO to local buffer
3866 SegmentedSectionPtr tabInfoPtr;
3867 signal->getSection(tabInfoPtr, AlterTabReq::DICT_TAB_INFO);
3868 #ifndef DBUG_OFF
3869 ndbout_c("DICT_TAB_INFO in SUMA, tabInfoPtr.sz = %d", tabInfoPtr.sz);
3870 SimplePropertiesSectionReader reader(tabInfoPtr, getSectionSegmentPool());
3871 reader.printAll(ndbout);
3872 #endif
3873 copy(b_dti_buf, tabInfoPtr);
3874 LinearSectionPtr ptr[3];
3875 ptr[0].p = b_dti_buf;
3876 ptr[0].sz = tabInfoPtr.sz;
3878 releaseSections(signal);
3880 SubTableData * data = (SubTableData*)signal->getDataPtrSend();
3881 data->gci = m_last_complete_gci+1;
3882 data->tableId = tableId;
3883 data->requestInfo = 0;
3884 SubTableData::setOperation(data->requestInfo,
3885 NdbDictionary::Event::_TE_ALTER);
3886 SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
3887 data->logType = 0;
3888 data->changeMask = changeMask;
3889 data->totalLen = tabInfoPtr.sz;
3891 LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers);
3892 SubscriberPtr subbPtr;
3893 for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr))
3895 jam();
3897 * get subscription ptr for this subscriber
3899 SubscriptionPtr subPtr;
3900 c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
3901 if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent) {
3902 jam();
3903 continue;
3904 //continue in for-loop if the table is not part of
3905 //the subscription. Otherwise, send data to subscriber.
3908 data->senderData= subbPtr.p->m_senderData;
3909 Callback c = { 0, 0 };
3910 sendFragmentedSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
3911 SubTableData::SignalLength, JBB, ptr, 1, c);
3912 DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i));
3915 if (AlterTableReq::getFrmFlag(changeMask))
3917 // Frm changes only are handled on-line
3918 tabPtr.p->m_state = old_state;
3920 DBUG_VOID_RETURN;
3923 void
3924 Suma::execSUB_GCP_COMPLETE_ACK(Signal* signal)
3926 jamEntry();
3927 ndbassert(signal->getNoOfSections() == 0);
3929 SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr();
3930 Uint32 gci = ack->rep.gci;
3931 Uint32 senderRef = ack->rep.senderRef;
3932 m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
3934 if (refToBlock(senderRef) == SUMA) {
3935 jam();
3936 // Ack from other SUMA
3937 Uint32 nodeId= refToNode(senderRef);
3938 for(Uint32 i = 0; i<c_no_of_buckets; i++)
3940 if(m_active_buckets.get(i) ||
3941 (m_switchover_buckets.get(i) && (check_switchover(i, gci))) ||
3942 (!m_switchover_buckets.get(i) && get_responsible_node(i) == nodeId))
3944 release_gci(signal, i, gci);
3947 return;
3950 // Ack from User and not an ack from other SUMA, redistribute in nodegroup
3952 Uint32 nodeId = refToNode(senderRef);
3954 jam();
3955 Ptr<Gcp_record> gcp;
3956 for(c_gcp_list.first(gcp); !gcp.isNull(); c_gcp_list.next(gcp))
3958 if(gcp.p->m_gci == gci)
3960 gcp.p->m_subscribers.clear(nodeId);
3961 if(!gcp.p->m_subscribers.isclear())
3963 jam();
3964 return;
3966 break;
3970 if(gcp.isNull())
3972 ndbout_c("ACK wo/ gcp record (gci: %d)", gci);
3974 else
3976 c_gcp_list.release(gcp);
3979 CRASH_INSERTION(13011);
3980 if(ERROR_INSERTED(13012))
3982 CLEAR_ERROR_INSERT_VALUE;
3983 ndbout_c("Don't redistribute SUB_GCP_COMPLETE_ACK");
3984 return;
3987 ack->rep.senderRef = reference();
3988 NodeReceiverGroup rg(SUMA, c_nodes_in_nodegroup_mask);
3989 sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
3990 SubGcpCompleteAck::SignalLength, JBB);
/**************************************************************
 *
 * Removing subscription
 *
 */
3999 void
4000 Suma::execSUB_REMOVE_REQ(Signal* signal)
4002 jamEntry();
4003 DBUG_ENTER("Suma::execSUB_REMOVE_REQ");
4004 ndbassert(signal->getNoOfSections() == 0);
4006 CRASH_INSERTION(13021);
4008 const SubRemoveReq req = *(SubRemoveReq*)signal->getDataPtr();
4009 SubscriptionPtr subPtr;
4010 Subscription key;
4011 key.m_subscriptionId = req.subscriptionId;
4012 key.m_subscriptionKey = req.subscriptionKey;
4014 DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
4015 key.m_subscriptionId, key.m_subscriptionKey));
4017 if(!c_subscriptions.find(subPtr, key))
4019 jam();
4020 DBUG_PRINT("info",("Not found"));
4021 sendSubRemoveRef(signal, req, 1407);
4022 DBUG_VOID_RETURN;
4024 if (subPtr.p->m_state == Subscription::LOCKED)
4027 * we are currently setting up triggers etc. for this event
4029 jam();
4030 sendSubRemoveRef(signal, req, 1413);
4031 DBUG_VOID_RETURN;
4033 if (subPtr.p->m_state == Subscription::DROPPED)
4036 * already dropped
4038 jam();
4039 sendSubRemoveRef(signal, req, 1419);
4040 DBUG_VOID_RETURN;
4043 ndbrequire(subPtr.p->m_state == Subscription::DEFINED);
4044 DBUG_PRINT("info",("n_subscribers: %u", subPtr.p->n_subscribers));
4046 if (subPtr.p->n_subscribers == 0)
4048 // no subscribers on the subscription
4049 // remove it
4050 jam();
4051 completeSubRemove(subPtr);
4053 else
4055 // subscribers left on the subscription
4056 // mark it to be removed once all subscribers
4057 // are removed
4058 jam();
4059 subPtr.p->m_state = Subscription::DROPPED;
4062 SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
4063 conf->senderRef = reference();
4064 conf->senderData = req.senderData;
4065 conf->subscriptionId = req.subscriptionId;
4066 conf->subscriptionKey = req.subscriptionKey;
4068 sendSignal(req.senderRef, GSN_SUB_REMOVE_CONF, signal,
4069 SubRemoveConf::SignalLength, JBB);
4071 DBUG_VOID_RETURN;
4074 void
4075 Suma::completeSubRemove(SubscriptionPtr subPtr)
4077 DBUG_ENTER("Suma::completeSubRemove");
4078 //Uint32 subscriptionId = subPtr.p->m_subscriptionId;
4079 //Uint32 subscriptionKey = subPtr.p->m_subscriptionKey;
4081 c_subscriptions.release(subPtr);
4082 DBUG_PRINT("info",("c_subscriptionPool size: %d free: %d",
4083 c_subscriptionPool.getSize(),
4084 c_subscriptionPool.getNoOfFree()));
4087 * I was the last subscription to be remove so clear c_tables
4089 #if 0
4090 ndbout_c("c_subscriptionPool.getSize() %d c_subscriptionPool.getNoOfFree()%d",
4091 c_subscriptionPool.getSize(),c_subscriptionPool.getNoOfFree());
4092 #endif
4094 if(c_subscriptionPool.getSize() == c_subscriptionPool.getNoOfFree()) {
4095 jam();
4096 #if 0
4097 ndbout_c("SUB_REMOVE_REQ:Clearing c_tables");
4098 #endif
4099 int count= 0;
4100 KeyTable<Table>::Iterator it;
4101 for(c_tables.first(it); !it.isNull(); )
4103 // ndbrequire(false);
4105 DBUG_PRINT("error",("trailing table id: %d[i=%d] n_subscribers: %d m_state: %d",
4106 it.curr.p->m_tableId,
4107 it.curr.p->m_ptrI,
4108 it.curr.p->n_subscribers,
4109 it.curr.p->m_state));
4111 LocalDLList<Subscriber> subbs(c_subscriberPool,it.curr.p->c_subscribers);
4112 SubscriberPtr subbPtr;
4113 for(subbs.first(subbPtr);!subbPtr.isNull();subbs.next(subbPtr))
4115 DBUG_PRINT("error",("subscriber %d, m_subPtrI: %d", subbPtr.i, subbPtr.p->m_subPtrI));
4118 it.curr.p->release(* this);
4119 TablePtr tabPtr = it.curr;
4120 c_tables.next(it);
4121 c_tables.remove(tabPtr);
4122 c_tablePool.release(tabPtr);
4123 DBUG_PRINT("info",("c_tablePool size: %d free: %d",
4124 c_tablePool.getSize(),
4125 c_tablePool.getNoOfFree()));
4126 count++;
4128 DBUG_ASSERT(count == 0);
4130 DBUG_VOID_RETURN;
4133 void
4134 Suma::sendSubRemoveRef(Signal* signal, const SubRemoveReq& req,
4135 Uint32 errCode)
4137 jam();
4138 DBUG_ENTER("Suma::sendSubRemoveRef");
4139 SubRemoveRef * ref = (SubRemoveRef *)signal->getDataPtrSend();
4140 ref->senderRef = reference();
4141 ref->senderData = req.senderData;
4142 ref->subscriptionId = req.subscriptionId;
4143 ref->subscriptionKey = req.subscriptionKey;
4144 ref->errorCode = errCode;
4145 releaseSections(signal);
4146 sendSignal(signal->getSendersBlockRef(), GSN_SUB_REMOVE_REF,
4147 signal, SubRemoveRef::SignalLength, JBB);
4148 DBUG_VOID_RETURN;
4151 void
4152 Suma::Table::release(Suma & suma){
4153 jam();
4155 LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributes);
4156 attrBuf.release();
4158 LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments);
4159 fragBuf.release();
4161 m_state = UNDEFINED;
4162 #ifndef DBUG_OFF
4163 if (n_subscribers != 0)
4164 abort();
4165 #endif
4168 void
4169 Suma::SyncRecord::release(){
4170 jam();
4171 m_tableList.release();
4173 LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributeList);
4174 attrBuf.release();
/**************************************************************
 *
 * Restarting remote node functions, master functionality
 * (slave does nothing special)
 * - triggered on INCL_NODEREQ calling startNode
 * - included node will issue START_ME when it's ready to start
 *   the subscribers
 *
 */
4188 void
4189 Suma::execSUMA_START_ME_REQ(Signal* signal) {
4190 jamEntry();
4191 DBUG_ENTER("Suma::execSUMA_START_ME");
4192 ndbassert(signal->getNoOfSections() == 0);
4193 Restart.runSUMA_START_ME_REQ(signal, signal->getSendersBlockRef());
4194 DBUG_VOID_RETURN;
4197 void
4198 Suma::execSUB_CREATE_REF(Signal* signal) {
4199 jamEntry();
4200 DBUG_ENTER("Suma::execSUB_CREATE_REF");
4201 ndbassert(signal->getNoOfSections() == 0);
4202 SubCreateRef *const ref= (SubCreateRef *)signal->getDataPtr();
4203 Uint32 error= ref->errorCode;
4204 if (error != 1415)
4207 * This will happen if an api node connects during while other node
4208 * is restarting, and in this case the subscription will already
4209 * have been created.
4210 * ToDo: more complete handling of api nodes joining during
4211 * node restart
4213 Uint32 senderRef = signal->getSendersBlockRef();
4214 BlockReference cntrRef = calcNdbCntrBlockRef(refToNode(senderRef));
4215 // for some reason we did not manage to create a subscription
4216 // on the starting node
4217 SystemError * const sysErr = (SystemError*)&signal->theData[0];
4218 sysErr->errorCode = SystemError::CopySubscriptionRef;
4219 sysErr->errorRef = reference();
4220 sysErr->data1 = error;
4221 sysErr->data2 = 0;
4222 sendSignal(cntrRef, GSN_SYSTEM_ERROR, signal,
4223 SystemError::SignalLength, JBB);
4224 Restart.resetRestart(signal);
4225 DBUG_VOID_RETURN;
4227 // SubCreateConf has same signaldata as SubCreateRef
4228 Restart.runSUB_CREATE_CONF(signal);
4229 DBUG_VOID_RETURN;
4232 void
4233 Suma::execSUB_CREATE_CONF(Signal* signal)
4235 jamEntry();
4236 DBUG_ENTER("Suma::execSUB_CREATE_CONF");
4237 ndbassert(signal->getNoOfSections() == 0);
4238 Restart.runSUB_CREATE_CONF(signal);
4239 DBUG_VOID_RETURN;
4242 void
4243 Suma::execSUB_START_CONF(Signal* signal)
4245 jamEntry();
4246 DBUG_ENTER("Suma::execSUB_START_CONF");
4247 ndbassert(signal->getNoOfSections() == 0);
4248 Restart.runSUB_START_CONF(signal);
4249 DBUG_VOID_RETURN;
4252 void
4253 Suma::execSUB_START_REF(Signal* signal) {
4254 jamEntry();
4255 DBUG_ENTER("Suma::execSUB_START_REF");
4256 ndbassert(signal->getNoOfSections() == 0);
4257 SubStartRef *const ref= (SubStartRef *)signal->getDataPtr();
4258 Uint32 error= ref->errorCode;
4260 Uint32 senderRef = signal->getSendersBlockRef();
4261 BlockReference cntrRef = calcNdbCntrBlockRef(refToNode(senderRef));
4262 // for some reason we did not manage to start a subscriber
4263 // on the starting node
4264 SystemError * const sysErr = (SystemError*)&signal->theData[0];
4265 sysErr->errorCode = SystemError::CopySubscriberRef;
4266 sysErr->errorRef = reference();
4267 sysErr->data1 = error;
4268 sysErr->data2 = 0;
4269 sendSignal(cntrRef, GSN_SYSTEM_ERROR, signal,
4270 SystemError::SignalLength, JBB);
4271 Restart.resetRestart(signal);
4273 DBUG_VOID_RETURN;
4276 Suma::Restart::Restart(Suma& s) : suma(s)
4278 nodeId = 0;
4281 void
4282 Suma::Restart::runSUMA_START_ME_REQ(Signal* signal, Uint32 sumaRef)
4284 jam();
4285 DBUG_ENTER("Suma::Restart::runSUMA_START_ME");
4287 if(nodeId != 0)
4289 SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
4290 ref->errorCode = SumaStartMeRef::Busy;
4291 suma.sendSignal(sumaRef, GSN_SUMA_START_ME_REF, signal,
4292 SumaStartMeRef::SignalLength, JBB);
4293 DBUG_VOID_RETURN;
4296 nodeId = refToNode(sumaRef);
4297 startNode(signal, sumaRef);
4299 DBUG_VOID_RETURN;
4302 void
4303 Suma::Restart::startNode(Signal* signal, Uint32 sumaRef)
4305 jam();
4306 DBUG_ENTER("Suma::Restart::startNode");
4308 // right now we can only handle restarting one node
4309 // at a time in a node group
4311 createSubscription(signal, sumaRef);
4312 DBUG_VOID_RETURN;
4315 void
4316 Suma::Restart::createSubscription(Signal* signal, Uint32 sumaRef)
4318 jam();
4319 DBUG_ENTER("Suma::Restart::createSubscription");
4320 suma.c_subscriptions.first(c_subIt);
4321 nextSubscription(signal, sumaRef);
4322 DBUG_VOID_RETURN;
4325 void
4326 Suma::Restart::nextSubscription(Signal* signal, Uint32 sumaRef)
4328 jam();
4329 DBUG_ENTER("Suma::Restart::nextSubscription");
4331 if (c_subIt.isNull())
4333 jam();
4334 completeSubscription(signal, sumaRef);
4335 DBUG_VOID_RETURN;
4337 SubscriptionPtr subPtr;
4338 subPtr.i = c_subIt.curr.i;
4339 subPtr.p = suma.c_subscriptions.getPtr(subPtr.i);
4341 suma.c_subscriptions.next(c_subIt);
4343 SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
4345 req->senderRef = suma.reference();
4346 req->senderData = subPtr.i;
4347 req->subscriptionId = subPtr.p->m_subscriptionId;
4348 req->subscriptionKey = subPtr.p->m_subscriptionKey;
4349 req->subscriptionType = subPtr.p->m_subscriptionType |
4350 SubCreateReq::RestartFlag;
4352 switch (subPtr.p->m_subscriptionType) {
4353 case SubCreateReq::TableEvent:
4354 jam();
4355 req->tableId = subPtr.p->m_tableId;
4356 req->state = subPtr.p->m_state;
4357 suma.sendSignal(sumaRef, GSN_SUB_CREATE_REQ, signal,
4358 SubCreateReq::SignalLength2, JBB);
4359 DBUG_VOID_RETURN;
4360 case SubCreateReq::SingleTableScan:
4361 jam();
4362 nextSubscription(signal, sumaRef);
4363 DBUG_VOID_RETURN;
4364 case SubCreateReq::SelectiveTableSnapshot:
4365 case SubCreateReq::DatabaseSnapshot:
4366 ndbrequire(false);
4368 ndbrequire(false);
4371 void
4372 Suma::Restart::runSUB_CREATE_CONF(Signal* signal)
4374 jam();
4375 DBUG_ENTER("Suma::Restart::runSUB_CREATE_CONF");
4377 const Uint32 senderRef = signal->senderBlockRef();
4378 Uint32 sumaRef = signal->getSendersBlockRef();
4380 SubCreateConf * const conf = (SubCreateConf *)signal->getDataPtr();
4382 SubscriptionPtr subPtr;
4383 suma.c_subscriptions.getPtr(subPtr,conf->senderData);
4385 switch(subPtr.p->m_subscriptionType) {
4386 case SubCreateReq::TableEvent:
4387 if (1)
4389 jam();
4390 nextSubscription(signal, sumaRef);
4391 } else {
4392 jam();
4393 SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
4395 req->senderRef = suma.reference();
4396 req->senderData = subPtr.i;
4397 req->subscriptionId = subPtr.p->m_subscriptionId;
4398 req->subscriptionKey = subPtr.p->m_subscriptionKey;
4399 req->subscriptionType = subPtr.p->m_subscriptionType |
4400 SubCreateReq::RestartFlag |
4401 SubCreateReq::AddTableFlag;
4403 req->tableId = 0;
4405 suma.sendSignal(senderRef, GSN_SUB_CREATE_REQ, signal,
4406 SubCreateReq::SignalLength, JBB);
4408 DBUG_VOID_RETURN;
4409 case SubCreateReq::SingleTableScan:
4410 case SubCreateReq::SelectiveTableSnapshot:
4411 case SubCreateReq::DatabaseSnapshot:
4412 ndbrequire(false);
4414 ndbrequire(false);
4417 void
4418 Suma::Restart::completeSubscription(Signal* signal, Uint32 sumaRef)
4420 jam();
4421 DBUG_ENTER("Suma::Restart::completeSubscription");
4422 startSubscriber(signal, sumaRef);
4423 DBUG_VOID_RETURN;
4426 void
4427 Suma::Restart::startSubscriber(Signal* signal, Uint32 sumaRef)
4429 jam();
4430 DBUG_ENTER("Suma::Restart::startSubscriber");
4431 suma.c_tables.first(c_tabIt);
4432 if (c_tabIt.isNull())
4434 completeSubscriber(signal, sumaRef);
4435 DBUG_VOID_RETURN;
4437 SubscriberPtr subbPtr;
4439 LocalDLList<Subscriber>
4440 subbs(suma.c_subscriberPool,c_tabIt.curr.p->c_subscribers);
4441 subbs.first(subbPtr);
4443 nextSubscriber(signal, sumaRef, subbPtr);
4444 DBUG_VOID_RETURN;
4447 void
4448 Suma::Restart::nextSubscriber(Signal* signal, Uint32 sumaRef,
4449 SubscriberPtr subbPtr)
4451 jam();
4452 DBUG_ENTER("Suma::Restart::nextSubscriber");
4453 while (subbPtr.isNull())
4455 jam();
4456 DBUG_PRINT("info",("prev tableId %u",c_tabIt.curr.p->m_tableId));
4457 suma.c_tables.next(c_tabIt);
4458 if (c_tabIt.isNull())
4460 completeSubscriber(signal, sumaRef);
4461 DBUG_VOID_RETURN;
4463 DBUG_PRINT("info",("next tableId %u",c_tabIt.curr.p->m_tableId));
4465 LocalDLList<Subscriber>
4466 subbs(suma.c_subscriberPool,c_tabIt.curr.p->c_subscribers);
4467 subbs.first(subbPtr);
4471 * get subscription ptr for this subscriber
4474 SubscriptionPtr subPtr;
4475 suma.c_subscriptions.getPtr(subPtr, subbPtr.p->m_subPtrI);
4476 switch (subPtr.p->m_subscriptionType) {
4477 case SubCreateReq::TableEvent:
4478 jam();
4479 sendSubStartReq(subPtr, subbPtr, signal, sumaRef);
4480 DBUG_VOID_RETURN;
4481 case SubCreateReq::SelectiveTableSnapshot:
4482 case SubCreateReq::DatabaseSnapshot:
4483 case SubCreateReq::SingleTableScan:
4484 ndbrequire(false);
4486 ndbrequire(false);
4489 void
4490 Suma::Restart::sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr,
4491 Signal* signal, Uint32 sumaRef)
4493 jam();
4494 DBUG_ENTER("Suma::Restart::sendSubStartReq");
4495 SubStartReq * req = (SubStartReq *)signal->getDataPtrSend();
4497 req->senderRef = suma.reference();
4498 req->senderData = subbPtr.i;
4499 req->subscriptionId = subPtr.p->m_subscriptionId;
4500 req->subscriptionKey = subPtr.p->m_subscriptionKey;
4501 req->part = SubscriptionData::TableData;
4502 req->subscriberData = subbPtr.p->m_senderData;
4503 req->subscriberRef = subbPtr.p->m_senderRef;
4505 // restarting suma will not respond to this until startphase 5
4506 // since it is not until then data copying has been completed
4507 DBUG_PRINT("info",("Restarting subscriber: %u on key: [%u,%u] %u",
4508 subbPtr.i,
4509 subPtr.p->m_subscriptionId,
4510 subPtr.p->m_subscriptionKey,
4511 subPtr.p->m_tableId));
4513 suma.sendSignal(sumaRef, GSN_SUB_START_REQ,
4514 signal, SubStartReq::SignalLength2, JBB);
4515 DBUG_VOID_RETURN;
4518 void
4519 Suma::Restart::runSUB_START_CONF(Signal* signal)
4521 jam();
4522 DBUG_ENTER("Suma::Restart::runSUB_START_CONF");
4524 SubStartConf * const conf = (SubStartConf*)signal->getDataPtr();
4526 Subscription key;
4527 SubscriptionPtr subPtr;
4528 key.m_subscriptionId = conf->subscriptionId;
4529 key.m_subscriptionKey = conf->subscriptionKey;
4530 ndbrequire(suma.c_subscriptions.find(subPtr, key));
4532 TablePtr tabPtr;
4533 ndbrequire(suma.c_tables.find(tabPtr, subPtr.p->m_tableId));
4535 SubscriberPtr subbPtr;
4537 LocalDLList<Subscriber>
4538 subbs(suma.c_subscriberPool,tabPtr.p->c_subscribers);
4539 subbs.getPtr(subbPtr, conf->senderData);
4540 DBUG_PRINT("info",("Restarted subscriber: %u on key: [%u,%u] table: %u",
4541 subbPtr.i,key.m_subscriptionId,key.m_subscriptionKey,
4542 subPtr.p->m_tableId));
4543 subbs.next(subbPtr);
4546 Uint32 sumaRef = signal->getSendersBlockRef();
4547 nextSubscriber(signal, sumaRef, subbPtr);
4549 DBUG_VOID_RETURN;
4552 void
4553 Suma::Restart::completeSubscriber(Signal* signal, Uint32 sumaRef)
4555 DBUG_ENTER("Suma::Restart::completeSubscriber");
4556 completeRestartingNode(signal, sumaRef);
4557 DBUG_VOID_RETURN;
4560 void
4561 Suma::Restart::completeRestartingNode(Signal* signal, Uint32 sumaRef)
4563 jam();
4564 DBUG_ENTER("Suma::Restart::completeRestartingNode");
4565 //SumaStartMeConf *conf= (SumaStartMeConf*)signal->getDataPtrSend();
4566 suma.sendSignal(sumaRef, GSN_SUMA_START_ME_CONF, signal,
4567 SumaStartMeConf::SignalLength, JBB);
4568 resetRestart(signal);
4569 DBUG_VOID_RETURN;
4572 void
4573 Suma::Restart::resetRestart(Signal* signal)
4575 jam();
4576 DBUG_ENTER("Suma::Restart::resetRestart");
4577 nodeId = 0;
4578 DBUG_VOID_RETURN;
4581 // only run on restarting suma
4583 void
4584 Suma::execSUMA_HANDOVER_REQ(Signal* signal)
4586 jamEntry();
4587 DBUG_ENTER("Suma::execSUMA_HANDOVER_REQ");
4588 // Uint32 sumaRef = signal->getSendersBlockRef();
4589 SumaHandoverReq const * req = (SumaHandoverReq *)signal->getDataPtr();
4591 Uint32 gci = req->gci;
4592 Uint32 nodeId = req->nodeId;
4593 Uint32 new_gci = m_last_complete_gci + MAX_CONCURRENT_GCP + 1;
4595 Uint32 start_gci = (gci > new_gci ? gci : new_gci);
4596 // mark all active buckets really belonging to restarting SUMA
4598 Bucket_mask tmp;
4599 for( Uint32 i = 0; i < c_no_of_buckets; i++)
4601 if(get_responsible_node(i) == nodeId)
4603 if (m_active_buckets.get(i))
4605 // I'm running this bucket but it should really be the restarted node
4606 tmp.set(i);
4607 m_active_buckets.clear(i);
4608 m_switchover_buckets.set(i);
4609 c_buckets[i].m_switchover_gci = start_gci;
4610 c_buckets[i].m_state |= Bucket::BUCKET_HANDOVER;
4611 c_buckets[i].m_switchover_node = nodeId;
4612 ndbout_c("prepare to handover bucket: %d", i);
4614 else if(m_switchover_buckets.get(i))
4616 ndbout_c("dont handover bucket: %d %d", i, nodeId);
4621 SumaHandoverConf* conf= (SumaHandoverConf*)signal->getDataPtrSend();
4622 tmp.copyto(BUCKET_MASK_SIZE, conf->theBucketMask);
4623 conf->gci = start_gci;
4624 conf->nodeId = getOwnNodeId();
4625 sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_HANDOVER_CONF, signal,
4626 SumaHandoverConf::SignalLength, JBB);
4628 DBUG_VOID_RETURN;
4631 // only run on all but restarting suma
4632 void
4633 Suma::execSUMA_HANDOVER_REF(Signal* signal)
4635 ndbrequire(false);
4638 void
4639 Suma::execSUMA_HANDOVER_CONF(Signal* signal) {
4640 jamEntry();
4641 DBUG_ENTER("Suma::execSUMA_HANDOVER_CONF");
4643 SumaHandoverConf const * conf = (SumaHandoverConf *)signal->getDataPtr();
4645 Uint32 gci = conf->gci;
4646 Uint32 nodeId = conf->nodeId;
4647 Bucket_mask tmp;
4648 tmp.assign(BUCKET_MASK_SIZE, conf->theBucketMask);
4649 #ifdef HANDOVER_DEBUG
4650 ndbout_c("Suma::execSUMA_HANDOVER_CONF, gci = %u", gci);
4651 #endif
4653 for( Uint32 i = 0; i < c_no_of_buckets; i++)
4655 if (tmp.get(i))
4657 ndbrequire(get_responsible_node(i) == getOwnNodeId());
4658 // We should run this bucket, but _nodeId_ is
4659 c_buckets[i].m_switchover_gci = gci;
4660 c_buckets[i].m_state |= Bucket::BUCKET_STARTING;
4664 char buf[255];
4665 tmp.getText(buf);
4666 infoEvent("Suma: handover from node %d gci: %d buckets: %s (%d)",
4667 nodeId, gci, buf, c_no_of_buckets);
4668 m_switchover_buckets.bitOR(tmp);
4669 c_startup.m_handover_nodes.clear(nodeId);
4670 DBUG_VOID_RETURN;
#ifdef NOT_USED
/**
 * Debug printer for a Page_pos (compiled only when NOT_USED is defined).
 */
static
NdbOut&
operator<<(NdbOut & out, const Suma::Page_pos & pos)
{
  out << "[ Page_pos:";
  out << " m_page_id: " << pos.m_page_id;
  out << " m_page_pos: " << pos.m_page_pos;
  out << " m_max_gci: " << pos.m_max_gci;
  out << " ]";
  return out;
}
#endif
4687 Uint32*
4688 Suma::get_buffer_ptr(Signal* signal, Uint32 buck, Uint32 gci, Uint32 sz)
4690 sz += 1; // len
4691 Bucket* bucket= c_buckets+buck;
4692 Page_pos pos= bucket->m_buffer_head;
4694 Buffer_page* page = 0;
4695 Uint32 *ptr = 0;
4697 if (likely(pos.m_page_id != RNIL))
4699 page= (Buffer_page*)m_tup->c_page_pool.getPtr(pos.m_page_id);
4700 ptr= page->m_data + pos.m_page_pos;
4703 const bool same_gci = (gci == pos.m_last_gci) && (!ERROR_INSERTED(13022));
4705 pos.m_page_pos += sz;
4706 pos.m_last_gci = gci;
4707 Uint32 max = pos.m_max_gci > gci ? pos.m_max_gci : gci;
4709 if(likely(same_gci && pos.m_page_pos <= Buffer_page::DATA_WORDS))
4711 pos.m_max_gci = max;
4712 bucket->m_buffer_head = pos;
4713 * ptr++ = (0x8000 << 16) | sz; // Same gci
4714 return ptr;
4716 else if(pos.m_page_pos + 1 <= Buffer_page::DATA_WORDS)
4718 loop:
4719 pos.m_max_gci = max;
4720 pos.m_page_pos += 1;
4721 bucket->m_buffer_head = pos;
4722 * ptr++ = (sz + 1);
4723 * ptr++ = gci;
4724 return ptr;
4726 else
4729 * new page
4730 * 1) save header on last page
4731 * 2) seize new page
4733 Uint32 next;
4734 if(unlikely((next= seize_page()) == RNIL))
4737 * Out of buffer
4739 out_of_buffer(signal);
4740 return 0;
4743 if(likely(pos.m_page_id != RNIL))
4745 page->m_max_gci = pos.m_max_gci;
4746 page->m_words_used = pos.m_page_pos - sz;
4747 page->m_next_page= next;
4749 else
4751 bucket->m_buffer_tail = next;
4754 memset(&pos, 0, sizeof(pos));
4755 pos.m_page_id = next;
4756 pos.m_page_pos = sz;
4757 pos.m_last_gci = gci;
4759 page= (Buffer_page*)m_tup->c_page_pool.getPtr(pos.m_page_id);
4760 page->m_next_page= RNIL;
4761 ptr= page->m_data;
4762 goto loop; //
4766 void
4767 Suma::out_of_buffer(Signal* signal)
4769 if(m_out_of_buffer_gci)
4771 return;
4774 m_out_of_buffer_gci = m_last_complete_gci - 1;
4775 infoEvent("Out of event buffer: nodefailure will cause event failures");
4777 out_of_buffer_release(signal, 0);
4780 void
4781 Suma::out_of_buffer_release(Signal* signal, Uint32 buck)
4783 Bucket* bucket= c_buckets+buck;
4784 Uint32 tail= bucket->m_buffer_tail;
4786 if(tail != RNIL)
4788 Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
4789 bucket->m_buffer_tail = page->m_next_page;
4790 free_page(tail, page);
4791 signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE;
4792 signal->theData[1] = buck;
4793 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
4794 return;
4798 * Clear head
4800 bucket->m_buffer_head.m_page_id = RNIL;
4801 bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS + 1;
4803 buck++;
4804 if(buck != c_no_of_buckets)
4806 signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE;
4807 signal->theData[1] = buck;
4808 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
4809 return;
4813 * Finished will all release
4814 * prepare for inclusion
4816 m_out_of_buffer_gci = m_max_seen_gci > m_last_complete_gci
4817 ? m_max_seen_gci + 1 : m_last_complete_gci + 1;
4820 Uint32
4821 Suma::seize_page()
4823 if(unlikely(m_out_of_buffer_gci))
4825 return RNIL;
4827 loop:
4828 Ptr<Page_chunk> ptr;
4829 Uint32 ref= m_first_free_page;
4830 if(likely(ref != RNIL))
4832 m_first_free_page = ((Buffer_page*)m_tup->c_page_pool.getPtr(ref))->m_next_page;
4833 Uint32 chunk = ((Buffer_page*)m_tup->c_page_pool.getPtr(ref))->m_page_chunk_ptr_i;
4834 c_page_chunk_pool.getPtr(ptr, chunk);
4835 ndbassert(ptr.p->m_free);
4836 ptr.p->m_free--;
4837 return ref;
4840 if(!c_page_chunk_pool.seize(ptr))
4841 return RNIL;
4843 Uint32 count;
4844 m_tup->allocConsPages(16, count, ref);
4845 if (count == 0)
4846 return RNIL;
4848 ndbout_c("alloc_chunk(%d %d) - ", ref, count);
4850 m_first_free_page = ptr.p->m_page_id = ref;
4851 ptr.p->m_size = count;
4852 ptr.p->m_free = count;
4854 Buffer_page* page;
4855 LINT_INIT(page);
4856 for(Uint32 i = 0; i<count; i++)
4858 page = (Buffer_page*)m_tup->c_page_pool.getPtr(ref);
4859 page->m_page_state= SUMA_SEQUENCE;
4860 page->m_page_chunk_ptr_i = ptr.i;
4861 page->m_next_page = ++ref;
4863 page->m_next_page = RNIL;
4865 goto loop;
4868 void
4869 Suma::free_page(Uint32 page_id, Buffer_page* page)
4871 Ptr<Page_chunk> ptr;
4872 ndbrequire(page->m_page_state == SUMA_SEQUENCE);
4874 Uint32 chunk= page->m_page_chunk_ptr_i;
4876 c_page_chunk_pool.getPtr(ptr, chunk);
4878 ptr.p->m_free ++;
4879 page->m_next_page = m_first_free_page;
4880 ndbrequire(ptr.p->m_free <= ptr.p->m_size);
4882 m_first_free_page = page_id;
4885 void
4886 Suma::release_gci(Signal* signal, Uint32 buck, Uint32 gci)
4888 Bucket* bucket= c_buckets+buck;
4889 Uint32 tail= bucket->m_buffer_tail;
4890 Page_pos head= bucket->m_buffer_head;
4891 Uint32 max_acked = bucket->m_max_acked_gci;
4893 const Uint32 mask = Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND;
4894 if(unlikely(bucket->m_state & mask))
4896 jam();
4897 ndbout_c("release_gci(%d, %d) -> node failure -> abort", buck, gci);
4898 return;
4901 bucket->m_max_acked_gci = (max_acked > gci ? max_acked : gci);
4902 if(unlikely(tail == RNIL))
4904 return;
4907 if(tail == head.m_page_id)
4909 if(gci >= head.m_max_gci)
4911 jam();
4912 if (ERROR_INSERTED(13034))
4914 jam();
4915 SET_ERROR_INSERT_VALUE(13035);
4916 return;
4918 if (ERROR_INSERTED(13035))
4920 CLEAR_ERROR_INSERT_VALUE;
4921 NodeReceiverGroup rg(CMVMI, c_nodes_in_nodegroup_mask);
4922 rg.m_nodes.clear(getOwnNodeId());
4923 signal->theData[0] = 9999;
4924 sendSignal(rg, GSN_NDB_TAMPER, signal, 1, JBA);
4925 return;
4927 head.m_page_pos = 0;
4928 head.m_max_gci = gci;
4929 head.m_last_gci = 0;
4930 bucket->m_buffer_head = head;
4932 return;
4934 else
4936 jam();
4937 Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
4938 Uint32 max_gci = page->m_max_gci;
4939 Uint32 next_page = page->m_next_page;
4941 ndbassert(max_gci);
4943 if(gci >= max_gci)
4945 jam();
4946 free_page(tail, page);
4948 bucket->m_buffer_tail = next_page;
4949 signal->theData[0] = SumaContinueB::RELEASE_GCI;
4950 signal->theData[1] = buck;
4951 signal->theData[2] = gci;
4952 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
4953 return;
4955 else
4957 //ndbout_c("do nothing...");
// File-local counter used only for the resend log output: start_resend()
// resets it, resend_bucket() increments it for every resent data entry and
// prints/resets it when a GCI completion marker is resent.
4962 static Uint32 g_cnt = 0;
4964 void
4965 Suma::start_resend(Signal* signal, Uint32 buck)
4967 printf("start_resend(%d, ", buck);
4969 if(m_out_of_buffer_gci)
4971 progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR,
4972 "Nodefailure while out of event buffer");
4973 return;
4977 * Resend from m_max_acked_gci + 1 until max_gci + 1
4979 Bucket* bucket= c_buckets + buck;
4980 Page_pos pos= bucket->m_buffer_head;
4982 if(pos.m_page_id == RNIL)
4984 jam();
4985 m_active_buckets.set(buck);
4986 m_gcp_complete_rep_count ++;
4987 ndbout_c("empty bucket(RNIL) -> active");
4988 return;
4991 Uint32 min= bucket->m_max_acked_gci + 1;
4992 Uint32 max = pos.m_max_gci;
4994 ndbrequire(max <= m_max_seen_gci);
4996 if(min > max)
4998 ndbrequire(pos.m_page_id == bucket->m_buffer_tail);
4999 m_active_buckets.set(buck);
5000 m_gcp_complete_rep_count ++;
5001 ndbout_c("empty bucket -> active");
5002 return;
5005 g_cnt = 0;
5006 bucket->m_state |= (Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND);
5007 bucket->m_switchover_node = get_responsible_node(buck);
5008 bucket->m_switchover_gci = max + 1;
5010 m_switchover_buckets.set(buck);
5012 signal->theData[1] = buck;
5013 signal->theData[2] = min;
5014 signal->theData[3] = 0;
5015 signal->theData[4] = 0;
5016 sendSignal(reference(), GSN_CONTINUEB, signal, 5, JBB);
5018 ndbout_c("min: %d - max: %d) page: %d", min, max, bucket->m_buffer_tail);
5019 ndbrequire(max >= min);
5022 void
5023 Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci,
5024 Uint32 pos, Uint32 last_gci)
5026 Bucket* bucket= c_buckets+buck;
5027 Uint32 tail= bucket->m_buffer_tail;
5029 Buffer_page* page= (Buffer_page*)m_tup->c_page_pool.getPtr(tail);
5030 Uint32 max_gci = page->m_max_gci;
5031 Uint32 next_page = page->m_next_page;
5032 Uint32 *ptr = page->m_data + pos;
5033 Uint32 *end = page->m_data + page->m_words_used;
5034 bool delay = false;
5036 ndbrequire(tail != RNIL);
5038 if(tail == bucket->m_buffer_head.m_page_id)
5040 max_gci= bucket->m_buffer_head.m_max_gci;
5041 end= page->m_data + bucket->m_buffer_head.m_page_pos;
5042 next_page= RNIL;
5044 if(ptr == end)
5046 delay = true;
5047 goto next;
5050 else if(pos == 0 && min_gci > max_gci)
5052 free_page(tail, page);
5053 tail = bucket->m_buffer_tail = next_page;
5054 ndbout_c("pos==0 && min_gci(%d) > max_gci(%d) resend switching page to %d", min_gci, max_gci, tail);
5055 goto next;
5058 #if 0
5059 for(Uint32 i = 0; i<page->m_words_used; i++)
5061 printf("%.8x ", page->m_data[i]);
5062 if(((i + 1) % 8) == 0)
5063 printf("\n");
5065 printf("\n");
5066 #endif
5068 while(ptr < end)
5070 Uint32 *src = ptr;
5071 Uint32 tmp = * src++;
5072 Uint32 sz = tmp & 0xFFFF;
5074 ptr += sz;
5076 if(! (tmp & (0x8000 << 16)))
5078 sz--;
5079 last_gci = * src ++;
5081 else
5083 ndbrequire(ptr - sz > page->m_data);
5086 if(last_gci < min_gci)
5088 continue;
5091 ndbrequire(sz);
5092 sz --; // remove *len* part of sz
5094 if(sz == 0)
5096 SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();
5097 rep->gci = last_gci;
5098 rep->senderRef = reference();
5099 rep->gcp_complete_rep_count = 1;
5101 char buf[255];
5102 c_subscriber_nodes.getText(buf);
5103 ndbout_c("resending GCI: %d rows: %d -> %s", last_gci, g_cnt, buf);
5104 g_cnt = 0;
5106 NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes);
5107 sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal,
5108 SubGcpCompleteRep::SignalLength, JBB);
5110 else
5112 const uint buffer_header_sz = 4;
5113 g_cnt++;
5114 Uint32 table = * src++ ;
5115 Uint32 schemaVersion = * src++;
5116 Uint32 event = * src >> 16;
5117 Uint32 sz_1 = (* src ++) & 0xFFFF;
5118 Uint32 any_value = * src++;
5120 ndbassert(sz - buffer_header_sz >= sz_1);
5122 LinearSectionPtr ptr[3];
5123 const Uint32 nptr= reformat(signal, ptr,
5124 src, sz_1,
5125 src + sz_1, sz - buffer_header_sz - sz_1);
5126 Uint32 ptrLen= 0;
5127 for(Uint32 i =0; i < nptr; i++)
5128 ptrLen+= ptr[i].sz;
5131 * Signal to subscriber(s)
5133 Ptr<Table> tabPtr;
5134 if (c_tables.find(tabPtr, table) &&
5135 tabPtr.p->m_schemaVersion == schemaVersion)
5137 SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
5138 data->gci = last_gci;
5139 data->tableId = table;
5140 data->requestInfo = 0;
5141 SubTableData::setOperation(data->requestInfo, event);
5142 data->logType = 0;
5143 data->anyValue = any_value;
5144 data->totalLen = ptrLen;
5147 LocalDLList<Subscriber>
5148 list(c_subscriberPool,tabPtr.p->c_subscribers);
5149 SubscriberPtr subbPtr;
5150 for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
5152 DBUG_PRINT("info",("GSN_SUB_TABLE_DATA to node %d",
5153 refToNode(subbPtr.p->m_senderRef)));
5154 data->senderData = subbPtr.p->m_senderData;
5155 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
5156 SubTableData::SignalLength, JBB, ptr, nptr);
5162 break;
5165 if(ptr == end && (tail != bucket->m_buffer_head.m_page_id))
5168 * release...
5170 free_page(tail, page);
5171 tail = bucket->m_buffer_tail = next_page;
5172 pos = 0;
5173 last_gci = 0;
5174 ndbout_c("ptr == end -> resend switching page to %d", tail);
5176 else
5178 pos = (ptr - page->m_data);
5181 next:
5182 if(tail == RNIL)
5184 bucket->m_state &= ~(Uint32)Bucket::BUCKET_RESEND;
5185 ndbassert(! (bucket->m_state & Bucket::BUCKET_TAKEOVER));
5186 ndbout_c("resend done...");
5187 return;
5190 signal->theData[0] = SumaContinueB::RESEND_BUCKET;
5191 signal->theData[1] = buck;
5192 signal->theData[2] = min_gci;
5193 signal->theData[3] = pos;
5194 signal->theData[4] = last_gci;
5195 if(!delay)
5196 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 5, JBB);
5197 else
5198 sendSignalWithDelay(SUMA_REF, GSN_CONTINUEB, signal, 10, 5);
// Explicit instantiation of the append() function template for
// DataBuffer<11>, SegmentedSectionPtr and SectionSegmentPool, so that this
// specialization is emitted in this translation unit.
// NOTE(review): the template definition is not visible here - confirm
// where append() is defined.
5201 template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&);