mySQL 5.0.11 sources for tomato
[tomato.git] / release / src / router / mysql / storage / ndb / src / common / transporter / SHM_Transporter.cpp
blob42bb784166ea1e3ce747ea05de1164acf0e5fe0b
1 /* Copyright (c) 2003-2006 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
17 #include <ndb_global.h>
19 #include "SHM_Transporter.hpp"
20 #include "TransporterInternalDefinitions.hpp"
21 #include <TransporterCallback.hpp>
22 #include <NdbSleep.h>
23 #include <NdbOut.hpp>
25 #include <InputStream.hpp>
26 #include <OutputStream.hpp>
28 extern int g_ndb_shm_signum;
30 SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
31 const char *lHostName,
32 const char *rHostName,
33 int r_port,
34 bool isMgmConnection_arg,
35 NodeId lNodeId,
36 NodeId rNodeId,
37 NodeId serverNodeId,
38 bool checksum,
39 bool signalId,
40 key_t _shmKey,
41 Uint32 _shmSize) :
42 Transporter(t_reg, tt_SHM_TRANSPORTER,
43 lHostName, rHostName, r_port, isMgmConnection_arg,
44 lNodeId, rNodeId, serverNodeId,
45 0, false, checksum, signalId),
46 shmKey(_shmKey),
47 shmSize(_shmSize)
49 #ifndef NDB_WIN32
50 shmId= 0;
51 #endif
52 _shmSegCreated = false;
53 _attached = false;
55 shmBuf = 0;
56 reader = 0;
57 writer = 0;
59 setupBuffersDone=false;
60 #ifdef DEBUG_TRANSPORTER
61 printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey);
62 #endif
63 m_signal_threshold = 4096;
66 SHM_Transporter::~SHM_Transporter(){
67 doDisconnect();
70 bool
71 SHM_Transporter::initTransporter(){
72 if (g_ndb_shm_signum)
73 return true;
74 return false;
77 void
78 SHM_Transporter::setupBuffers(){
79 Uint32 sharedSize = 0;
80 sharedSize += 28; //SHM_Reader::getSharedSize();
81 sharedSize += 28; //SHM_Writer::getSharedSize();
83 const Uint32 slack = MAX_MESSAGE_SIZE;
85 /**
86 * NOTE: There is 7th shared variable in Win2k (sharedCountAttached).
88 Uint32 sizeOfBuffer = shmSize;
89 sizeOfBuffer -= 2*sharedSize;
90 sizeOfBuffer /= 2;
92 Uint32 * base1 = (Uint32*)shmBuf;
94 Uint32 * sharedReadIndex1 = base1;
95 Uint32 * sharedWriteIndex1 = base1 + 1;
96 serverStatusFlag = base1 + 4;
97 char * startOfBuf1 = shmBuf+sharedSize;
99 Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);
100 Uint32 * sharedReadIndex2 = base2;
101 Uint32 * sharedWriteIndex2 = base2 + 1;
102 clientStatusFlag = base2 + 4;
103 char * startOfBuf2 = ((char *)base2)+sharedSize;
105 if(isServer){
106 * serverStatusFlag = 0;
107 reader = new SHM_Reader(startOfBuf1,
108 sizeOfBuffer,
109 slack,
110 sharedReadIndex1,
111 sharedWriteIndex1);
113 writer = new SHM_Writer(startOfBuf2,
114 sizeOfBuffer,
115 slack,
116 sharedReadIndex2,
117 sharedWriteIndex2);
119 * sharedReadIndex1 = 0;
120 * sharedWriteIndex1 = 0;
122 * sharedReadIndex2 = 0;
123 * sharedWriteIndex2 = 0;
125 reader->clear();
126 writer->clear();
128 * serverStatusFlag = 1;
130 #ifdef DEBUG_TRANSPORTER
131 printf("-- (%d - %d) - Server -\n", localNodeId, remoteNodeId);
132 printf("Reader at: %d (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
133 printf("sharedReadIndex1 at %d (%p) = %d\n",
134 (char*)sharedReadIndex1-shmBuf,
135 sharedReadIndex1, *sharedReadIndex1);
136 printf("sharedWriteIndex1 at %d (%p) = %d\n",
137 (char*)sharedWriteIndex1-shmBuf,
138 sharedWriteIndex1, *sharedWriteIndex1);
140 printf("Writer at: %d (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
141 printf("sharedReadIndex2 at %d (%p) = %d\n",
142 (char*)sharedReadIndex2-shmBuf,
143 sharedReadIndex2, *sharedReadIndex2);
144 printf("sharedWriteIndex2 at %d (%p) = %d\n",
145 (char*)sharedWriteIndex2-shmBuf,
146 sharedWriteIndex2, *sharedWriteIndex2);
148 printf("sizeOfBuffer = %d\n", sizeOfBuffer);
149 #endif
150 } else {
151 * clientStatusFlag = 0;
152 reader = new SHM_Reader(startOfBuf2,
153 sizeOfBuffer,
154 slack,
155 sharedReadIndex2,
156 sharedWriteIndex2);
158 writer = new SHM_Writer(startOfBuf1,
159 sizeOfBuffer,
160 slack,
161 sharedReadIndex1,
162 sharedWriteIndex1);
164 * sharedReadIndex2 = 0;
165 * sharedWriteIndex1 = 0;
167 reader->clear();
168 writer->clear();
169 * clientStatusFlag = 1;
170 #ifdef DEBUG_TRANSPORTER
171 printf("-- (%d - %d) - Client -\n", localNodeId, remoteNodeId);
172 printf("Reader at: %d (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
173 printf("sharedReadIndex2 at %d (%p) = %d\n",
174 (char*)sharedReadIndex2-shmBuf,
175 sharedReadIndex2, *sharedReadIndex2);
176 printf("sharedWriteIndex2 at %d (%p) = %d\n",
177 (char*)sharedWriteIndex2-shmBuf,
178 sharedWriteIndex2, *sharedWriteIndex2);
180 printf("Writer at: %d (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
181 printf("sharedReadIndex1 at %d (%p) = %d\n",
182 (char*)sharedReadIndex1-shmBuf,
183 sharedReadIndex1, *sharedReadIndex1);
184 printf("sharedWriteIndex1 at %d (%p) = %d\n",
185 (char*)sharedWriteIndex1-shmBuf,
186 sharedWriteIndex1, *sharedWriteIndex1);
188 printf("sizeOfBuffer = %d\n", sizeOfBuffer);
189 #endif
191 #ifdef DEBUG_TRANSPORTER
192 printf("Mapping from %p to %p\n", shmBuf, shmBuf+shmSize);
193 #endif
196 bool
197 SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
199 DBUG_ENTER("SHM_Transporter::connect_server_impl");
200 SocketOutputStream s_output(sockfd);
201 SocketInputStream s_input(sockfd);
202 char buf[256];
204 // Create
205 if(!_shmSegCreated){
206 if (!ndb_shm_create()) {
207 make_error_info(buf, sizeof(buf));
208 report_error(TE_SHM_UNABLE_TO_CREATE_SEGMENT, buf);
209 NDB_CLOSE_SOCKET(sockfd);
210 DBUG_RETURN(false);
212 _shmSegCreated = true;
215 // Attach
216 if(!_attached){
217 if (!ndb_shm_attach()) {
218 make_error_info(buf, sizeof(buf));
219 report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT, buf);
220 NDB_CLOSE_SOCKET(sockfd);
221 DBUG_RETURN(false);
223 _attached = true;
226 // Send ok to client
227 s_output.println("shm server 1 ok: %d",
228 m_transporter_registry.m_shm_own_pid);
230 // Wait for ok from client
231 DBUG_PRINT("info", ("Wait for ok from client"));
232 if (s_input.gets(buf, sizeof(buf)) == 0)
234 NDB_CLOSE_SOCKET(sockfd);
235 DBUG_RETURN(false);
238 if(sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1)
240 NDB_CLOSE_SOCKET(sockfd);
241 DBUG_RETURN(false);
244 int r= connect_common(sockfd);
246 if (r) {
247 // Send ok to client
248 s_output.println("shm server 2 ok");
249 // Wait for ok from client
250 if (s_input.gets(buf, 256) == 0) {
251 NDB_CLOSE_SOCKET(sockfd);
252 DBUG_RETURN(false);
254 DBUG_PRINT("info", ("Successfully connected server to node %d",
255 remoteNodeId));
258 NDB_CLOSE_SOCKET(sockfd);
259 DBUG_RETURN(r);
262 bool
263 SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
265 DBUG_ENTER("SHM_Transporter::connect_client_impl");
266 SocketInputStream s_input(sockfd);
267 SocketOutputStream s_output(sockfd);
268 char buf[256];
270 // Wait for server to create and attach
271 DBUG_PRINT("info", ("Wait for server to create and attach"));
272 if (s_input.gets(buf, 256) == 0) {
273 NDB_CLOSE_SOCKET(sockfd);
274 DBUG_PRINT("error", ("Server id %d did not attach",
275 remoteNodeId));
276 DBUG_RETURN(false);
279 if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1)
281 NDB_CLOSE_SOCKET(sockfd);
282 DBUG_RETURN(false);
285 // Create
286 if(!_shmSegCreated){
287 if (!ndb_shm_get()) {
288 NDB_CLOSE_SOCKET(sockfd);
289 DBUG_PRINT("error", ("Failed create of shm seg to node %d",
290 remoteNodeId));
291 DBUG_RETURN(false);
293 _shmSegCreated = true;
296 // Attach
297 if(!_attached){
298 if (!ndb_shm_attach()) {
299 make_error_info(buf, sizeof(buf));
300 report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT, buf);
301 NDB_CLOSE_SOCKET(sockfd);
302 DBUG_PRINT("error", ("Failed attach of shm seg to node %d",
303 remoteNodeId));
304 DBUG_RETURN(false);
306 _attached = true;
309 // Send ok to server
310 s_output.println("shm client 1 ok: %d",
311 m_transporter_registry.m_shm_own_pid);
313 int r= connect_common(sockfd);
315 if (r) {
316 // Wait for ok from server
317 DBUG_PRINT("info", ("Wait for ok from server"));
318 if (s_input.gets(buf, 256) == 0) {
319 NDB_CLOSE_SOCKET(sockfd);
320 DBUG_PRINT("error", ("No ok from server node %d",
321 remoteNodeId));
322 DBUG_RETURN(false);
324 // Send ok to server
325 s_output.println("shm client 2 ok");
326 DBUG_PRINT("info", ("Successfully connected client to node %d",
327 remoteNodeId));
330 NDB_CLOSE_SOCKET(sockfd);
331 DBUG_RETURN(r);
334 bool
335 SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
337 if (!checkConnected()) {
338 return false;
341 if(!setupBuffersDone)
343 setupBuffers();
344 setupBuffersDone=true;
347 if(setupBuffersDone)
349 NdbSleep_MilliSleep(m_timeOutMillis);
350 if(*serverStatusFlag == 1 && *clientStatusFlag == 1)
352 m_last_signal = 0;
353 return true;
357 DBUG_PRINT("error", ("Failed to set up buffers to node %d",
358 remoteNodeId));
359 return false;
362 void
363 SHM_Transporter::doSend()
365 if(m_last_signal)
367 m_last_signal = 0;
368 kill(m_remote_pid, g_ndb_shm_signum);
372 Uint32
373 SHM_Transporter::get_free_buffer() const
375 return writer->get_free_buffer();