1 /* Copyright (c) 2003-2007 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16 #include <HugoOperations.hpp>
// Starts an NDB transaction and stores the handle in pTrans.
// Parameters: pNdb - Ndb object used to start the transaction; table, keyData
// and keyLen are forwarded unchanged to Ndb::startTransaction().
// NOTE(review): this listing is a lossy extract; the embedded original line
// numbers skip (21-22, 24-25, 27, 29-34), so the guard conditions and the
// return statements of this function are not visible here.
18 int HugoOperations::startTransaction(Ndb
* pNdb
,
19 const NdbDictionary::Table
*table
,
20 const char *keyData
, Uint32 keyLen
){
// Diagnostic whose text says pTrans was already set; the condition guarding
// it sits on a line that is missing from this extract.
23 ndbout
<< "HugoOperations::startTransaction, pTrans != NULL" << endl
;
26 pTrans
= pNdb
->startTransaction(table
, keyData
, keyLen
);
// Snapshot of the Ndb-level error after the start attempt; how it is reported
// or turned into a return value is not visible here.
28 const NdbError err
= pNdb
->getNdbError();
// Installs an externally created transaction (new_trans) on this object.
// not_null_ok: when false, an already-set pTrans triggers the diagnostic below.
// NOTE(review): lossy extract - original lines 36 and 39+ are missing, so the
// assignment of new_trans and the return values are not visible here.
35 int HugoOperations::setTransaction(NdbTransaction
* new_trans
, bool not_null_ok
){
37 if (pTrans
!= NULL
&& !not_null_ok
){
// NOTE(review): the message names startTransaction although this is
// setTransaction - looks like a copy/paste leftover; confirm before changing,
// since the text is runtime output.
38 ndbout
<< "HugoOperations::startTransaction, pTrans != NULL" << endl
;
// Forwards the given transaction id to the current transaction object.
// NOTE(review): lossy extract - the return-type line (48) and line 50 are
// missing; whether pTrans is null-checked before the call is not visible.
49 HugoOperations::setTransactionId(Uint64 id
){
51 pTrans
->setTransactionId(id
);
// Closes the current transaction and drops all scan bookkeeping.
// Delegates the actual close to UtilTransactions::closeTransaction(pNdb), then
// empties both m_result_sets and m_executed_result_sets.
// NOTE(review): lossy extract - the return statement is not visible here.
55 int HugoOperations::closeTransaction(Ndb
* pNdb
){
57 UtilTransactions::closeTransaction(pNdb
);
59 m_result_sets
.clear();
60 m_executed_result_sets
.clear();
// Accessor returning an NdbConnection pointer.
// NOTE(review): the body is missing from this extract; presumably it returns
// pTrans - confirm against the full file.
65 NdbConnection
* HugoOperations::getTransaction(){
// Defines read operations for numRecords rows starting at recordNo on the
// current transaction, using lock mode lm, and binds every column of each
// row to the result buffer rows[r].
// NOTE(review): lossy extract - parts of the signature (original lines 70-71),
// the switch header, local declarations (a, check, pIndexScanOp), braces and
// all return statements are missing, so control flow is only partly visible.
69 int HugoOperations::pkReadRecord(Ndb
* pNdb
,
72 NdbOperation::LockMode lm
){
// Make sure a result row exists for every record that will be read.
74 allocRows(numRecords
);
77 NdbOperation
* pOp
= 0;
80 for(int r
=0; r
< numRecords
; r
++){
84 pOp
= getOperation(pTrans
, NdbOperation::ReadRequest
);
87 ERR(pTrans
->getNdbError());
// The four lock modes below share one branch of a switch whose header is not
// visible in this extract.
93 case NdbOperation::LM_Read
:
94 case NdbOperation::LM_Exclusive
:
95 case NdbOperation::LM_CommittedRead
:
96 case NdbOperation::LM_SimpleRead
:
// When idx is an ordered index (plus a further condition on a missing line),
// pOp is treated as an index scan and readTuples(lm) is used; the readTuple(lm)
// call further down is the alternative path.
97 if(idx
&& idx
->getType() == NdbDictionary::Index::OrderedIndex
&&
100 pIndexScanOp
= ((NdbIndexScanOperation
*)pOp
);
101 check
= pIndexScanOp
->readTuples(lm
);
104 check
= pOp
->readTuple(lm
);
// Replaces lm with a value in 0..3 derived from rand(); which switch branch
// this belongs to is not visible (presumably the default one - confirm).
107 lm
= (NdbOperation::LockMode
)((rand() >> 16) & 3);
112 ERR(pTrans
->getNdbError());
116 // Define primary keys
117 if (equalForRow(pOp
, r
+recordNo
) != 0)
// Closes the bound set numbered r on the index scan; the guard deciding when
// this runs is on a missing line.
121 pIndexScanOp
->end_of_bound(r
);
// Condition is true for the first record, or whenever no index scan is in use;
// the statement it guards is not visible in this extract.
123 if(r
== 0 || pIndexScanOp
== 0)
125 // Define attributes to read
126 for(a
= 0; a
<tab
.getNoOfColumns(); a
++){
// getValue() returning 0 is treated as a failure and reported through ERR.
127 if((rows
[r
]->attributeStore(a
) =
128 pOp
->getValue(tab
.getColumn(a
)->getName())) == 0) {
129 ERR(pTrans
->getNdbError());
// Defines one update operation per record (rows recordNo .. recordNo +
// numRecords - 1) on the current transaction and fills in keys and values
// through setValues() with the given updatesValue.
// NOTE(review): lossy extract - the rest of the signature, the null/-1 checks
// around each ERR(...) and all return statements are missing.
139 int HugoOperations::pkUpdateRecord(Ndb
* pNdb
,
143 allocRows(numRecords
);
145 for(int r
=0; r
< numRecords
; r
++){
146 NdbOperation
* pOp
= getOperation(pTrans
, NdbOperation::UpdateRequest
);
148 ERR(pTrans
->getNdbError());
152 check
= pOp
->updateTuple();
154 ERR(pTrans
->getNdbError());
// Anything other than NDBT_OK from setValues() is treated as a failure; the
// statement taken in that case is not visible here.
158 if(setValues(pOp
, r
+recordNo
, updatesValue
) != NDBT_OK
)
// Fills in one operation for row rowId: first the primary-key columns via
// equalForRow(), then every non-key column via setValueForAttr() using
// updateId.
// NOTE(review): lossy extract - the return-type line, the declaration of a and
// the return statements are missing.
167 HugoOperations::setValues(NdbOperation
* pOp
, int rowId
, int updateId
)
169 // Define primary keys
171 if (equalForRow(pOp
, rowId
) != 0)
// Second pass: only columns that are not part of the primary key get a value.
174 for(a
= 0; a
<tab
.getNoOfColumns(); a
++){
175 if (tab
.getColumn(a
)->getPrimaryKey() == false){
176 if(setValueForAttr(pOp
, a
, rowId
, updateId
) != 0){
177 ERR(pTrans
->getNdbError());
// Defines one insert operation per record (rows recordNo .. recordNo +
// numRecords - 1) on the current transaction; keys and values are filled in
// by setValues() with the given updatesValue.
// NOTE(review): lossy extract - the rest of the signature, the checks around
// each ERR(...) and all return statements are missing.
186 int HugoOperations::pkInsertRecord(Ndb
* pNdb
,
192 for(int r
=0; r
< numRecords
; r
++){
193 NdbOperation
* pOp
= getOperation(pTrans
, NdbOperation::InsertRequest
);
195 ERR(pTrans
->getNdbError());
199 check
= pOp
->insertTuple();
201 ERR(pTrans
->getNdbError());
205 if(setValues(pOp
, r
+recordNo
, updatesValue
) != NDBT_OK
)
// Defines one write operation (writeTuple) per record on the current
// transaction, setting the primary key and then every non-key column.
// Unlike the insert/update/delete variants in this file, the operation is
// obtained directly with pTrans->getNdbOperation(tab.getName()).
// NOTE(review): lossy extract - the rest of the signature, the checks around
// each ERR(...) and all return statements are missing.
213 int HugoOperations::pkWriteRecord(Ndb
* pNdb
,
219 for(int r
=0; r
< numRecords
; r
++){
220 NdbOperation
* pOp
= pTrans
->getNdbOperation(tab
.getName());
222 ERR(pTrans
->getNdbError());
226 check
= pOp
->writeTuple();
228 ERR(pTrans
->getNdbError());
232 // Define primary keys
233 if (equalForRow(pOp
, r
+recordNo
) != 0)
236 // Define attributes to update
237 for(a
= 0; a
<tab
.getNoOfColumns(); a
++){
238 if (tab
.getColumn(a
)->getPrimaryKey() == false){
239 if(setValueForAttr(pOp
, a
, recordNo
+r
, updatesValue
) != 0){
240 ERR(pTrans
->getNdbError());
// Defines one write operation (writeTuple) per record in which only the
// primary key is set in the visible code; no non-key values are assigned in
// the lines shown here.
// NOTE(review): lossy extract - the rest of the signature, the checks around
// each ERR(...) and all return statements are missing.
249 int HugoOperations::pkWritePartialRecord(Ndb
* pNdb
,
254 for(int r
=0; r
< numRecords
; r
++){
255 NdbOperation
* pOp
= pTrans
->getNdbOperation(tab
.getName());
257 ERR(pTrans
->getNdbError());
261 check
= pOp
->writeTuple();
263 ERR(pTrans
->getNdbError());
267 // Define primary keys
268 if (equalForRow(pOp
, r
+recordNo
) != 0)
// Defines one delete operation per record (rows recordNo .. recordNo +
// numRecords - 1) on the current transaction, identified by primary key.
// NOTE(review): lossy extract - the rest of the signature, the checks around
// each ERR(...) and all return statements are missing.
274 int HugoOperations::pkDeleteRecord(Ndb
* pNdb
,
279 for(int r
=0; r
< numRecords
; r
++){
280 NdbOperation
* pOp
= getOperation(pTrans
, NdbOperation::DeleteRequest
);
282 ERR(pTrans
->getNdbError());
286 check
= pOp
->deleteTuple();
288 ERR(pTrans
->getNdbError());
292 // Define primary keys
293 if (equalForRow(pOp
, r
+recordNo
) != 0)
// Executes the current transaction with Commit and the abort option eao, then
// fetches the first result of every scan registered in m_result_sets.
// NOTE(review): lossy extract - the second signature line, the declaration of
// check, several branches and all but one return statement are missing.
299 int HugoOperations::execute_Commit(Ndb
* pNdb
,
303 check
= pTrans
->execute(Commit
, eao
);
// Failure is either a -1 result or any non-zero transaction error code.
305 const NdbError err
= pTrans
->getNdbError();
306 if( check
== -1 || err
.code
) {
// NOTE(review): original line 309 is not visible; confirm that pOp is checked
// for NULL before it is dereferenced below.
308 NdbOperation
* pOp
= pTrans
->getNdbErrorOperation();
310 const NdbError err2
= pOp
->getNdbError();
// NOTE(review): int i is compared with m_result_sets.size(); if size() is
// unsigned this is a signed/unsigned comparison - confirm.
318 for(int i
= 0; i
<m_result_sets
.size(); i
++){
// Every pending scan is also recorded in m_executed_result_sets.
319 m_executed_result_sets
.push_back(m_result_sets
[i
]);
// NOTE(review): this local is named rows, the same name other methods in this
// file use for the result-row container.
321 int rows
= m_result_sets
[i
].records
;
322 NdbScanOperation
* rs
= m_result_sets
[i
].m_result_set
;
323 int res
= rs
->nextResult();
// Error exit: return the transaction's error code when it is positive,
// otherwise NDBT_FAILED. The condition leading here is not visible.
328 const NdbError err
= pTrans
->getNdbError();
330 return (err
.code
> 0 ? err
.code
: NDBT_FAILED
);
// The expected-records counter of the stored entry is decremented; the
// surrounding loop or condition is not visible in this extract.
339 m_result_sets
[i
].records
--;
// All pending scans have been handled; start the next execute from empty.
344 m_result_sets
.clear();
// Executes the operations defined so far with NoCommit and the abort option
// eao, then fetches the first result of every scan in m_result_sets.
// NOTE(review): lossy extract - the declaration of check, several branches and
// all but one return statement are missing.
349 int HugoOperations::execute_NoCommit(Ndb
* pNdb
, AbortOption eao
){
352 check
= pTrans
->execute(NoCommit
, eao
);
// Failure is either a -1 result or any non-zero transaction error code.
354 const NdbError err
= pTrans
->getNdbError();
355 if( check
== -1 || err
.code
) {
357 const NdbOperation
* pOp
= pTrans
->getNdbErrorOperation();
// NOTE(review): the lines between 357 and 360 are not visible; confirm that
// pOp is checked for NULL before it is dereferenced.
360 const NdbError err2
= pOp
->getNdbError();
// Advances to the next completed operation; the loop this belongs to is not
// visible in this extract.
363 pOp
= pTrans
->getNextCompletedOperation(pOp
);
// NOTE(review): int i is compared with m_result_sets.size(); if size() is
// unsigned this is a signed/unsigned comparison - confirm.
370 for(int i
= 0; i
<m_result_sets
.size(); i
++){
// Every pending scan is also recorded in m_executed_result_sets.
371 m_executed_result_sets
.push_back(m_result_sets
[i
]);
// NOTE(review): this local is named rows, the same name other methods in this
// file use for the result-row container.
373 int rows
= m_result_sets
[i
].records
;
374 NdbScanOperation
* rs
= m_result_sets
[i
].m_result_set
;
375 int res
= rs
->nextResult();
// Error exit: return the transaction's error code when it is positive,
// otherwise NDBT_FAILED. The condition leading here is not visible.
380 const NdbError err
= pTrans
->getNdbError();
382 return (err
.code
> 0 ? err
.code
: NDBT_FAILED
);
// All pending scans have been handled; start the next execute from empty.
396 m_result_sets
.clear();
// Executes the current transaction with Rollback and reads back the
// transaction error afterwards.
// NOTE(review): lossy extract - the declaration of check, the error handling
// and the return statements are missing.
401 int HugoOperations::execute_Rollback(Ndb
* pNdb
){
403 check
= pTrans
->execute(Rollback
);
405 const NdbError err
= pTrans
->getNdbError();
// Free-function trampoline for asynchronous execution: ho is cast back to a
// HugoOperations object and the call is forwarded to its callback() member.
// NOTE(review): lossy extract - the return-type/linkage line and the braces
// are missing.
413 HugoOperations_async_callback(int res
, NdbTransaction
* pCon
, void* ho
)
415 ((HugoOperations
*)ho
)->callback(res
, pCon
);
// Completion handler for an asynchronous execute: asserts that the reported
// transaction is the one owned by this object and stores its error code in
// m_async_return.
// NOTE(review): lossy extract - the return-type line and the statements
// around line 425 (e.g. how res and m_async_reply are used) are missing.
419 HugoOperations::callback(int res
, NdbTransaction
* pCon
)
421 assert(pCon
== pTrans
);
425 m_async_return
= pCon
->getNdbError().code
;
// Prepares the current transaction for asynchronous execution of type et with
// HugoOperations_async_callback as completion callback, then sends all
// prepared transactions through pNdb.
// NOTE(review): lossy extract - the return-type line, the remaining
// executeAsynchPrepare arguments (original lines 440-441) and the return
// statement are missing.
434 HugoOperations::execute_async(Ndb
* pNdb
, NdbTransaction::ExecType et
,
435 NdbOperation::AbortOption eao
){
438 pTrans
->executeAsynchPrepare(et
,
439 HugoOperations_async_callback
,
443 pNdb
->sendPreparedTransactions();
// Same preparation step as execute_async (executeAsynchPrepare with
// HugoOperations_async_callback); no send call is visible in the lines shown.
// NOTE(review): lossy extract - the return-type line, the remaining
// executeAsynchPrepare arguments and the return statement are missing.
449 HugoOperations::execute_async_prepare(Ndb
* pNdb
, NdbTransaction::ExecType et
,
450 NdbOperation::AbortOption eao
){
453 pTrans
->executeAsynchPrepare(et
,
454 HugoOperations_async_callback
,
// Polls pNdb for completion of an asynchronous execute and returns the stored
// result m_async_return, logging it as an error on the path shown below.
// NOTE(review): lossy extract - the return-type line, the loop and the
// conditions around the statements below are missing. In the visible lines
// the timeout parameter is not used; sendPollNdb is called with 1000.
462 HugoOperations::wait_async(Ndb
* pNdb
, int timeout
)
// NOTE(review): m_async_reply is observed through a volatile pointer; volatile
// is not a synchronization primitive in C++. Whether the flag is written from
// another thread is not visible here - confirm.
464 volatile int * wait
= &m_async_reply
;
467 pNdb
->sendPollNdb(1000);
472 ndbout
<< "ERROR: " << pNdb
->getNdbError(m_async_return
) << endl
;
473 return m_async_return
;
// Reached when the wait ended without a reply, going by the message text; the
// value returned on this path is not visible.
476 ndbout_c("wait returned nothing...");
// Constructor: passes the table definition _tab and the optional index idx on
// to the UtilTransactions base class.
// NOTE(review): lossy extract - the remaining member initializers and the
// constructor body are missing.
480 HugoOperations::HugoOperations(const NdbDictionary::Table
& _tab
,
481 const NdbDictionary::Index
* idx
):
482 UtilTransactions(_tab
, idx
),
// Destructor.
// NOTE(review): the body is missing from this extract; what it releases (for
// example the rows allocated by allocRows) cannot be confirmed here.
487 HugoOperations::~HugoOperations(){
// Sets the equality condition for every primary-key column of the table on
// pOp, using the values generated for the given row.
// A non-zero result from equalForAttr() is reported through ERR.
// NOTE(review): lossy extract - the return-type line, braces and the return
// statements are missing.
497 HugoOperations::equalForRow(NdbOperation
* pOp
, int row
)
499 for(int a
= 0; a
<tab
.getNoOfColumns(); a
++)
501 if (tab
.getColumn(a
)->getPrimaryKey() == true)
503 if(equalForAttr(pOp
, a
, row
) != 0)
505 ERR(pOp
->getNdbError());
// Sets the equality condition on pOp for the single key column attrId, using
// the value calc.calcValue() generates for row rowId with update counter 0.
// Returns the result of NdbOperation::equal().
// NOTE(review): lossy extract - the rest of the signature, the declarations of
// buf and real_len and the early return for non-key columns are missing.
513 int HugoOperations::equalForAttr(NdbOperation
* pOp
,
517 const NdbDictionary::Column
* attr
= tab
.getColumn(attrId
);
// Only primary-key columns are valid here; anything else is logged.
518 if (attr
->getPrimaryKey() == false){
519 g_info
<< "Can't call equalForAttr on non PK attribute" << endl
;
523 int len
= attr
->getSizeInBytes();
// buf is zero-filled before the value is generated into it.
// NOTE(review): buf's declaration is not visible, so whether len always fits
// into sizeof(buf) cannot be confirmed here.
525 memset(buf
, 0, sizeof(buf
));
527 const char * value
= calc
.calcValue(rowId
, attrId
, 0, buf
, len
, &real_len
);
528 return pOp
->equal( attr
->getName(), value
, real_len
);
// Assigns column attrId on pOp the value calc.calcValue() generates for row
// rowId and update counter updateId. Returns the result of
// NdbOperation::setValue().
// NOTE(review): lossy extract - the rest of the signature and the declarations
// of buf and real_len are missing.
531 int HugoOperations::setValueForAttr(NdbOperation
* pOp
,
536 const NdbDictionary::Column
* attr
= tab
.getColumn(attrId
);
538 int len
= attr
->getSizeInBytes();
// buf is zero-filled before the value is generated into it.
// NOTE(review): buf's declaration is not visible, so whether len always fits
// into sizeof(buf) cannot be confirmed here.
540 memset(buf
, 0, sizeof(buf
));
542 const char * value
= calc
.calcValue(rowId
, attrId
,
543 updateId
, buf
, len
, &real_len
);
544 return pOp
->setValue( attr
->getName(), value
, real_len
);
// Checks the first _numRows result rows: each row must be internally
// consistent (calc.verifyRowValues) and carry the expected updatesValue.
// _numRows == 0 means "all rows currently held in rows".
// NOTE(review): lossy extract - the return-type line, braces, the condition
// for the "No rows" message and the return statement are missing.
548 HugoOperations::verifyUpdatesValue(int updatesValue
, int _numRows
){
549 _numRows
= (_numRows
== 0 ? rows
.size() : _numRows
);
551 int result
= NDBT_OK
;
// NOTE(review): rows[i] is indexed up to _numRows; no visible check that a
// caller-supplied _numRows does not exceed rows.size().
553 for(int i
= 0; i
<_numRows
; i
++){
554 if(calc
.verifyRowValues(rows
[i
]) != NDBT_OK
){
555 g_err
<< "Inconsistent row"
556 << endl
<< "\t" << rows
[i
]->c_str().c_str() << endl
;
557 result
= NDBT_FAILED
;
// Second check: the update counter stored in the row must match the expected
// one.
561 if(calc
.getUpdatesValue(rows
[i
]) != updatesValue
){
562 result
= NDBT_FAILED
;
563 g_err
<< "Invalid updates value for row " << i
<< endl
564 << " updatesValue: " << updatesValue
<< endl
565 << " calc.getUpdatesValue: " << calc
.getUpdatesValue(rows
[i
]) << endl
566 << rows
[i
]->c_str().c_str() << endl
;
// Logged when there was nothing to verify, going by the message text; the
// guarding condition is not visible.
572 g_err
<< "No rows -> Invalid updates value" << endl
;
// Grows the rows container until it holds at least _numRows result rows.
// New entries are heap-allocated NDBT_ResultRow objects built from tab; the
// loop starts at rows.size(), so existing entries are kept and the container
// is never shrunk here.
// NOTE(review): lossy extract - the condition guarding the "Illegal value"
// message and any early return are missing.
579 void HugoOperations::allocRows(int _numRows
){
581 g_info
<< "Illegal value for num rows : " << _numRows
<< endl
;
585 for(int b
=rows
.size(); b
<_numRows
; b
++){
586 rows
.push_back(new NDBT_ResultRow(tab
));
// Empties the rows container by repeatedly erasing its last element.
// NOTE(review): original line 592 is missing from this extract; allocRows
// creates the elements with new, so confirm the matching delete happens
// there, otherwise the rows leak.
590 void HugoOperations::deallocRows(){
591 while(rows
.size() > 0){
593 rows
.erase(rows
.size() - 1);
// Appends the string form of the first numRecords result rows to
// savedRecords, for later comparison by compareRecordToCopy().
// NOTE(review): lossy extract - the statement taken when numRecords exceeds
// rows.size() and the return statements are missing. Entries are appended
// with push_back; whether savedRecords is cleared first is not visible.
597 int HugoOperations::saveCopyOfRecord(int numRecords
){
599 if (numRecords
> (int)rows
.size())
602 for (int i
= 0; i
< numRecords
; i
++){
603 savedRecords
.push_back(rows
[i
]->c_str());
// Returns the string form of result row number recordNum.
// NOTE(review): the bounds check uses '>' so recordNum == rows.size() passes
// it and reaches rows[recordNum], one past the last valid index; this looks
// like an off-by-one ('>=' expected) - confirm. The statement taken when the
// check fires (original line 610) is missing from this extract.
608 BaseString
HugoOperations::getRecordStr(int recordNum
){
609 if (recordNum
> (int)rows
.size())
611 return rows
[recordNum
]->c_str();
// Returns the GCI reported by the current transaction (pTrans->getGCI()).
// recordNum is not used by the visible statement: the value is per
// transaction, not per record.
// NOTE(review): no null check on pTrans is visible here.
614 int HugoOperations::getRecordGci(int recordNum
){
615 return pTrans
->getGCI();
// Compares the first numRecords result rows with the strings stored earlier
// in savedRecords, printing both versions of every row to ndbout.
// NOTE(review): lossy extract - the statements taken by the two size guards,
// the branch bodies around line 633 and the return statement are missing.
619 int HugoOperations::compareRecordToCopy(int numRecords
){
// Both containers must hold at least numRecords entries.
620 if (numRecords
> (int)rows
.size())
622 if ((unsigned)numRecords
> savedRecords
.size())
625 int result
= NDBT_OK
;
626 for (int i
= 0; i
< numRecords
; i
++){
627 BaseString str
= rows
[i
]->c_str();
628 ndbout
<< "row["<<i
<<"]: " << str
<< endl
;
629 ndbout
<< "sav["<<i
<<"]: " << savedRecords
[i
] << endl
;
// The failure assignment below presumably belongs to the else branch of this
// equality test; the lines in between (631-632) are not visible - confirm.
630 if (savedRecords
[i
] == str
){
633 result
= NDBT_FAILED
;
// Fetches the current transaction via getTransaction().
// NOTE(review): lossy extract - the return-type line and everything done with
// t after this statement are missing, so the purpose of the refresh cannot be
// confirmed from here.
640 HugoOperations::refresh() {
641 NdbTransaction
* t
= getTransaction();
// Defines one read per record through the index idxName on the table, with
// an exclusive lock when exclusive is true, and binds every column of each
// row to the result buffer rows[r].
// The Ndb* parameter is unnamed and therefore unused.
// NOTE(review): lossy extract - the rest of the signature, the declarations of
// a and check, the checks around each ERR(...) and all return statements are
// missing.
646 int HugoOperations::indexReadRecords(Ndb
*, const char * idxName
, int recordNo
,
651 allocRows(numRecords
);
653 for(int r
=0; r
< numRecords
; r
++){
654 NdbOperation
* pOp
= pTrans
->getNdbIndexOperation(idxName
, tab
.getName());
656 ERR(pTrans
->getNdbError());
// Lock choice: exclusive read when requested, plain readTuple() otherwise.
660 if (exclusive
== true)
661 check
= pOp
->readTupleExclusive();
663 check
= pOp
->readTuple();
665 ERR(pTrans
->getNdbError());
669 // Define primary keys
670 if (equalForRow(pOp
, r
+recordNo
) != 0)
673 // Define attributes to read
674 for(a
= 0; a
<tab
.getNoOfColumns(); a
++){
// getValue() returning 0 is treated as a failure and reported through ERR.
675 if((rows
[r
]->attributeStore(a
) =
676 pOp
->getValue(tab
.getColumn(a
)->getName())) == 0) {
677 ERR(pTrans
->getNdbError());
// Defines one update per record through the index idxName on the table,
// setting the key columns and then every non-key column with updatesValue.
// The Ndb* parameter is unnamed and therefore unused.
// NOTE(review): lossy extract - the return-type line, the rest of the
// signature, the declarations of a and check, the checks around each
// ERR(...) and all return statements are missing.
686 HugoOperations::indexUpdateRecord(Ndb
*,
687 const char * idxName
,
692 allocRows(numRecords
);
694 for(int r
=0; r
< numRecords
; r
++){
695 NdbOperation
* pOp
= pTrans
->getNdbIndexOperation(idxName
, tab
.getName());
697 ERR(pTrans
->getNdbError());
701 check
= pOp
->updateTuple();
703 ERR(pTrans
->getNdbError());
707 // Define primary keys
708 if (equalForRow(pOp
, r
+recordNo
) != 0)
711 // Define attributes to update
712 for(a
= 0; a
<tab
.getNoOfColumns(); a
++){
713 if (tab
.getColumn(a
)->getPrimaryKey() == false){
714 if(setValueForAttr(pOp
, a
, recordNo
+r
, updatesValue
) != 0){
715 ERR(pTrans
->getNdbError());
// Defines a table scan with lock mode lm on the current transaction, binds
// every column to the result buffer rows[0], and registers the scan together
// with the expected record count in m_result_sets so that the execute_*
// methods can fetch from it later.
// NOTE(review): lossy extract - the return-type line, the rest of the
// signature, the null check on pOp, the allocation of rows[0] and all return
// statements are missing.
725 HugoOperations::scanReadRecords(Ndb
* pNdb
, NdbScanOperation::LockMode lm
,
729 NdbScanOperation
* pOp
= pTrans
->getNdbScanOperation(tab
.getName());
// A non-zero result from readTuples() takes the branch below (presumably the
// failure path - confirm); its body is not visible.
734 if(pOp
->readTuples(lm
, 0, 1)){
738 for(int a
= 0; a
<tab
.getNoOfColumns(); a
++){
// getValue() returning 0 is treated as a failure and reported through ERR.
739 if((rows
[0]->attributeStore(a
) =
740 pOp
->getValue(tab
.getColumn(a
)->getName())) == 0) {
741 ERR(pTrans
->getNdbError());
746 RsPair p
= {pOp
, records
};
747 m_result_sets
.push_back(p
);
// Explicit instantiation of the Vector template for HugoOperations::RsPair,
// the element type pushed into m_result_sets by scanReadRecords().
752 template class Vector
<HugoOperations::RsPair
>;