/*
 * Copyright (C) 2005-2009 MaNGOS <http://getmangos.com/>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
22 #include "Policies/SingletonImp.h"
23 #include "Platform/Define.h"
24 #include "../src/zthread/ThreadImpl.h"
25 #include "DatabaseEnv.h"
26 #include "Database/PGSQLDelayThread.h"
27 #include "Database/SqlOperations.h"
// DatabasePostgre::ThreadStart -- takes no arguments and returns nothing.
// NOTE(review): only the signature is visible in this listing; the body is
// not shown, so what (if anything) it does cannot be confirmed from here.
30 void DatabasePostgre::ThreadStart()
// DatabasePostgre::ThreadEnd -- takes no arguments and returns nothing.
// NOTE(review): only the signature is visible in this listing; the body is
// not shown, so what (if anything) it does cannot be confirmed from here.
34 void DatabasePostgre::ThreadEnd()
38 size_t DatabasePostgre::db_count
= 0;
// Default constructor: runs the Database base constructor and starts with no
// libpq connection (mPGconn is NULL; Initialize() is what assigns it).
// NOTE(review): several body lines are missing from this listing, so the
// condition wrapping the check below and whatever follows the error log
// cannot be confirmed from here.
40 DatabasePostgre::DatabasePostgre() : Database(), mPGconn(NULL
)
42 // before first connection
// A libpq build that reports itself as not thread-safe is logged as a fatal error.
46 if (!PQisthreadsafe())
48 sLog
.outError("FATAL ERROR: PostgreSQL libpq isn't thread-safe.");
// Destructor.
// NOTE(review): only the signature is visible in this listing; the cleanup it
// performs (if any) is not shown and cannot be confirmed from here.
54 DatabasePostgre::~DatabasePostgre()
// Opens the PostgreSQL connection described by infoString.
//
// infoString is split on ';' and the resulting tokens are consumed one at a
// time into the locals host, port_or_socket, user, password and database.
// The connection is then opened with PQsetdbLogin() and its status is checked.
//
// NOTE(review): many lines of this body (return statements and most of the
// token assignments) are missing from this listing; the comments below only
// describe the statements that are visible.
67 bool DatabasePostgre::Initialize(const char *infoString
)
// The base-class Initialize() runs first; its failure is tested here.
69 if(!Database::Initialize(infoString
))
76 Tokens tokens
= StrSplit(infoString
, ";");
78 Tokens::iterator iter
;
80 std::string host
, port_or_socket
, user
, password
, database
;
82 iter
= tokens
.begin();
// Each check guards against running off the end of the token list before the
// next field is read. Only the port_or_socket assignment is visible here;
// presumably the other checks fill host, user, password and database in
// declaration order -- TODO confirm against the full source.
84 if(iter
!= tokens
.end())
86 if(iter
!= tokens
.end())
87 port_or_socket
= *iter
++;
88 if(iter
!= tokens
.end())
90 if(iter
!= tokens
.end())
92 if(iter
!= tokens
.end())
// The third and fourth PQsetdbLogin() arguments (pgoptions, pgtty) are left NULL.
95 mPGconn
= PQsetdbLogin(host
.c_str(), port_or_socket
.c_str(), NULL
, NULL
, database
.c_str(), user
.c_str(), password
.c_str());
97 /* check to see that the backend connection was successfully made */
98 if (PQstatus(mPGconn
) != CONNECTION_OK
)
// Failure path: log the host together with libpq's own error text.
100 sLog
.outError( "Could not connect to Postgre database at %s: %s",
101 host
.c_str(), PQerrorMessage(mPGconn
));
// Success path: log the connection and the server version reported by libpq.
107 sLog
.outDetail( "Connected to Postgre database at %s",
109 sLog
.outString( "PostgreSQL server ver: %d",PQserverVersion(mPGconn
));
// Runs a row-returning SQL statement synchronously on mPGconn and wraps the
// libpq result in a heap-allocated QueryResultPostgre.
//
// sql: statement text handed unchanged to PQexec().
//
// NOTE(review): the return statements, the declaration of rowCount and any
// early-exit checks are missing from this listing; only visible statements
// are described below.
115 QueryResult
* DatabasePostgre::Query(const char *sql
)
121 uint32 fieldCount
= 0;
123 // guarded block for thread-safe request
// The guard object is constructed on mMutex before the statement is sent.
124 ZThread::Guard
<ZThread::FastMutex
> query_connection_guard(mMutex
);
// Start timestamp; only used by the debug timing log further down.
126 uint32 _s
= getMSTime();
129 PGresult
* result
= PQexec(mPGconn
, sql
);
// Any status other than PGRES_TUPLES_OK is treated as a failure and logged.
135 if (PQresultStatus(result
) != PGRES_TUPLES_OK
)
137 sLog
.outErrorDb( "SQL : %s", sql
);
138 sLog
.outErrorDb( "SQL %s", PQerrorMessage(mPGconn
));
// Debug log of the elapsed time (getMSTime() delta) and the statement text.
145 sLog
.outDebug("[%u ms] SQL: %s", getMSTime() - _s
, sql
);
149 rowCount
= PQntuples(result
);
150 fieldCount
= PQnfields(result
);
// The PGresult is passed to the QueryResultPostgre constructor, and NextRow()
// is called once on the new object before the function ends.
159 QueryResultPostgre
* queryResult
= new QueryResultPostgre(result
, rowCount
, fieldCount
);
160 queryResult
->NextRow();
// Queues a statement for execution, or runs it directly when there is no
// delay-thread body to queue it on.
//
// sql: statement text.
//
// NOTE(review): some lines of this body (returns, the closing brace of the
// transaction branch and what separates it from the fallback) are missing
// from this listing.
165 bool DatabasePostgre::Execute(const char *sql
)
171 // don't use queued execution if it has not been initialized
172 if (!m_threadBody
) return DirectExecute(sql
);
174 tranThread
= ZThread::ThreadImpl::current(); // owner of this transaction
// Look up the transaction queue entry keyed by the calling thread.
175 TransactionQueues::iterator i
= m_tranQueues
.find(tranThread
);
176 if (i
!= m_tranQueues
.end() && i
->second
!= NULL
)
177 { // Statement for transaction
// A non-NULL entry exists for this thread: append the statement to it.
178 i
->second
->DelayExecute(sql
);
182 // Simple sql statement
// The statement is wrapped in a SqlStatement and handed to the delay-thread body.
183 m_threadBody
->Delay(new SqlStatement(sql
));
// Runs a statement synchronously on mPGconn via PQexec() and checks that it
// completed with PGRES_COMMAND_OK.
//
// sql: statement text handed unchanged to PQexec().
//
// NOTE(review): return statements and any result cleanup are missing from
// this listing; only visible statements are described below.
189 bool DatabasePostgre::DirectExecute(const char* sql
)
194 // guarded block for thread-safe request
// The guard object is constructed on mMutex before the statement is sent.
195 ZThread::Guard
<ZThread::FastMutex
> query_connection_guard(mMutex
);
// Start timestamp; only used by the debug timing log further down.
197 uint32 _s
= getMSTime();
199 PGresult
*res
= PQexec(mPGconn
, sql
);
// Any status other than PGRES_COMMAND_OK is logged with the statement and libpq's error text.
200 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
202 sLog
.outErrorDb( "SQL: %s", sql
);
203 sLog
.outErrorDb( "SQL %s", PQerrorMessage(mPGconn
) );
// Debug log of the elapsed time (getMSTime() delta) and the statement text.
209 sLog
.outDebug("[%u ms] SQL: %s", getMSTime() - _s
, sql
);
// Sends one transaction-control statement to the server with PQexec() and
// checks for PGRES_COMMAND_OK. Called in this file with "START TRANSACTION",
// "COMMIT" and "ROLLBACK".
//
// sql: statement text handed unchanged to PQexec().
//
// NOTE(review): return statements and any result cleanup are missing from
// this listing. No lock is taken in the visible lines.
219 bool DatabasePostgre::_TransactionCmd(const char *sql
)
224 PGresult
*res
= PQexec(mPGconn
, sql
);
// Any status other than PGRES_COMMAND_OK is logged with the statement and libpq's error text.
225 if (PQresultStatus(res
) != PGRES_COMMAND_OK
)
227 sLog
.outError("SQL: %s", sql
);
228 sLog
.outError("SQL ERROR: %s", PQerrorMessage(mPGconn
));
233 DEBUG_LOG("SQL: %s", sql
);
// Starts a transaction for the calling thread.
//
// Two paths are visible: one that issues "START TRANSACTION" on the
// connection through _TransactionCmd(), and one that installs a fresh
// SqlTransaction in m_tranQueues under the calling thread.
//
// NOTE(review): the condition that selects between the two paths, the lock
// acquisition matching the release below, and the return statements are
// missing from this listing -- confirm against the full source.
238 bool DatabasePostgre::BeginTransaction()
242 // don't use queued execution if it has not been initialized
245 if (tranThread
==ZThread::ThreadImpl::current())
246 return false; // huh? this thread already started transaction
248 if (!_TransactionCmd("START TRANSACTION"))
250 mMutex
.release(); // can't start transaction
255 // transaction started
256 tranThread
= ZThread::ThreadImpl::current(); // owner of this transaction
// Look up any existing queue entry keyed by the calling thread.
257 TransactionQueues::iterator i
= m_tranQueues
.find(tranThread
);
258 if (i
!= m_tranQueues
.end() && i
->second
!= NULL
)
259 // If for thread exists queue and also contains transaction
260 // delete that transaction (not allow trans in trans)
// The entry for this thread is (re)assigned a newly allocated SqlTransaction.
263 m_tranQueues
[tranThread
] = new SqlTransaction();
// Commits the calling thread's transaction.
//
// Two paths are visible: one that issues "COMMIT" on the connection through
// _TransactionCmd(), and one that hands the thread's queued transaction to
// the delay-thread body.
//
// NOTE(review): the condition that selects between the two paths, any mutex
// handling, any clearing of the queue entry and the return statements are
// missing from this listing -- confirm against the full source.
268 bool DatabasePostgre::CommitTransaction()
273 // don't use queued execution if it has not been initialized
// Compares the recorded transaction owner with the calling thread.
276 if (tranThread
!=ZThread::ThreadImpl::current())
278 bool _res
= _TransactionCmd("COMMIT");
283 tranThread
= ZThread::ThreadImpl::current();
// Look up the queue entry keyed by the calling thread.
284 TransactionQueues::iterator i
= m_tranQueues
.find(tranThread
);
285 if (i
!= m_tranQueues
.end() && i
->second
!= NULL
)
// A non-NULL entry exists: pass the queued transaction to the delay-thread body.
287 m_threadBody
->Delay(i
->second
);
// Rolls back the calling thread's transaction.
//
// Two paths are visible: one that issues "ROLLBACK" on the connection through
// _TransactionCmd(), and one that looks up the thread's entry in m_tranQueues.
//
// NOTE(review): the condition that selects between the two paths, what is
// done with the queue entry once found, any mutex handling and the return
// statements are missing from this listing -- confirm against the full source.
295 bool DatabasePostgre::RollbackTransaction()
299 // don't use queued execution if it has not been initialized
// Compares the recorded transaction owner with the calling thread.
302 if (tranThread
!=ZThread::ThreadImpl::current())
304 bool _res
= _TransactionCmd("ROLLBACK");
309 tranThread
= ZThread::ThreadImpl::current();
// Look up the queue entry keyed by the calling thread.
310 TransactionQueues::iterator i
= m_tranQueues
.find(tranThread
);
311 if (i
!= m_tranQueues
.end() && i
->second
!= NULL
)
319 unsigned long DatabasePostgre::escape_string(char *to
, const char *from
, unsigned long length
)
321 if (!mPGconn
|| !to
|| !from
|| !length
)
324 return PQescapeString(to
, from
, length
);
327 void DatabasePostgre::InitDelayThread()
329 assert(!m_delayThread
);
331 //New delay thread for delay execute
332 m_delayThread
= new ZThread::Thread(m_threadBody
= new PGSQLDelayThread(this));
335 void DatabasePostgre::HaltDelayThread()
337 if (!m_threadBody
|| !m_delayThread
) return;
339 m_threadBody
->Stop(); //Stop event
340 m_delayThread
->wait(); //Wait for flush to DB
341 delete m_delayThread
; //This also deletes m_threadBody
342 m_delayThread
= NULL
;