/*
 * PgBouncer - Lightweight connection pooler for PostgreSQL.
 *
 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
 *
 * Permission to use, copy, modify, and/or distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */
/*
 * Connect to the running bouncer process, load its fds, shut it down
 * and continue with those fds.
 *
 * Each row of the SHOW FDS result arrives together with its matching fd
 * in an ancillary (SCM_RIGHTS) message.
 *
 * Manpages: unix, sendmsg, recvmsg, cmsg, readv
 */
/*
 * Takeover done: shut down the old process and get this one running.
 */
35 static PgSocket
*old_bouncer
= NULL
;
37 void takeover_finish(void)
40 int fd
= sbuf_socket(&old_bouncer
->sbuf
);
44 log_info("sending SHUTDOWN;");
45 socket_set_nonblocking(fd
, 0);
46 SEND_generic(res
, old_bouncer
, 'Q', "s", "SHUTDOWN;");
48 fatal("failed to send SHUTDOWN;");
51 got
= safe_recv(fd
, buf
, sizeof(buf
), 0);
55 fatal_perror("sky is falling - error while waiting result from SHUTDOWN");
58 disconnect_server(old_bouncer
, false, "disko over");
61 log_info("old process killed, resuming work");
65 static void takeover_finish_part1(PgSocket
*bouncer
)
67 Assert(old_bouncer
== NULL
);
69 /* unregister bouncer from libevent */
70 if (!sbuf_pause(&bouncer
->sbuf
))
71 fatal_perror("sbuf_pause failed");
72 old_bouncer
= bouncer
;
74 log_info("disko over, going background");
77 /* parse msg for fd and info */
78 static void takeover_load_fd(MBuf
*pkt
, const struct cmsghdr
*cmsg
)
81 char *task
, *saddr
, *user
, *db
;
82 char *client_enc
, *std_string
, *datestyle
, *timezone
;
83 int oldfd
, port
, linkfd
;
89 memset(&addr
, 0, sizeof(addr
));
91 if (cmsg
->cmsg_level
== SOL_SOCKET
92 && cmsg
->cmsg_type
== SCM_RIGHTS
93 && cmsg
->cmsg_len
>= CMSG_LEN(sizeof(int)))
96 memcpy(&fd
, CMSG_DATA(cmsg
), sizeof(int));
97 log_debug("got fd: %d", fd
);
99 fatal("broken fd packet");
101 /* parse row contents */
102 got
= scan_text_result(pkt
, "issssiqissss", &oldfd
, &task
, &user
, &db
,
103 &saddr
, &port
, &ckey
, &linkfd
,
104 &client_enc
, &std_string
, &datestyle
, &timezone
);
105 if (task
== NULL
|| saddr
== NULL
)
106 fatal("NULL data from old process");
108 log_debug("FD row: fd=%d(%d) linkfd=%d task=%s user=%s db=%s enc=%s",
109 oldfd
, fd
, linkfd
, task
,
110 user
? user
: "NULL", db
? db
: "NULL",
111 client_enc
? client_enc
: "NULL");
114 addr
.is_unix
= strcmp(saddr
, "unix") == 0 ? true : false;
116 addr
.port
= cf_listen_port
;
118 addr
.ip_addr
.s_addr
= inet_addr(saddr
);
122 /* decide what to do with it */
123 if (strcmp(task
, "client") == 0)
124 res
= use_client_socket(fd
, &addr
, db
, user
, ckey
, oldfd
, linkfd
,
125 client_enc
, std_string
, datestyle
, timezone
);
126 else if (strcmp(task
, "server") == 0)
127 res
= use_server_socket(fd
, &addr
, db
, user
, ckey
, oldfd
, linkfd
,
128 client_enc
, std_string
, datestyle
, timezone
);
129 else if (strcmp(task
, "pooler") == 0)
130 res
= use_pooler_socket(fd
, addr
.is_unix
);
132 fatal("unknown task: %s", task
);
135 fatal("socket takeover failed - no mem?");
138 static void takeover_create_link(PgPool
*pool
, PgSocket
*client
)
143 statlist_for_each(item
, &pool
->active_server_list
) {
144 server
= container_of(item
, PgSocket
, head
);
145 if (server
->tmp_sk_oldfd
== client
->tmp_sk_linkfd
) {
146 server
->link
= client
;
147 client
->link
= server
;
151 fatal("takeover_create_link: failed to find pair");
154 /* clean the inappropriate places the old fds got stored in */
155 static void takeover_clean_socket_list(StatList
*list
)
159 statlist_for_each(item
, list
) {
160 sk
= container_of(item
, PgSocket
, head
);
162 sk
->tmp_sk_oldfd
= get_cached_time();
163 sk
->tmp_sk_linkfd
= get_cached_time();
168 /* all fds loaded, create links */
169 static void takeover_postprocess_fds(void)
175 statlist_for_each(item
, &pool_list
) {
176 pool
= container_of(item
, PgPool
, head
);
179 statlist_for_each(item2
, &pool
->active_client_list
) {
180 client
= container_of(item2
, PgSocket
, head
);
181 if (client
->suspended
&& client
->tmp_sk_linkfd
)
182 takeover_create_link(pool
, client
);
185 statlist_for_each(item
, &pool_list
) {
186 pool
= container_of(item
, PgPool
, head
);
187 takeover_clean_socket_list(&pool
->active_client_list
);
188 takeover_clean_socket_list(&pool
->active_server_list
);
189 takeover_clean_socket_list(&pool
->idle_server_list
);
193 static void next_command(PgSocket
*bouncer
, MBuf
*pkt
)
196 const char *cmd
= mbuf_get_string(pkt
);
198 log_debug("takeover_recv_fds: 'C' body: %s", cmd
);
199 if (strcmp(cmd
, "SUSPEND") == 0) {
200 log_info("SUSPEND finished, sending SHOW FDS");
201 SEND_generic(res
, bouncer
, 'Q', "s", "SHOW FDS;");
202 } else if (strncmp(cmd
, "SHOW", 4) == 0) {
203 /* all fds loaded, review them */
204 takeover_postprocess_fds();
205 log_info("SHOW FDS finished");
207 takeover_finish_part1(bouncer
);
209 fatal("got bad CMD from old bouncer: %s", cmd
);
212 fatal("command send failed");
215 static void takeover_parse_data(PgSocket
*bouncer
,
216 struct msghdr
*msg
, MBuf
*data
)
218 struct cmsghdr
*cmsg
;
221 cmsg
= msg
->msg_controllen
? CMSG_FIRSTHDR(msg
) : NULL
;
223 while (mbuf_avail(data
) > 0) {
224 if (!get_header(data
, &pkt
))
225 fatal("cannot parse packet");
228 * There should not be partial reads from UNIX socket.
230 if (incomplete_pkt(&pkt
))
231 fatal("unexpected partial packet");
234 case 'T': /* RowDescription */
235 log_debug("takeover_parse_data: 'T'");
237 case 'D': /* DataRow */
238 log_debug("takeover_parse_data: 'D'");
240 takeover_load_fd(&pkt
.data
, cmsg
);
241 cmsg
= CMSG_NXTHDR(msg
, cmsg
);
243 fatal("got row without fd info");
245 case 'Z': /* ReadyForQuery */
246 log_debug("takeover_parse_data: 'Z'");
248 case 'C': /* CommandComplete */
249 log_debug("takeover_parse_data: 'C'");
250 next_command(bouncer
, &pkt
.data
);
252 case 'E': /* ErrorMessage */
253 log_server_error("old bouncer sent", &pkt
);
254 fatal("something failed");
256 fatal("takeover_parse_data: unexpected pkt: '%c'", pkt_desc(&pkt
));
262 * listen for data from old bouncer.
264 * use always recvmsg, to keep code simpler
266 static void takeover_recv_cb(int sock
, short flags
, void *arg
)
268 PgSocket
*bouncer
= container_of(arg
, PgSocket
, sbuf
);
269 uint8_t data_buf
[STARTUP_BUF
* 2];
270 uint8_t cnt_buf
[128];
276 memset(&msg
, 0, sizeof(msg
));
277 io
.iov_base
= data_buf
;
278 io
.iov_len
= sizeof(data_buf
);
281 msg
.msg_control
= cnt_buf
;
282 msg
.msg_controllen
= sizeof(cnt_buf
);
284 res
= safe_recvmsg(sock
, &msg
, 0);
286 mbuf_init(&data
, data_buf
, res
);
287 takeover_parse_data(bouncer
, &msg
, &data
);
288 } else if (res
== 0) {
289 fatal("unexpected EOF");
293 fatal_perror("safe_recvmsg");
298 * login finished, send first command,
299 * replace recv callback with custom recvmsg() based one.
301 bool takeover_login(PgSocket
*bouncer
)
305 slog_info(bouncer
, "Login OK, sending SUSPEND");
306 SEND_generic(res
, bouncer
, 'Q', "s", "SUSPEND;");
308 /* use own callback */
309 if (!sbuf_pause(&bouncer
->sbuf
))
310 fatal("sbuf_pause failed");
311 res
= sbuf_continue_with_callback(&bouncer
->sbuf
, takeover_recv_cb
);
313 fatal("takeover_login: sbuf_continue_with_callback failed");
315 fatal("takeover_login: failed to send command");
320 /* launch connection to running process */
321 void takeover_init(void)
323 PgDatabase
*db
= find_database("pgbouncer");
324 PgPool
*pool
= get_pool(db
, db
->forced_user
);
327 fatal("no admin pool?");
329 log_info("takeover_init: launching connection");
330 launch_new_connection(pool
);
/*
 * Called when logging in to the old bouncer fails (presumably by the
 * login code - confirm callers); the takeover cannot continue.
 */
void takeover_login_failed(void)
{
	fatal("login failed");
}