/* Copyright (c) 2003-2007 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
#ifndef SUMA_H
#define SUMA_H

#include <ndb_limits.h>
#include <SimulatedBlock.hpp>

#include <NodeBitmask.hpp>

#include <SLList.hpp>
#include <DLList.hpp>
#include <DLFifoList.hpp>
#include <KeyTable.hpp>
#include <DataBuffer.hpp>
#include <SignalCounter.hpp>
#include <AttributeHeader.hpp>
#include <AttributeList.hpp>

#include <signaldata/UtilSequence.hpp>
#include <signaldata/SumaImpl.hpp>
#include <ndbapi/NdbDictionary.hpp>
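/*
 * SUMA (SUbscription MAnager) block.  The summary below is inferred from the
 * declarations in this header rather than copied from upstream documentation:
 * the block manages event subscriptions (SUB_CREATE/SUB_START/SUB_SYNC/...),
 * sets up and drops table triggers (CREATE_TRIG/DROP_TRIG), buffers the
 * change records delivered via TRIG_ATTRINFO/FIRE_TRIG_ORD per global
 * checkpoint (GCI) bucket, scans table data for the initial sync phase of a
 * subscription, and hands buckets over between the nodes of a node group on
 * restart or failure.
 */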
class Suma : public SimulatedBlock {
  BLOCK_DEFINES(Suma);
public:
  Suma(Block_context& ctx);
  virtual ~Suma();

  /**
   * Private interface
   */
  void execSUB_CREATE_REQ(Signal* signal);
  void execSUB_REMOVE_REQ(Signal* signal);

  void execSUB_START_REQ(Signal* signal);
  void execSUB_STOP_REQ(Signal* signal);

  void execSUB_SYNC_REQ(Signal* signal);
  void execSUB_ABORT_SYNC_REQ(Signal* signal);

  void execSUB_STOP_CONF(Signal* signal);
  void execSUB_STOP_REF(Signal* signal);
  /**
   * Dict interface
   */
#if 0
  void execLIST_TABLES_REF(Signal* signal);
  void execLIST_TABLES_CONF(Signal* signal);
#endif
  void execGET_TABINFOREF(Signal* signal);
  void execGET_TABINFO_CONF(Signal* signal);

  void execGET_TABLEID_CONF(Signal* signal);
  void execGET_TABLEID_REF(Signal* signal);

  void execDROP_TAB_CONF(Signal* signal);
  void execALTER_TAB_REQ(Signal* signal);
  void execCREATE_TAB_CONF(Signal* signal);

  /**
   * Scan interface
   */
  void execSCAN_HBREP(Signal* signal);
  void execSCAN_FRAGREF(Signal* signal);
  void execSCAN_FRAGCONF(Signal* signal);
  void execTRANSID_AI(Signal* signal);
  void execSUB_SYNC_CONTINUE_REF(Signal* signal);
  void execSUB_SYNC_CONTINUE_CONF(Signal* signal);

  /**
   * Trigger logging
   */
  void execTRIG_ATTRINFO(Signal* signal);
  void execFIRE_TRIG_ORD(Signal* signal);
  void execSUB_GCP_COMPLETE_REP(Signal* signal);

  /**
   * DIH signals
   */
  void execDI_FCOUNTREF(Signal* signal);
  void execDI_FCOUNTCONF(Signal* signal);
  void execDIGETPRIMREF(Signal* signal);
  void execDIGETPRIMCONF(Signal* signal);

  /**
   * Trigger administration
   */
  void execCREATE_TRIG_REF(Signal* signal);
  void execCREATE_TRIG_CONF(Signal* signal);
  void execDROP_TRIG_REF(Signal* signal);
  void execDROP_TRIG_CONF(Signal* signal);

  /**
   * continueb
   */
  void execCONTINUEB(Signal* signal);
public:

  void suma_ndbrequire(bool v);

  typedef DataBuffer<15> TableList;

  union FragmentDescriptor {
    struct {
      Uint16 m_fragmentNo;
      Uint16 m_nodeId;
    } m_fragDesc;
    Uint32 m_dummy;
  };

  /**
   * Used when sending SCAN_FRAG
   */
  union AttributeDescriptor {
    struct {
      Uint16 attrId;
      Uint16 unused;
    } m_attrDesc;
    Uint32 m_dummy;
  };
  struct Subscriber {
    Uint32 m_senderRef;
    Uint32 m_senderData;
    Uint32 m_subPtrI; // reference to subscription
    Uint32 nextList;

    union { Uint32 nextPool; Uint32 prevList; };
  };
  typedef Ptr<Subscriber> SubscriberPtr;
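  /*
   * Note (inferred from the members and the containers declared further
   * down): Subscriber records are seized from c_subscriberPool and linked
   * into intrusive lists (the per-table c_subscribers, c_metaSubscribers,
   * c_removeDataSubscribers) through the nextList/prevList/nextPool words;
   * m_subPtrI is the pool index of the owning Subscription.
   */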
  /**
   * Subscriptions
   */
  struct Subscription {
    Subscription() {}
    Uint32 m_senderRef;
    Uint32 m_senderData;
    Uint32 m_subscriptionId;
    Uint32 m_subscriptionKey;
    Uint32 m_subscriptionType;
    Uint16 m_options;

    enum Options {
      REPORT_ALL = 0x1,
      REPORT_SUBSCRIBE = 0x2
    };

    enum State {
      UNDEFINED,
      LOCKED,
      DEFINED,
      DROPPED
    };
    State m_state;
    Uint32 n_subscribers;

    Uint32 nextHash;
    union { Uint32 prevHash; Uint32 nextPool; };

    Uint32 hashValue() const {
      return m_subscriptionId + m_subscriptionKey;
    }

    bool equal(const Subscription & s) const {
      return
        m_subscriptionId == s.m_subscriptionId &&
        m_subscriptionKey == s.m_subscriptionKey;
    }

    /**
     * The following holds the tables included
     * in the subscription.
     */
    Uint32 m_tableId;
    Uint32 m_table_ptrI;
    Uint32 m_current_sync_ptrI;
  };
  typedef Ptr<Subscription> SubscriptionPtr;
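  /*
   * hashValue()/equal() above form the key interface expected by the hash
   * table; c_subscriptions (declared below) keys a subscription on the pair
   * (m_subscriptionId, m_subscriptionKey).  Lookup sketch, illustrative only
   * and not code from this file:
   *
   *   Subscription key;
   *   key.m_subscriptionId  = subId;
   *   key.m_subscriptionKey = subKey;
   *   SubscriptionPtr subPtr;
   *   if (c_subscriptions.find(subPtr, key)) { ... found ... }
   */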
  class Table;
  friend class Table;
  typedef Ptr<Table> TablePtr;
  struct SyncRecord {
    SyncRecord(Suma& s, DataBuffer<15>::DataBufferPool & p)
      : m_tableList(p), suma(s)
#ifdef ERROR_INSERT
      , cerrorInsert(s.cerrorInsert)
#endif
    {}

    void release();

    Uint32 m_senderRef;
    Uint32 m_senderData;

    Uint32 m_subscriptionPtrI;
    Uint32 m_error;
    Uint32 m_currentTable;
    TableList m_tableList;    // Tables to sync
    TableList::DataBufferIterator m_tableList_it;

    /**
     * Sync data
     */
    Uint32 m_currentFragment;             // Index in tabPtr.p->m_fragments
    DataBuffer<15>::Head m_attributeList; // Attributes, if other than default
    DataBuffer<15>::Head m_tabList;       // Tables, if other than default

    Uint32 m_currentTableId;        // Current table
    Uint32 m_currentNoOfAttributes; // No of attributes for current table

    void startScan(Signal*);
    void nextScan(Signal*);
    bool getNextFragment(TablePtr * tab, FragmentDescriptor * fd);
    void completeScan(Signal*, int error= 0);

    Suma & suma;
#ifdef ERROR_INSERT
    UintR &cerrorInsert;
#endif
    BlockNumber number() const { return suma.number(); }
    void progError(int line, int cause, const char * extra) {
      suma.progError(line, cause, extra);
    }

    Uint32 prevList; Uint32 ptrI;
    union { Uint32 nextPool; Uint32 nextList; };
  };
  friend struct SyncRecord;
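  /*
   * Note (inferred from the member and function names): one SyncRecord drives
   * the initial synchronization of one subscription.  It walks m_tableList
   * table by table and, within a table, fragment by fragment
   * (startScan/nextScan/getNextFragment), and reports the outcome through
   * completeScan().  The number()/progError() forwarders let the record use
   * the block's error-reporting machinery.
   */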
  int initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr,
                Ptr<SyncRecord> syncPtr);
  int initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr,
                SubscriberPtr subbPtr);
  int initTable(Signal *signal, Uint32 tableId, TablePtr &tabPtr);

  int completeOneSubscriber(Signal* signal, TablePtr tabPtr, SubscriberPtr subbPtr);
  void completeAllSubscribers(Signal* signal, TablePtr tabPtr);
  void completeInitTable(Signal* signal, TablePtr tabPtr);
  struct Table {
    Table() { m_tableId = ~0; n_subscribers = 0; }
    void release(Suma&);
    void checkRelease(Suma &suma);

    DLList<Subscriber>::Head c_subscribers;
    DLList<SyncRecord>::Head c_syncRecords;

    enum State {
      UNDEFINED,
      DEFINING,
      DEFINED,
      DROPPED,
      ALTERED
    };
    State m_state;

    Uint32 m_ptrI;
    SubscriberPtr m_drop_subbPtr;

    Uint32 n_subscribers;
    bool m_reportAll;

    bool parseTable(SegmentedSectionPtr ptr, Suma &suma);
    /**
     * Create triggers
     */
    int setupTrigger(Signal* signal, Suma &suma);
    void completeTrigger(Signal* signal);
    void createAttributeMask(AttributeMask&, Suma &suma);

    /**
     * Drop triggers
     */
    void dropTrigger(Signal* signal, Suma&);
    void runDropTrigger(Signal* signal, Uint32 triggerId, Suma&);

    /**
     * Sync meta
     */
#if 0
    void runLIST_TABLES_CONF(Signal* signal);
#endif

    union { Uint32 m_tableId; Uint32 key; };
    Uint32 m_schemaVersion;
    Uint8 m_hasTriggerDefined[3];        // Insert/Update/Delete
    Uint8 m_hasOutstandingTriggerReq[3]; // Insert/Update/Delete
    Uint32 m_triggerIds[3];              // Insert/Update/Delete

    Uint32 m_error;
    /**
     * Default order in which to ask for attributes during scan
     *   1) Fixed, not nullable
     *   2) Rest
     */
    DataBuffer<15>::Head m_attributes; // Attribute ids

    /**
     * Fragments
     */
    Uint32 m_fragCount;
    DataBuffer<15>::Head m_fragments;  // Fragment descriptors

    /**
     * Hash table stuff
     */
    Uint32 nextHash;
    union { Uint32 prevHash; Uint32 nextPool; };
    Uint32 hashValue() const {
      return m_tableId;
    }
    bool equal(const Table& rec) const {
      return m_tableId == rec.m_tableId;
    }
  };
  DLList<Subscriber> c_metaSubscribers;
  DLList<Subscriber> c_removeDataSubscribers;

  /**
   * Lists
   */
  KeyTable<Table> c_tables;
  DLHashTable<Subscription> c_subscriptions;

  /**
   * Pools
   */
  ArrayPool<Subscriber> c_subscriberPool;
  ArrayPool<Table> c_tablePool;
  ArrayPool<Subscription> c_subscriptionPool;
  ArrayPool<SyncRecord> c_syncPool;
  DataBuffer<15>::DataBufferPool c_dataBufferPool;
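  /*
   * Summary (inferred, the usual pattern for these containers): all records
   * above are seized from the fixed-size ArrayPools and threaded onto the
   * lists and hash tables through their intrusive nextPool/nextList/nextHash
   * members, so no dynamic allocation happens on the signal-processing path.
   * The pools are presumably sized from configuration in
   * execREAD_CONFIG_REQ().
   */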
  NodeBitmask c_failedApiNodes;

  /**
   * Functions
   */
  bool removeSubscribersOnNode(Signal *signal, Uint32 nodeId);

  bool checkTableTriggers(SegmentedSectionPtr ptr);

  void addTableId(Uint32 TableId,
                  SubscriptionPtr subPtr, SyncRecord *psyncRec);

  void sendSubIdRef(Signal* signal, Uint32 senderRef, Uint32 senderData, Uint32 errorCode);
  void sendSubCreateRef(Signal* signal, Uint32 errorCode);
  void sendSubStartRef(Signal*, SubscriberPtr, Uint32 errorCode, SubscriptionData::Part);
  void sendSubStartRef(Signal* signal, Uint32 errorCode);
  void sendSubStopRef(Signal* signal, Uint32 errorCode);
  void sendSubSyncRef(Signal* signal, Uint32 errorCode);
  void sendSubRemoveRef(Signal* signal, const SubRemoveReq& ref,
                        Uint32 errorCode);
  void sendSubStartComplete(Signal*, SubscriberPtr, Uint32,
                            SubscriptionData::Part);
  void sendSubStopComplete(Signal*, SubscriberPtr);
  void sendSubStopReq(Signal* signal, bool unlock= false);

  void completeSubRemove(SubscriptionPtr subPtr);

  void reportAllSubscribers(Signal *signal,
                            NdbDictionary::Event::_TableEvent table_event,
                            SubscriptionPtr subPtr,
                            SubscriberPtr subbPtr);

  Uint32 getFirstGCI(Signal* signal);

  /**
   * Table admin
   */
  void convertNameToId(SubscriptionPtr subPtr, Signal * signal);
  /**
   * Public interface
   */
  void execCREATE_SUBSCRIPTION_REQ(Signal* signal);
  void execDROP_SUBSCRIPTION_REQ(Signal* signal);

  void execSTART_SUBSCRIPTION_REQ(Signal* signal);
  void execSTOP_SUBSCRIPTION_REQ(Signal* signal);

  void execSYNC_SUBSCRIPTION_REQ(Signal* signal);
  void execABORT_SYNC_REQ(Signal* signal);

  /**
   * Framework signals
   */
  void getNodeGroupMembers(Signal* signal);

  void execREAD_CONFIG_REQ(Signal* signal);

  void execSTTOR(Signal* signal);
  void sendSTTORRY(Signal*);
  void execNDB_STTOR(Signal* signal);
  void execDUMP_STATE_ORD(Signal* signal);
  void execREAD_NODESCONF(Signal* signal);
  void execNODE_FAILREP(Signal* signal);
  void execINCL_NODEREQ(Signal* signal);
  void execSIGNAL_DROPPED_REP(Signal* signal);
  void execAPI_START_REP(Signal* signal);
  void execAPI_FAILREQ(Signal* signal);

  void execSUB_GCP_COMPLETE_ACK(Signal* signal);
  /**
   * Controller interface
   */
  void execSUB_CREATE_REF(Signal* signal);
  void execSUB_CREATE_CONF(Signal* signal);

  void execSUB_DROP_REF(Signal* signal);
  void execSUB_DROP_CONF(Signal* signal);

  void execSUB_START_REF(Signal* signal);
  void execSUB_START_CONF(Signal* signal);

  void execSUB_ABORT_SYNC_REF(Signal* signal);
  void execSUB_ABORT_SYNC_CONF(Signal* signal);

  void execSUMA_START_ME_REQ(Signal* signal);
  void execSUMA_START_ME_REF(Signal* signal);
  void execSUMA_START_ME_CONF(Signal* signal);
  void execSUMA_HANDOVER_REQ(Signal* signal);
  void execSUMA_HANDOVER_REF(Signal* signal);
  void execSUMA_HANDOVER_CONF(Signal* signal);
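  /*
   * Note (inferred from the signal names and struct Restart below): the
   * SUMA_START_ME_* signals let a starting SUMA ask a running peer in its
   * node group to recreate its subscriptions and subscribers, and the
   * SUMA_HANDOVER_* signals then transfer bucket ownership back to the
   * restarted node at an agreed GCI boundary.
   */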
  /**
   * Subscription generation interface
   */
  void createSequence(Signal* signal);
  void createSequenceReply(Signal* signal,
                           UtilSequenceConf* conf,
                           UtilSequenceRef* ref);
  void execUTIL_SEQUENCE_CONF(Signal* signal);
  void execUTIL_SEQUENCE_REF(Signal* signal);
  void execCREATE_SUBID_REQ(Signal* signal);
  /**
   * for a Suma that is restarting another Suma
   */
  struct Restart {
    Restart(Suma& s);

    Suma & suma;
    Uint32 nodeId;

    DLHashTable<Subscription>::Iterator c_subIt;
    KeyTable<Table>::Iterator c_tabIt;

    void progError(int line, int cause, const char * extra) {
      suma.progError(line, cause, extra);
    }

    void resetNode(Uint32 sumaRef);
    void runSUMA_START_ME_REQ(Signal*, Uint32 sumaRef);
    void startNode(Signal*, Uint32 sumaRef);

    void createSubscription(Signal* signal, Uint32 sumaRef);
    void nextSubscription(Signal* signal, Uint32 sumaRef);
    void runSUB_CREATE_CONF(Signal* signal);
    void completeSubscription(Signal* signal, Uint32 sumaRef);

    void startSubscriber(Signal* signal, Uint32 sumaRef);
    void nextSubscriber(Signal* signal, Uint32 sumaRef, SubscriberPtr subbPtr);
    void sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr,
                         Signal* signal, Uint32 sumaRef);
    void runSUB_START_CONF(Signal* signal);
    void completeSubscriber(Signal* signal, Uint32 sumaRef);

    void completeRestartingNode(Signal* signal, Uint32 sumaRef);
    void resetRestart(Signal* signal);
  } Restart;
private:
  friend class Restart;

  /**
   * Variables
   */
  NodeId c_masterNodeId;
  NdbNodeBitmask c_alive_nodes;

  /**
   * for a restarting Suma not to start sending data too early
   */
  struct Startup
  {
    bool m_wait_handover;
    Uint32 m_restart_server_node_id;
    NdbNodeBitmask m_handover_nodes;
  } c_startup;
  NodeBitmask c_connected_nodes;  // (NODE/API) START REP / (API/NODE) FAIL REQ
  NodeBitmask c_subscriber_nodes; //

  /**
   * for all Sumas to keep track of the other Sumas in the node group
   */
  Uint32 c_nodeGroup;
  Uint32 c_noNodesInGroup;
  Uint32 c_nodesInGroup[MAX_REPLICAS];
  NdbNodeBitmask c_nodes_in_nodegroup_mask;  // NodeIds of nodes in nodegroup

  void send_start_me_req(Signal* signal);
  void check_start_handover(Signal* signal);
  void send_handover_req(Signal* signal);

  Uint32 get_responsible_node(Uint32 B) const;
  Uint32 get_responsible_node(Uint32 B, const NdbNodeBitmask& mask) const;
  bool check_switchover(Uint32 bucket, Uint32 gci);
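  /*
   * Note (inferred): change data is partitioned into buckets, and each bucket
   * has exactly one responsible node within the node group at any time.
   * get_responsible_node() picks the owner (optionally restricted to a given
   * node mask), and check_switchover() detects when a GCI has reached a
   * bucket's m_switchover_gci so that ownership changes on a clean GCI
   * boundary.
   */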
public:
  struct Page_pos
  {
    Uint32 m_page_id;
    Uint32 m_page_pos;
    Uint32 m_max_gci;   // max gci on page
    Uint32 m_last_gci;  // last gci on page
  };
private:
  struct Bucket
  {
    enum {
      BUCKET_STARTING = 0x1  // On starting node
      ,BUCKET_HANDOVER = 0x2 // On running node
      ,BUCKET_TAKEOVER = 0x4 // On taking over node
      ,BUCKET_RESEND   = 0x8 // On taking over node
    };
    Uint16 m_state;
    Uint16 m_switchover_node;
    Uint16 m_nodes[MAX_REPLICAS];
    Uint32 m_switchover_gci;
    Uint32 m_max_acked_gci;
    Uint32 m_buffer_tail;   // Page
    Page_pos m_buffer_head;
  };
  struct Buffer_page
  {
    STATIC_CONST( DATA_WORDS = 8192 - 9);
    Uint32 _tupdata1;
    Uint32 _tupdata2;
    Uint32 _tupdata3;
    Uint32 _tupdata4;
    Uint32 m_page_state;      // Used by TUP buddy algorithm
    Uint32 m_page_chunk_ptr_i;
    Uint32 m_next_page;
    Uint32 m_words_used;      // Words of m_data in use
    Uint32 m_max_gci;         // Max gci on page
    Uint32 m_data[DATA_WORDS];
  };
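  /*
   * Note: DATA_WORDS = 8192 - 9 because the nine Uint32 members above
   * (_tupdata1.._tupdata4, m_page_state, m_page_chunk_ptr_i, m_next_page,
   * m_words_used, m_max_gci) form the page header, so a whole Buffer_page is
   * 8192 32-bit words, i.e. 32 KiB.
   */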
  STATIC_CONST( NO_OF_BUCKETS = 24 ); // 24 = 4*3*2*1!
  Uint32 c_no_of_buckets;
  struct Bucket c_buckets[NO_OF_BUCKETS];

  STATIC_CONST( BUCKET_MASK_SIZE = (((NO_OF_BUCKETS+31)>> 5)) );
  typedef Bitmask<BUCKET_MASK_SIZE> Bucket_mask;
  Bucket_mask m_active_buckets;
  Bucket_mask m_switchover_buckets;
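  /*
   * Note (inferred): 24 = 4! is chosen so the buckets can be split evenly
   * among 1, 2, 3 or 4 nodes in a node group (at most MAX_REPLICAS).
   * BUCKET_MASK_SIZE rounds 24 bits up to one 32-bit word, so each
   * Bucket_mask fits in a single word; m_active_buckets holds the buckets
   * this node currently delivers, and m_switchover_buckets those that are in
   * the middle of a handover or takeover.
   */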
  class Dbtup* m_tup;
  void init_buffers();
  Uint32* get_buffer_ptr(Signal*, Uint32 buck, Uint32 gci, Uint32 sz);
  Uint32 seize_page();
  void free_page(Uint32 page_id, Buffer_page* page);
  void out_of_buffer(Signal*);
  void out_of_buffer_release(Signal* signal, Uint32 buck);

  void start_resend(Signal*, Uint32 bucket);
  void resend_bucket(Signal*, Uint32 bucket, Uint32 gci,
                     Uint32 page_pos, Uint32 last_gci);
  void release_gci(Signal*, Uint32 bucket, Uint32 gci);
  Uint32 m_max_seen_gci;      // FIRE_TRIG_ORD
  Uint32 m_max_sent_gci;      // FIRE_TRIG_ORD -> send
  Uint32 m_last_complete_gci; // SUB_GCP_COMPLETE_REP
  Uint32 m_out_of_buffer_gci;
  Uint32 m_gcp_complete_rep_count;
  struct Gcp_record
  {
    Uint32 m_gci;
    NodeBitmask m_subscribers;
    union {
      Uint32 nextPool;
      Uint32 nextList;
    };
    Uint32 prevList;
  };
  ArrayPool<Gcp_record> c_gcp_pool;
  DLFifoList<Gcp_record> c_gcp_list;
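  /*
   * Note (inferred): a Gcp_record appears to track one global checkpoint that
   * has been reported to subscribers but not yet fully acknowledged;
   * m_subscribers is the set of subscriber nodes whose SUB_GCP_COMPLETE_ACK
   * is still outstanding.  Records are kept in GCI order in c_gcp_list,
   * presumably so that buffered pages up to a GCI can be released once every
   * node has acknowledged it.
   */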
  struct Page_chunk
  {
    Uint32 m_page_id;
    Uint32 m_size;
    Uint32 m_free;
    union {
      Uint32 nextPool;
      Uint32 nextList;
    };
    Uint32 prevList;
  };

  Uint32 m_first_free_page;
  ArrayPool<Page_chunk> c_page_chunk_pool;
};

#endif