client_login_timeout: check wait_for_welcome
[pgbouncer.git] / src / takeover.c
blobef18f6c7c3c90bffd58bf827866b9b0fb09a5ded
1 /*
2 * PgBouncer - Lightweight connection pooler for PostgreSQL.
3 *
4 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
5 *
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
/*
 * Connect to the running bouncer process, load its fds, shut it down
 * and continue working with them.
 *
 * Each row from SHOW FDS has a corresponding fd in an ancillary message.
 *
 * Manpages: unix, sendmsg, recvmsg, cmsg, readv
 */
28 #include "bouncer.h"
31 * Takeover done, old process shut down,
32 * kick this one running.
35 static PgSocket *old_bouncer = NULL;
37 void takeover_finish(void)
39 uint8_t buf[512];
40 int fd = sbuf_socket(&old_bouncer->sbuf);
41 bool res;
42 int got;
44 log_info("sending SHUTDOWN;");
45 socket_set_nonblocking(fd, 0);
46 SEND_generic(res, old_bouncer, 'Q', "s", "SHUTDOWN;");
47 if (!res)
48 fatal("failed to send SHUTDOWN;");
50 while (1) {
51 got = safe_recv(fd, buf, sizeof(buf), 0);
52 if (got == 0)
53 break;
54 if (got < 0)
55 fatal_perror("sky is falling - error while waiting result from SHUTDOWN");
58 disconnect_server(old_bouncer, false, "disko over");
59 old_bouncer = NULL;
61 log_info("old process killed, resuming work");
62 resume_all();
65 static void takeover_finish_part1(PgSocket *bouncer)
67 Assert(old_bouncer == NULL);
69 /* unregister bouncer from libevent */
70 if (!sbuf_pause(&bouncer->sbuf))
71 fatal_perror("sbuf_pause failed");
72 old_bouncer = bouncer;
73 cf_reboot = 0;
74 log_info("disko over, going background");
77 /* parse msg for fd and info */
78 static void takeover_load_fd(MBuf *pkt, const struct cmsghdr *cmsg)
80 int fd;
81 char *task, *saddr, *user, *db;
82 char *client_enc, *std_string, *datestyle, *timezone;
83 int oldfd, port, linkfd;
84 int got;
85 uint64_t ckey;
86 PgAddr addr;
87 bool res = false;
89 memset(&addr, 0, sizeof(addr));
91 if (cmsg->cmsg_level == SOL_SOCKET
92 && cmsg->cmsg_type == SCM_RIGHTS
93 && cmsg->cmsg_len >= CMSG_LEN(sizeof(int)))
95 /* get the fd */
96 memcpy(&fd, CMSG_DATA(cmsg), sizeof(int));
97 log_debug("got fd: %d", fd);
98 } else
99 fatal("broken fd packet");
101 /* parse row contents */
102 got = scan_text_result(pkt, "issssiqissss", &oldfd, &task, &user, &db,
103 &saddr, &port, &ckey, &linkfd,
104 &client_enc, &std_string, &datestyle, &timezone);
105 if (task == NULL || saddr == NULL)
106 fatal("NULL data from old process");
108 log_debug("FD row: fd=%d(%d) linkfd=%d task=%s user=%s db=%s enc=%s",
109 oldfd, fd, linkfd, task,
110 user ? user : "NULL", db ? db : "NULL",
111 client_enc ? client_enc : "NULL");
113 /* fill address */
114 addr.is_unix = strcmp(saddr, "unix") == 0 ? true : false;
115 if (addr.is_unix) {
116 addr.port = cf_listen_port;
117 } else {
118 addr.ip_addr.s_addr = inet_addr(saddr);
119 addr.port = port;
122 /* decide what to do with it */
123 if (strcmp(task, "client") == 0)
124 res = use_client_socket(fd, &addr, db, user, ckey, oldfd, linkfd,
125 client_enc, std_string, datestyle, timezone);
126 else if (strcmp(task, "server") == 0)
127 res = use_server_socket(fd, &addr, db, user, ckey, oldfd, linkfd,
128 client_enc, std_string, datestyle, timezone);
129 else if (strcmp(task, "pooler") == 0)
130 res = use_pooler_socket(fd, addr.is_unix);
131 else
132 fatal("unknown task: %s", task);
134 if (!res)
135 fatal("socket takeover failed - no mem?");
138 static void takeover_create_link(PgPool *pool, PgSocket *client)
140 List *item;
141 PgSocket *server;
143 statlist_for_each(item, &pool->active_server_list) {
144 server = container_of(item, PgSocket, head);
145 if (server->tmp_sk_oldfd == client->tmp_sk_linkfd) {
146 server->link = client;
147 client->link = server;
148 return;
151 fatal("takeover_create_link: failed to find pair");
154 /* clean the inappropriate places the old fds got stored in */
155 static void takeover_clean_socket_list(StatList *list)
157 List *item;
158 PgSocket *sk;
159 statlist_for_each(item, list) {
160 sk = container_of(item, PgSocket, head);
161 if (sk->suspended) {
162 sk->tmp_sk_oldfd = get_cached_time();
163 sk->tmp_sk_linkfd = get_cached_time();
168 /* all fds loaded, create links */
169 static void takeover_postprocess_fds(void)
171 List *item, *item2;
172 PgSocket *client;
173 PgPool *pool;
175 statlist_for_each(item, &pool_list) {
176 pool = container_of(item, PgPool, head);
177 if (pool->db->admin)
178 continue;
179 statlist_for_each(item2, &pool->active_client_list) {
180 client = container_of(item2, PgSocket, head);
181 if (client->suspended && client->tmp_sk_linkfd)
182 takeover_create_link(pool, client);
185 statlist_for_each(item, &pool_list) {
186 pool = container_of(item, PgPool, head);
187 takeover_clean_socket_list(&pool->active_client_list);
188 takeover_clean_socket_list(&pool->active_server_list);
189 takeover_clean_socket_list(&pool->idle_server_list);
193 static void next_command(PgSocket *bouncer, MBuf *pkt)
195 bool res = true;
196 const char *cmd = mbuf_get_string(pkt);
198 log_debug("takeover_recv_fds: 'C' body: %s", cmd);
199 if (strcmp(cmd, "SUSPEND") == 0) {
200 log_info("SUSPEND finished, sending SHOW FDS");
201 SEND_generic(res, bouncer, 'Q', "s", "SHOW FDS;");
202 } else if (strncmp(cmd, "SHOW", 4) == 0) {
203 /* all fds loaded, review them */
204 takeover_postprocess_fds();
205 log_info("SHOW FDS finished");
207 takeover_finish_part1(bouncer);
208 } else
209 fatal("got bad CMD from old bouncer: %s", cmd);
211 if (!res)
212 fatal("command send failed");
215 static void takeover_parse_data(PgSocket *bouncer,
216 struct msghdr *msg, MBuf *data)
218 struct cmsghdr *cmsg;
219 PktHdr pkt;
221 cmsg = msg->msg_controllen ? CMSG_FIRSTHDR(msg) : NULL;
223 while (mbuf_avail(data) > 0) {
224 if (!get_header(data, &pkt))
225 fatal("cannot parse packet");
228 * There should not be partial reads from UNIX socket.
230 if (incomplete_pkt(&pkt))
231 fatal("unexpected partial packet");
233 switch (pkt.type) {
234 case 'T': /* RowDescription */
235 log_debug("takeover_parse_data: 'T'");
236 break;
237 case 'D': /* DataRow */
238 log_debug("takeover_parse_data: 'D'");
239 if (cmsg) {
240 takeover_load_fd(&pkt.data, cmsg);
241 cmsg = CMSG_NXTHDR(msg, cmsg);
242 } else
243 fatal("got row without fd info");
244 break;
245 case 'Z': /* ReadyForQuery */
246 log_debug("takeover_parse_data: 'Z'");
247 break;
248 case 'C': /* CommandComplete */
249 log_debug("takeover_parse_data: 'C'");
250 next_command(bouncer, &pkt.data);
251 break;
252 case 'E': /* ErrorMessage */
253 log_server_error("old bouncer sent", &pkt);
254 fatal("something failed");
255 default:
256 fatal("takeover_parse_data: unexpected pkt: '%c'", pkt_desc(&pkt));
262 * listen for data from old bouncer.
264 * use always recvmsg, to keep code simpler
266 static void takeover_recv_cb(int sock, short flags, void *arg)
268 PgSocket *bouncer = container_of(arg, PgSocket, sbuf);
269 uint8_t data_buf[STARTUP_BUF * 2];
270 uint8_t cnt_buf[128];
271 struct msghdr msg;
272 struct iovec io;
273 int res;
274 MBuf data;
276 memset(&msg, 0, sizeof(msg));
277 io.iov_base = data_buf;
278 io.iov_len = sizeof(data_buf);
279 msg.msg_iov = &io;
280 msg.msg_iovlen = 1;
281 msg.msg_control = cnt_buf;
282 msg.msg_controllen = sizeof(cnt_buf);
284 res = safe_recvmsg(sock, &msg, 0);
285 if (res > 0) {
286 mbuf_init(&data, data_buf, res);
287 takeover_parse_data(bouncer, &msg, &data);
288 } else if (res == 0) {
289 fatal("unexpected EOF");
290 } else {
291 if (errno == EAGAIN)
292 return;
293 fatal_perror("safe_recvmsg");
298 * login finished, send first command,
299 * replace recv callback with custom recvmsg() based one.
301 bool takeover_login(PgSocket *bouncer)
303 bool res;
305 slog_info(bouncer, "Login OK, sending SUSPEND");
306 SEND_generic(res, bouncer, 'Q', "s", "SUSPEND;");
307 if (res) {
308 /* use own callback */
309 if (!sbuf_pause(&bouncer->sbuf))
310 fatal("sbuf_pause failed");
311 res = sbuf_continue_with_callback(&bouncer->sbuf, takeover_recv_cb);
312 if (!res)
313 fatal("takeover_login: sbuf_continue_with_callback failed");
314 } else {
315 fatal("takeover_login: failed to send command");
317 return res;
320 /* launch connection to running process */
321 void takeover_init(void)
323 PgDatabase *db = find_database("pgbouncer");
324 PgPool *pool = get_pool(db, db->forced_user);
326 if (!pool)
327 fatal("no admin pool?");
329 log_info("takeover_init: launching connection");
330 launch_new_connection(pool);
/*
 * Login to the old bouncer did not succeed; reported through fatal().
 */
void takeover_login_failed(void)
{
	fatal("login failed");
}