1 /* Copyright (c) 2003-2006 MySQL AB
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
17 #include <ndb_global.h>
19 #include "SHM_Transporter.hpp"
20 #include "TransporterInternalDefinitions.hpp"
21 #include <TransporterCallback.hpp>
25 #include <InputStream.hpp>
26 #include <OutputStream.hpp>
// Signal number used by the SHM transporter. Only declared here (extern);
// the definition lives elsewhere. In this file it is the signal argument
// passed to kill() in SHM_Transporter::doSend().
28 extern int g_ndb_shm_signum
;
// Constructor.
// NOTE(review): this excerpt is missing parts of the definition (several
// parameters, the ") :" that ends the parameter list, other member
// initializers, the braces and the closing of the DEBUG_TRANSPORTER section).
// The comments below describe only the lines that are visible.
30 SHM_Transporter::SHM_Transporter(TransporterRegistry
&t_reg
,
31 const char *lHostName
,
32 const char *rHostName
,
34 bool isMgmConnection_arg
,
// Base-class construction: forwards the registry, the tt_SHM_TRANSPORTER
// type tag, host names, port, management-connection flag and node ids.
// NOTE(review): the meaning of the literal arguments "0, false" is not
// visible here -- check them against the Transporter constructor declaration.
42 Transporter(t_reg
, tt_SHM_TRANSPORTER
,
43 lHostName
, rHostName
, r_port
, isMgmConnection_arg
,
44 lNodeId
, rNodeId
, serverNodeId
,
45 0, false, checksum
, signalId
),
// Both flags start out false; they are set to true later in this file
// (_shmSegCreated in the connect_*_impl functions, setupBuffersDone in
// connect_common).
52 _shmSegCreated
= false;
59 setupBuffersDone
=false;
// Debug-only trace of the shared memory key for this node pair.
60 #ifdef DEBUG_TRANSPORTER
61 printf("shm key (%d - %d) = %d\n", lNodeId
, rNodeId
, shmKey
);
// NOTE(review): unit of the threshold is not visible (presumably bytes) -- confirm.
63 m_signal_threshold
= 4096;
// Destructor. NOTE(review): the body is not present in this excerpt, so
// nothing can be said here about what it releases.
66 SHM_Transporter::~SHM_Transporter(){
// initTransporter. NOTE(review): the return type and the body are not
// present in this excerpt.
71 SHM_Transporter::initTransporter(){
// Carves the shared memory segment (shmBuf, shmSize) into two regions, each
// with a small header of shared Uint32 variables followed by a data area,
// and creates this side's SHM_Reader and SHM_Writer on top of them.
// NOTE(review): this excerpt is missing lines: the return type, the
// condition selecting between the "Server" and "Client" paths, the remaining
// SHM_Reader / SHM_Writer constructor arguments, the closing of the
// DEBUG_TRANSPORTER sections and the closing braces.
78 SHM_Transporter::setupBuffers(){
// Header size reserved per region: 28 + 28 = 56 (bytes, assuming shmBuf is a
// char pointer). The trailing comments suggest the values mirror
// SHM_Reader::getSharedSize() and SHM_Writer::getSharedSize().
79 Uint32 sharedSize
= 0;
80 sharedSize
+= 28; //SHM_Reader::getSharedSize();
81 sharedSize
+= 28; //SHM_Writer::getSharedSize();
// NOTE(review): the use of slack is not visible in this excerpt.
83 const Uint32 slack
= MAX_MESSAGE_SIZE
;
86 * NOTE: There is 7th shared variable in Win2k (sharedCountAttached).
// Space left once both headers are subtracted from the segment size.
// NOTE(review): original line 90 is absent from this excerpt, so any further
// adjustment of sizeOfBuffer is not visible.
88 Uint32 sizeOfBuffer
= shmSize
;
89 sizeOfBuffer
-= 2*sharedSize
;
// Region 1 header starts at the beginning of the segment: read index at
// word 0, write index at word 1, server status flag at word 4. Its data
// area starts sharedSize past shmBuf.
92 Uint32
* base1
= (Uint32
*)shmBuf
;
94 Uint32
* sharedReadIndex1
= base1
;
95 Uint32
* sharedWriteIndex1
= base1
+ 1;
96 serverStatusFlag
= base1
+ 4;
97 char * startOfBuf1
= shmBuf
+sharedSize
;
// Region 2 header starts at shmBuf + sizeOfBuffer + sharedSize and uses the
// same word offsets, with the client status flag at word 4.
99 Uint32
* base2
= (Uint32
*)(shmBuf
+ sizeOfBuffer
+ sharedSize
);
100 Uint32
* sharedReadIndex2
= base2
;
101 Uint32
* sharedWriteIndex2
= base2
+ 1;
102 clientStatusFlag
= base2
+ 4;
103 char * startOfBuf2
= ((char *)base2
)+sharedSize
;
// Path labelled "Server" by the debug output below: clear the server status
// flag, read from region 1, write to region 2, zero all four indices, then
// set the server status flag to 1.
106 * serverStatusFlag
= 0;
107 reader
= new SHM_Reader(startOfBuf1
,
113 writer
= new SHM_Writer(startOfBuf2
,
119 * sharedReadIndex1
= 0;
120 * sharedWriteIndex1
= 0;
122 * sharedReadIndex2
= 0;
123 * sharedWriteIndex2
= 0;
128 * serverStatusFlag
= 1;
// NOTE(review): the "%d" conversions below are fed pointer differences
// (ptrdiff_t if shmBuf is a char pointer), which does not match "%d" where
// ptrdiff_t is wider than int. Debug-only code, but worth fixing.
130 #ifdef DEBUG_TRANSPORTER
131 printf("-- (%d - %d) - Server -\n", localNodeId
, remoteNodeId
);
132 printf("Reader at: %d (%p)\n", startOfBuf1
- shmBuf
, startOfBuf1
);
133 printf("sharedReadIndex1 at %d (%p) = %d\n",
134 (char*)sharedReadIndex1
-shmBuf
,
135 sharedReadIndex1
, *sharedReadIndex1
);
136 printf("sharedWriteIndex1 at %d (%p) = %d\n",
137 (char*)sharedWriteIndex1
-shmBuf
,
138 sharedWriteIndex1
, *sharedWriteIndex1
);
140 printf("Writer at: %d (%p)\n", startOfBuf2
- shmBuf
, startOfBuf2
);
141 printf("sharedReadIndex2 at %d (%p) = %d\n",
142 (char*)sharedReadIndex2
-shmBuf
,
143 sharedReadIndex2
, *sharedReadIndex2
);
144 printf("sharedWriteIndex2 at %d (%p) = %d\n",
145 (char*)sharedWriteIndex2
-shmBuf
,
146 sharedWriteIndex2
, *sharedWriteIndex2
);
148 printf("sizeOfBuffer = %d\n", sizeOfBuffer
);
// Path labelled "Client" by the debug output below: clear the client status
// flag, read from region 2, write to region 1, then set the client status
// flag to 1. In the visible lines only sharedReadIndex2 and
// sharedWriteIndex1 are zeroed on this path (unlike the server path).
// NOTE(review): confirm this asymmetry is intended.
151 * clientStatusFlag
= 0;
152 reader
= new SHM_Reader(startOfBuf2
,
158 writer
= new SHM_Writer(startOfBuf1
,
164 * sharedReadIndex2
= 0;
165 * sharedWriteIndex1
= 0;
169 * clientStatusFlag
= 1;
170 #ifdef DEBUG_TRANSPORTER
171 printf("-- (%d - %d) - Client -\n", localNodeId
, remoteNodeId
);
172 printf("Reader at: %d (%p)\n", startOfBuf2
- shmBuf
, startOfBuf2
);
173 printf("sharedReadIndex2 at %d (%p) = %d\n",
174 (char*)sharedReadIndex2
-shmBuf
,
175 sharedReadIndex2
, *sharedReadIndex2
);
176 printf("sharedWriteIndex2 at %d (%p) = %d\n",
177 (char*)sharedWriteIndex2
-shmBuf
,
178 sharedWriteIndex2
, *sharedWriteIndex2
);
180 printf("Writer at: %d (%p)\n", startOfBuf1
- shmBuf
, startOfBuf1
);
181 printf("sharedReadIndex1 at %d (%p) = %d\n",
182 (char*)sharedReadIndex1
-shmBuf
,
183 sharedReadIndex1
, *sharedReadIndex1
);
184 printf("sharedWriteIndex1 at %d (%p) = %d\n",
185 (char*)sharedWriteIndex1
-shmBuf
,
186 sharedWriteIndex1
, *sharedWriteIndex1
);
188 printf("sizeOfBuffer = %d\n", sizeOfBuffer
);
// Debug-only trace of the address range covered by the segment.
191 #ifdef DEBUG_TRANSPORTER
192 printf("Mapping from %p to %p\n", shmBuf
, shmBuf
+shmSize
);
// Server side of the connection handshake carried over the socket sockfd.
// Visible steps, in order:
//   1. ndb_shm_create(), then ndb_shm_attach(); on failure the error is
//      reported through report_error() and the socket is closed.
//   2. Send "shm server 1 ok: <own pid>" to the client.
//   3. Read the client's "shm client 1 ok: <pid>" line and store the pid in
//      m_remote_pid.
//   4. Call connect_common().
//   5. Send "shm server 2 ok" and wait for one more line from the client.
// NOTE(review): this excerpt is missing lines: the return type, the
// declaration of buf, the guards around steps 1 and 5, and every
// DBUG_RETURN / return statement. The control flow between the visible
// statements is therefore not established here.
197 SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd
)
199 DBUG_ENTER("SHM_Transporter::connect_server_impl");
// Line-oriented output and input wrappers over the same socket.
200 SocketOutputStream
s_output(sockfd
);
201 SocketInputStream
s_input(sockfd
);
// Create the segment; report and close the socket on failure.
206 if (!ndb_shm_create()) {
207 make_error_info(buf
, sizeof(buf
));
208 report_error(TE_SHM_UNABLE_TO_CREATE_SEGMENT
, buf
);
209 NDB_CLOSE_SOCKET(sockfd
);
212 _shmSegCreated
= true;
// Attach to the segment; report and close the socket on failure.
217 if (!ndb_shm_attach()) {
218 make_error_info(buf
, sizeof(buf
));
219 report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT
, buf
);
220 NDB_CLOSE_SOCKET(sockfd
);
// Tell the client the segment is ready and pass along our pid.
227 s_output
.println("shm server 1 ok: %d",
228 m_transporter_registry
.m_shm_own_pid
);
230 // Wait for ok from client
231 DBUG_PRINT("info", ("Wait for ok from client"));
232 if (s_input
.gets(buf
, sizeof(buf
)) == 0)
234 NDB_CLOSE_SOCKET(sockfd
);
// Parse the client's pid out of its reply; anything else is a failure.
// NOTE(review): "%d" requires m_remote_pid to be an int -- confirm its
// declared type.
238 if(sscanf(buf
, "shm client 1 ok: %d", &m_remote_pid
) != 1)
240 NDB_CLOSE_SOCKET(sockfd
);
244 int r
= connect_common(sockfd
);
248 s_output
.println("shm server 2 ok");
249 // Wait for ok from client
// NOTE(review): this read passes a hard-coded 256 while the earlier read
// uses sizeof(buf); assumes buf holds at least 256 bytes -- confirm, and
// prefer sizeof(buf) for consistency.
250 if (s_input
.gets(buf
, 256) == 0) {
251 NDB_CLOSE_SOCKET(sockfd
);
254 DBUG_PRINT("info", ("Successfully connected server to node %d",
258 NDB_CLOSE_SOCKET(sockfd
);
// Client side of the connection handshake carried over the socket sockfd.
// Visible steps, in order:
//   1. Read the server's "shm server 1 ok: <pid>" line and store the pid in
//      m_remote_pid.
//   2. ndb_shm_get(), then ndb_shm_attach(); on failure the socket is closed.
//   3. Send "shm client 1 ok: <own pid>" to the server.
//   4. Call connect_common().
//   5. Wait for a line from the server, then send "shm client 2 ok".
// NOTE(review): this excerpt is missing lines: the return type, the
// declaration of buf, the guards around steps 2 and 5, the tails of the
// DBUG_PRINT calls and every DBUG_RETURN / return statement. The control
// flow between the visible statements is therefore not established here.
263 SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd
)
265 DBUG_ENTER("SHM_Transporter::connect_client_impl");
// Line-oriented input and output wrappers over the same socket.
266 SocketInputStream
s_input(sockfd
);
267 SocketOutputStream
s_output(sockfd
);
270 // Wait for server to create and attach
271 DBUG_PRINT("info", ("Wait for server to create and attach"));
// NOTE(review): hard-coded 256 here, while make_error_info below uses
// sizeof(buf); assumes buf holds at least 256 bytes -- confirm.
272 if (s_input
.gets(buf
, 256) == 0) {
273 NDB_CLOSE_SOCKET(sockfd
);
274 DBUG_PRINT("error", ("Server id %d did not attach",
279 if(sscanf(buf
, "shm server 1 ok: %d", &m_remote_pid
) != 1)
281 NDB_CLOSE_SOCKET(sockfd
);
// Look up the segment. NOTE(review): no report_error() call is visible on
// this failure path, unlike the attach failure below -- confirm against the
// full source.
287 if (!ndb_shm_get()) {
288 NDB_CLOSE_SOCKET(sockfd
);
289 DBUG_PRINT("error", ("Failed create of shm seg to node %d",
293 _shmSegCreated
= true;
// Attach to the segment; report and close the socket on failure.
298 if (!ndb_shm_attach()) {
299 make_error_info(buf
, sizeof(buf
));
300 report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT
, buf
);
301 NDB_CLOSE_SOCKET(sockfd
);
302 DBUG_PRINT("error", ("Failed attach of shm seg to node %d",
310 s_output
.println("shm client 1 ok: %d",
311 m_transporter_registry
.m_shm_own_pid
);
313 int r
= connect_common(sockfd
);
316 // Wait for ok from server
317 DBUG_PRINT("info", ("Wait for ok from server"));
318 if (s_input
.gets(buf
, 256) == 0) {
319 NDB_CLOSE_SOCKET(sockfd
);
320 DBUG_PRINT("error", ("No ok from server node %d",
325 s_output
.println("shm client 2 ok");
326 DBUG_PRINT("info", ("Successfully connected client to node %d",
330 NDB_CLOSE_SOCKET(sockfd
);
// Final step shared by connect_server_impl and connect_client_impl (both
// call it with their socket). Visible behaviour: tests checkConnected(),
// marks setupBuffersDone once, sleeps for m_timeOutMillis (presumably
// milliseconds, going by the helper's name), then tests that both
// *serverStatusFlag and *clientStatusFlag equal 1. A "Failed to set up
// buffers" debug message exists for the failure case.
// NOTE(review): this excerpt is missing lines: the return type, the bodies
// of the conditionals (including, presumably, the setupBuffers() call) and
// all return statements.
335 SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd
)
337 if (!checkConnected()) {
// One-time guard: the flag is set to true the first time through.
341 if(!setupBuffersDone
)
344 setupBuffersDone
=true;
// Pause before testing the status flags.
349 NdbSleep_MilliSleep(m_timeOutMillis
);
350 if(*serverStatusFlag
== 1 && *clientStatusFlag
== 1)
357 DBUG_PRINT("error", ("Failed to set up buffers to node %d",
// doSend. The only visible statement sends signal g_ndb_shm_signum to the
// process m_remote_pid via kill().
// NOTE(review): the return type, any condition guarding the kill() call
// and the rest of the body are not present in this excerpt; the return
// value of kill() is not checked on the visible line.
363 SHM_Transporter::doSend()
368 kill(m_remote_pid
, g_ndb_shm_signum
);
// Returns whatever writer->get_free_buffer() reports; this object adds no
// logic of its own on the visible line.
// NOTE(review): the return type line and braces are not present in this
// excerpt, and writer is dereferenced without a null check here --
// presumably callers only use this after setupBuffers() has run; confirm.
373 SHM_Transporter::get_free_buffer() const
375 return writer
->get_free_buffer();