1 /* Copyright (c) 2003-2007 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
19 #include <ndb_limits.h>
20 #include <SimulatedBlock.hpp>
22 #include <NodeBitmask.hpp>
26 #include <DLFifoList.hpp>
27 #include <KeyTable.hpp>
28 #include <DataBuffer.hpp>
29 #include <SignalCounter.hpp>
30 #include <AttributeHeader.hpp>
31 #include <AttributeList.hpp>
33 #include <signaldata/UtilSequence.hpp>
34 #include <signaldata/SumaImpl.hpp>
35 #include <ndbapi/NdbDictionary.hpp>
37 class Suma
: public SimulatedBlock
{
// Constructor: takes the Block_context by (non-const) reference.
40 Suma(Block_context
& ctx
);
// Subscription management entry points: create, remove, start, stop, sync and
// abort-sync requests, plus the CONF/REF replies for SUB_STOP.
// Every method takes the Signal pointer and returns nothing.
// NOTE(review): the exec<SIGNAL> naming suggests each method is the handler for
// the signal named in its suffix — confirm against the signal registration,
// which is not visible in this header.
46 void execSUB_CREATE_REQ(Signal
* signal
);
47 void execSUB_REMOVE_REQ(Signal
* signal
);
49 void execSUB_START_REQ(Signal
* signal
);
50 void execSUB_STOP_REQ(Signal
* signal
);
52 void execSUB_SYNC_REQ(Signal
* signal
);
53 void execSUB_ABORT_SYNC_REQ(Signal
* signal
);
55 void execSUB_STOP_CONF(Signal
* signal
);
56 void execSUB_STOP_REF(Signal
* signal
);
// Table-metadata related entry points: LIST_TABLES, GET_TABINFO and GET_TABLEID
// replies (CONF and REF variants), followed by DROP_TAB_CONF, ALTER_TAB_REQ and
// CREATE_TAB_CONF.
// NOTE(review): presumably these are answers from / notifications by the
// dictionary block — the sender is not visible here; confirm in the .cpp.
62 void execLIST_TABLES_REF(Signal
* signal
);
63 void execLIST_TABLES_CONF(Signal
* signal
);
// Note the asymmetric spelling: GET_TABINFOREF (no underscore) vs GET_TABINFO_CONF.
65 void execGET_TABINFOREF(Signal
* signal
);
66 void execGET_TABINFO_CONF(Signal
* signal
);
68 void execGET_TABLEID_CONF(Signal
* signal
);
69 void execGET_TABLEID_REF(Signal
* signal
);
71 void execDROP_TAB_CONF(Signal
* signal
);
72 void execALTER_TAB_REQ(Signal
* signal
);
73 void execCREATE_TAB_CONF(Signal
* signal
);
// Scan-related entry points: SCAN_HBREP, SCAN_FRAGREF/SCAN_FRAGCONF, TRANSID_AI
// and the SUB_SYNC_CONTINUE replies. Each takes the Signal pointer.
// NOTE(review): presumably used while synchronising a subscription via a
// fragment scan (see the startScan/nextScan declarations further down) —
// confirm in the implementation.
77 void execSCAN_HBREP(Signal
* signal
);
78 void execSCAN_FRAGREF(Signal
* signal
);
79 void execSCAN_FRAGCONF(Signal
* signal
);
80 void execTRANSID_AI(Signal
* signal
);
81 void execSUB_SYNC_CONTINUE_REF(Signal
* signal
);
82 void execSUB_SYNC_CONTINUE_CONF(Signal
* signal
);
// Entry points for TRIG_ATTRINFO, FIRE_TRIG_ORD and SUB_GCP_COMPLETE_REP.
// NOTE(review): by name these carry fired-trigger data and GCP completion
// reports; the payload layout is not visible in this header.
87 void execTRIG_ATTRINFO(Signal
* signal
);
88 void execFIRE_TRIG_ORD(Signal
* signal
);
89 void execSUB_GCP_COMPLETE_REP(Signal
* signal
);
// REF/CONF entry points for DI_FCOUNT and DIGETPRIM.
// NOTE(review): presumably replies to fragment-count / primary-replica lookups
// issued by this block — confirm against the sending code.
94 void execDI_FCOUNTREF(Signal
* signal
);
95 void execDI_FCOUNTCONF(Signal
* signal
);
96 void execDIGETPRIMREF(Signal
* signal
);
97 void execDIGETPRIMCONF(Signal
* signal
);
100 * Trigger administration
// Trigger administration: REF/CONF entry points for CREATE_TRIG and DROP_TRIG.
// Each takes the Signal pointer and returns nothing.
102 void execCREATE_TRIG_REF(Signal
* signal
);
103 void execCREATE_TRIG_CONF(Signal
* signal
);
104 void execDROP_TRIG_REF(Signal
* signal
);
105 void execDROP_TRIG_CONF(Signal
* signal
);
// Entry point for CONTINUEB; takes the Signal pointer.
// NOTE(review): which continuation types are dispatched is defined in the
// implementation, not in this header.
110 void execCONTINUEB(Signal
* signal
);
// Takes a single boolean and returns nothing.
// NOTE(review): by its name this looks like a block-local require/assert on
// `v` — the failure behaviour is not visible here; confirm in the .cpp.
114 void suma_ndbrequire(bool v
);
// TableList is an alias for DataBuffer<15>.
116 typedef DataBuffer
<15> TableList
;
118 union FragmentDescriptor
{
127 * Used when sending SCAN_FRAG
129 union AttributeDescriptor
{
140 Uint32 m_subPtrI
; //reference to subscription
143 union { Uint32 nextPool
; Uint32 prevList
; };
// SubscriberPtr is an alias for Ptr<Subscriber>.
145 typedef Ptr
<Subscriber
> SubscriberPtr
;
151 struct Subscription
{
155 Uint32 m_subscriptionId
;
156 Uint32 m_subscriptionKey
;
157 Uint32 m_subscriptionType
;
162 REPORT_SUBSCRIBE
= 0x2
172 Uint32 n_subscribers
;
175 union { Uint32 prevHash
; Uint32 nextPool
; };
177 Uint32
hashValue() const {
178 return m_subscriptionId
+ m_subscriptionKey
;
181 bool equal(const Subscription
& s
) const {
183 m_subscriptionId
== s
.m_subscriptionId
&&
184 m_subscriptionKey
== s
.m_subscriptionKey
;
187 * The following holds the tables included
188 * in the subscription.
192 Uint32 m_current_sync_ptrI
;
// SubscriptionPtr is an alias for Ptr<Subscription>.
194 typedef Ptr
<Subscription
> SubscriptionPtr
;
// TablePtr is an alias for Ptr<Table>.
198 typedef Ptr
<Table
> TablePtr
;
201 SyncRecord(Suma
& s
, DataBuffer
<15>::DataBufferPool
& p
)
202 : m_tableList(p
), suma(s
)
204 , cerrorInsert(s
.cerrorInsert
)
213 Uint32 m_subscriptionPtrI
;
215 Uint32 m_currentTable
;
216 TableList m_tableList
; // Tables to sync
217 TableList::DataBufferIterator m_tableList_it
;
222 Uint32 m_currentFragment
; // Index in tabPtr.p->m_fragments
223 DataBuffer
<15>::Head m_attributeList
; // Attribute if other than default
224 DataBuffer
<15>::Head m_tabList
; // tables if other than default
226 Uint32 m_currentTableId
; // Current table
227 Uint32 m_currentNoOfAttributes
; // No of attributes for current table
// Scan-driving methods declared next to the m_current* / m_tableList members.
// NOTE(review): presumably members of SyncRecord (its constructor is declared
// just above) — the enclosing struct header is not visible in this view.
229 void startScan(Signal
*);
230 void nextScan(Signal
*);
// Takes pointers to a TablePtr and a FragmentDescriptor and returns bool.
// NOTE(review): likely fills *tab / *fd and returns whether a fragment was
// found — confirm in the implementation.
231 bool getNextFragment(TablePtr
* tab
, FragmentDescriptor
* fd
);
// `error` defaults to 0 when the caller omits it.
232 void completeScan(Signal
*, int error
= 0);
// Returns the block number by forwarding to suma.number().
238 BlockNumber
number() const { return suma
.number(); }
239 void progError(int line
, int cause
, const char * extra
) {
240 suma
.progError(line
, cause
, extra
);
243 Uint32 prevList
; Uint32 ptrI
;
244 union { Uint32 nextPool
; Uint32 nextList
; };
246 friend struct SyncRecord
;
// Three overloads of initTable. All take the signal, a table id and a TablePtr
// by reference, and return int (the meaning of the return value is defined in
// the implementation, not here).
// Overload 1: additionally takes a Ptr<SyncRecord> by value.
248 int initTable(Signal
*signal
,Uint32 tableId
, TablePtr
&tabPtr
,
249 Ptr
<SyncRecord
> syncPtr
);
// Overload 2: additionally takes a SubscriberPtr by value.
250 int initTable(Signal
*signal
,Uint32 tableId
, TablePtr
&tabPtr
,
251 SubscriberPtr subbPtr
);
// Overload 3: no extra argument.
252 int initTable(Signal
*signal
,Uint32 tableId
, TablePtr
&tabPtr
);
// Completion helpers; all take the signal and a TablePtr by value.
// completeOneSubscriber also takes the SubscriberPtr and returns int; the
// other two return nothing.
// NOTE(review): presumably invoked once table initialisation (initTable) has
// finished — the call order is not visible in this header.
254 int completeOneSubscriber(Signal
* signal
, TablePtr tabPtr
, SubscriberPtr subbPtr
);
255 void completeAllSubscribers(Signal
* signal
, TablePtr tabPtr
);
256 void completeInitTable(Signal
* signal
, TablePtr tabPtr
);
// Default constructor: sets m_tableId to ~0 (all bits set) and n_subscribers
// to 0. No other member is assigned here.
259 Table() { m_tableId
= ~0; n_subscribers
= 0; }
// Takes the owning Suma by reference and returns nothing.
// NOTE(review): by name this checks whether the record can be released — the
// release condition is in the implementation; confirm there.
261 void checkRelease(Suma
&suma
);
263 DLList
<Subscriber
>::Head c_subscribers
;
264 DLList
<SyncRecord
>::Head c_syncRecords
;
276 SubscriberPtr m_drop_subbPtr
;
278 Uint32 n_subscribers
;
// Table-level helpers. Most take the Suma instance explicitly by reference
// rather than reaching it through a member.
// parseTable: takes a SegmentedSectionPtr by value and returns bool.
281 bool parseTable(SegmentedSectionPtr ptr
, Suma
&suma
);
// Trigger setup: setupTrigger returns int, completeTrigger returns nothing.
285 int setupTrigger(Signal
* signal
, Suma
&suma
);
286 void completeTrigger(Signal
* signal
);
// The AttributeMask is passed by non-const reference.
// NOTE(review): presumably an output parameter — confirm in the implementation.
287 void createAttributeMask(AttributeMask
&, Suma
&suma
);
// Trigger removal; runDropTrigger additionally takes the trigger id.
292 void dropTrigger(Signal
* signal
,Suma
&);
293 void runDropTrigger(Signal
* signal
, Uint32 triggerId
,Suma
&);
299 void runLIST_TABLES_CONF(Signal
* signal
);
302 union { Uint32 m_tableId
; Uint32 key
; };
303 Uint32 m_schemaVersion
;
304 Uint8 m_hasTriggerDefined
[3]; // Insert/Update/Delete
305 Uint8 m_hasOutstandingTriggerReq
[3]; // Insert/Update/Delete
306 Uint32 m_triggerIds
[3]; // Insert/Update/Delete
310 * Default order in which to ask for attributes during scan
311 * 1) Fixed, not nullable
314 DataBuffer
<15>::Head m_attributes
; // Attribute id's
320 DataBuffer
<15>::Head m_fragments
; // Fragment descriptors
326 union { Uint32 prevHash
; Uint32 nextPool
; };
327 Uint32
hashValue() const {
330 bool equal(const Table
& rec
) const {
331 return m_tableId
== rec
.m_tableId
;
// Two doubly-linked lists of Subscriber records.
338 DLList
<Subscriber
> c_metaSubscribers
;
339 DLList
<Subscriber
> c_removeDataSubscribers
;
// Lookup structures: Table records in a KeyTable, Subscription records in a
// DLHashTable.
344 KeyTable
<Table
> c_tables
;
345 DLHashTable
<Subscription
> c_subscriptions
;
// One ArrayPool per record type, plus the pool backing DataBuffer<15>.
350 ArrayPool
<Subscriber
> c_subscriberPool
;
351 ArrayPool
<Table
> c_tablePool
;
352 ArrayPool
<Subscription
> c_subscriptionPool
;
353 ArrayPool
<SyncRecord
> c_syncPool
;
354 DataBuffer
<15>::DataBufferPool c_dataBufferPool
;
// Node bitmask; NOTE(review): by name, the API nodes currently marked failed.
356 NodeBitmask c_failedApiNodes
;
// Takes the signal and a node id; returns bool (meaning not visible here).
361 bool removeSubscribersOnNode(Signal
*signal
, Uint32 nodeId
);
363 bool checkTableTriggers(SegmentedSectionPtr ptr
);
// Takes a table id, the subscription and a raw SyncRecord pointer.
365 void addTableId(Uint32 TableId
,
366 SubscriptionPtr subPtr
, SyncRecord
*psyncRec
);
// Helpers for sending REF (refusal) replies; each carries an error code.
// sendSubIdRef also takes the receiver reference and sender data explicitly.
368 void sendSubIdRef(Signal
* signal
,Uint32 senderRef
,Uint32 senderData
,Uint32 errorCode
);
369 void sendSubCreateRef(Signal
* signal
, Uint32 errorCode
);
// Two overloads: one addressed via a SubscriberPtr plus a
// SubscriptionData::Part, one taking only the signal and error code.
370 void sendSubStartRef(Signal
*, SubscriberPtr
, Uint32 errorCode
, SubscriptionData::Part
);
371 void sendSubStartRef(Signal
* signal
, Uint32 errorCode
);
372 void sendSubStopRef(Signal
* signal
, Uint32 errorCode
);
373 void sendSubSyncRef(Signal
* signal
, Uint32 errorCode
);
374 void sendSubRemoveRef(Signal
* signal
, const SubRemoveReq
& ref
,
// Completion notifications for start/stop. The Uint32 parameter of
// sendSubStartComplete is unnamed in this declaration.
376 void sendSubStartComplete(Signal
*, SubscriberPtr
, Uint32
,
377 SubscriptionData::Part
);
378 void sendSubStopComplete(Signal
*, SubscriberPtr
);
// `unlock` defaults to false when omitted.
379 void sendSubStopReq(Signal
* signal
, bool unlock
= false);
381 void completeSubRemove(SubscriptionPtr subPtr
);
// Takes the table event kind, the subscription and one subscriber.
// NOTE(review): how subbPtr relates to "all subscribers" (e.g. the originator
// or one to exclude) is not visible here — confirm in the implementation.
383 void reportAllSubscribers(Signal
*signal
,
384 NdbDictionary::Event::_TableEvent table_event
,
385 SubscriptionPtr subPtr
,
386 SubscriberPtr subbPtr
);
// Returns a Uint32; NOTE(review): by name, the first GCI — confirm semantics.
388 Uint32
getFirstGCI(Signal
* signal
);
393 void convertNameToId( SubscriptionPtr subPtr
, Signal
* signal
);
// Entry points for the *_SUBSCRIPTION_REQ family (create, drop, start, stop,
// sync) and ABORT_SYNC_REQ. Each takes the Signal pointer.
// NOTE(review): these sit alongside the execSUB_*_REQ methods declared earlier;
// how the two families differ is not visible in this header.
398 void execCREATE_SUBSCRIPTION_REQ(Signal
* signal
);
399 void execDROP_SUBSCRIPTION_REQ(Signal
* signal
);
401 void execSTART_SUBSCRIPTION_REQ(Signal
* signal
);
402 void execSTOP_SUBSCRIPTION_REQ(Signal
* signal
);
404 void execSYNC_SUBSCRIPTION_REQ(Signal
* signal
);
405 void execABORT_SYNC_REQ(Signal
* signal
);
// NOTE(review): presumably populates c_nodesInGroup / c_noNodesInGroup declared
// further down — confirm in the implementation.
411 void getNodeGroupMembers(Signal
* signal
);
// Start-up, configuration, node-state and diagnostic entry points
// (READ_CONFIG_REQ, STTOR, NDB_STTOR, DUMP_STATE_ORD, READ_NODESCONF,
// NODE_FAILREP, INCL_NODEREQ, SIGNAL_DROPPED_REP, API_START_REP, API_FAILREQ).
413 void execREAD_CONFIG_REQ(Signal
* signal
);
415 void execSTTOR(Signal
* signal
);
// Not an exec* handler: a send helper taking only the Signal pointer.
416 void sendSTTORRY(Signal
*);
417 void execNDB_STTOR(Signal
* signal
);
418 void execDUMP_STATE_ORD(Signal
* signal
);
419 void execREAD_NODESCONF(Signal
* signal
);
420 void execNODE_FAILREP(Signal
* signal
);
421 void execINCL_NODEREQ(Signal
* signal
);
422 void execSIGNAL_DROPPED_REP(Signal
* signal
);
423 void execAPI_START_REP(Signal
* signal
);
424 void execAPI_FAILREQ(Signal
* signal
) ;
426 void execSUB_GCP_COMPLETE_ACK(Signal
* signal
);
429 * Controller interface
// Controller interface: REF/CONF entry points for SUB_CREATE, SUB_DROP,
// SUB_START and SUB_ABORT_SYNC, followed by the REQ/REF/CONF triples for
// SUMA_START_ME and SUMA_HANDOVER. Each takes the Signal pointer.
431 void execSUB_CREATE_REF(Signal
* signal
);
432 void execSUB_CREATE_CONF(Signal
* signal
);
434 void execSUB_DROP_REF(Signal
* signal
);
435 void execSUB_DROP_CONF(Signal
* signal
);
437 void execSUB_START_REF(Signal
* signal
);
438 void execSUB_START_CONF(Signal
* signal
);
440 void execSUB_ABORT_SYNC_REF(Signal
* signal
);
441 void execSUB_ABORT_SYNC_CONF(Signal
* signal
);
// NOTE(review): SUMA_START_ME / SUMA_HANDOVER look like the node-restart
// handshake between Suma instances — confirm against the implementation.
443 void execSUMA_START_ME_REQ(Signal
* signal
);
444 void execSUMA_START_ME_REF(Signal
* signal
);
445 void execSUMA_START_ME_CONF(Signal
* signal
);
446 void execSUMA_HANDOVER_REQ(Signal
* signal
);
447 void execSUMA_HANDOVER_REF(Signal
* signal
);
448 void execSUMA_HANDOVER_CONF(Signal
* signal
);
451 * Subscription generation interface
// Subscription generation interface: sequence creation and its replies.
453 void createSequence(Signal
* signal
);
// Takes both a UtilSequenceConf and a UtilSequenceRef pointer.
// NOTE(review): presumably exactly one of conf/ref is non-null per call (shared
// reply path for the two handlers below) — confirm in the implementation.
454 void createSequenceReply(Signal
* signal
,
455 UtilSequenceConf
* conf
,
456 UtilSequenceRef
* ref
);
457 void execUTIL_SEQUENCE_CONF(Signal
* signal
);
458 void execUTIL_SEQUENCE_REF(Signal
* signal
);
459 void execCREATE_SUBID_REQ(Signal
* signal
);
462 * for Suma that is restarting another
471 DLHashTable
<Subscription
>::Iterator c_subIt
;
472 KeyTable
<Table
>::Iterator c_tabIt
;
474 void progError(int line
, int cause
, const char * extra
) {
475 suma
.progError(line
, cause
, extra
);
// Restart helpers. Almost all of them take `sumaRef`, a Uint32 reference to
// the peer Suma being served.
// NOTE(review): presumably members of the Restart helper class (see
// `friend class Restart` below) — the enclosing class header is not visible
// in this view.
478 void resetNode(Uint32 sumaRef
);
479 void runSUMA_START_ME_REQ(Signal
*, Uint32 sumaRef
);
480 void startNode(Signal
*, Uint32 sumaRef
);
// Subscription phase.
// NOTE(review): the create/next/complete naming suggests an iteration over
// subscriptions (cf. c_subIt) — confirm the sequencing in the implementation.
482 void createSubscription(Signal
* signal
, Uint32 sumaRef
);
483 void nextSubscription(Signal
* signal
, Uint32 sumaRef
);
484 void runSUB_CREATE_CONF(Signal
* signal
);
485 void completeSubscription(Signal
* signal
, Uint32 sumaRef
);
// Subscriber phase; same start/next/complete shape as above.
487 void startSubscriber(Signal
* signal
, Uint32 sumaRef
);
488 void nextSubscriber(Signal
* signal
, Uint32 sumaRef
, SubscriberPtr subbPtr
);
// Note the parameter order: subscription and subscriber come before the signal.
489 void sendSubStartReq(SubscriptionPtr subPtr
, SubscriberPtr subbPtr
,
490 Signal
* signal
, Uint32 sumaRef
);
491 void runSUB_START_CONF(Signal
* signal
);
492 void completeSubscriber(Signal
* signal
, Uint32 sumaRef
);
// Final steps of the restart sequence.
494 void completeRestartingNode(Signal
* signal
, Uint32 sumaRef
);
495 void resetRestart(Signal
* signal
);
499 friend class Restart
;
503 NodeId c_masterNodeId
;
504 NdbNodeBitmask c_alive_nodes
;
507 * for restarting Suma not to start sending data too early
511 bool m_wait_handover
;
512 Uint32 m_restart_server_node_id
;
513 NdbNodeBitmask m_handover_nodes
;
516 NodeBitmask c_connected_nodes
; // (NODE/API) START REP / (API/NODE) FAIL REQ
517 NodeBitmask c_subscriber_nodes
; //
520 * for all Suma's to keep track of other Suma's in Node group
523 Uint32 c_noNodesInGroup
;
524 Uint32 c_nodesInGroup
[MAX_REPLICAS
];
525 NdbNodeBitmask c_nodes_in_nodegroup_mask
; // NodeId's of nodes in nodegroup
// Start/handover helpers; each takes the Signal pointer.
// NOTE(review): presumably the sending side of the SUMA_START_ME /
// SUMA_HANDOVER signals handled above — confirm in the implementation.
527 void send_start_me_req(Signal
* signal
);
528 void check_start_handover(Signal
* signal
);
529 void send_handover_req(Signal
* signal
);
// Two const overloads returning a Uint32 for the value B; the second
// additionally takes a node bitmask by const reference.
// NOTE(review): B is presumably a bucket number and the result a node id —
// confirm against the implementation.
531 Uint32
get_responsible_node(Uint32 B
) const;
532 Uint32
get_responsible_node(Uint32 B
, const NdbNodeBitmask
& mask
) const;
// Takes a bucket number and a gci; returns bool (meaning not visible here).
533 bool check_switchover(Uint32 bucket
, Uint32 gci
);
540 Uint32 m_max_gci
; // max gci on page
541 Uint32 m_last_gci
; // last gci on page
548 BUCKET_STARTING
= 0x1 // On starting node
549 ,BUCKET_HANDOVER
= 0x2 // On running node
550 ,BUCKET_TAKEOVER
= 0x4 // On taking over node
551 ,BUCKET_RESEND
= 0x8 // On taking over node
554 Uint16 m_switchover_node
;
555 Uint16 m_nodes
[MAX_REPLICAS
];
556 Uint32 m_switchover_gci
;
557 Uint32 m_max_acked_gci
;
558 Uint32 m_buffer_tail
; // Page
559 Page_pos m_buffer_head
;
564 STATIC_CONST( DATA_WORDS
= 8192 - 9);
569 Uint32 m_page_state
; // Used by TUP buddy algorithm
570 Uint32 m_page_chunk_ptr_i
;
572 Uint32 m_words_used
; //
574 Uint32 m_data
[DATA_WORDS
];
577 STATIC_CONST( NO_OF_BUCKETS
= 24 ); // 24 = 4! = 4*3*2*1
578 Uint32 c_no_of_buckets
;
579 struct Bucket c_buckets
[NO_OF_BUCKETS
];
581 STATIC_CONST( BUCKET_MASK_SIZE
= (((NO_OF_BUCKETS
+31)>> 5)) );
582 typedef Bitmask
<BUCKET_MASK_SIZE
> Bucket_mask
;
583 Bucket_mask m_active_buckets
;
584 Bucket_mask m_switchover_buckets
;
// Returns a Uint32 pointer for the given bucket, gci and size `sz`.
// NOTE(review): the unit of `sz` (presumably words, cf. DATA_WORDS) and the
// out-of-space behaviour are not visible here — confirm in the implementation.
588 Uint32
* get_buffer_ptr(Signal
*, Uint32 buck
, Uint32 gci
, Uint32 sz
);
// Takes both the page id and the Buffer_page pointer.
590 void free_page(Uint32 page_id
, Buffer_page
* page
);
591 void out_of_buffer(Signal
*);
592 void out_of_buffer_release(Signal
* signal
, Uint32 buck
);
// Resend helpers, all keyed by bucket number.
594 void start_resend(Signal
*, Uint32 bucket
);
595 void resend_bucket(Signal
*, Uint32 bucket
, Uint32 gci
,
596 Uint32 page_pos
, Uint32 last_gci
);
597 void release_gci(Signal
*, Uint32 bucket
, Uint32 gci
);
599 Uint32 m_max_seen_gci
; // FIRE_TRIG_ORD
600 Uint32 m_max_sent_gci
; // FIRE_TRIG_ORD -> send
601 Uint32 m_last_complete_gci
; // SUB_GCP_COMPLETE_REP
602 Uint32 m_out_of_buffer_gci
;
603 Uint32 m_gcp_complete_rep_count
;
608 NodeBitmask m_subscribers
;
615 ArrayPool
<Gcp_record
> c_gcp_pool
;
616 DLFifoList
<Gcp_record
> c_gcp_list
;
630 Uint32 m_first_free_page
;
631 ArrayPool
<Page_chunk
> c_page_chunk_pool
;