client_login_timeout: check wait_for_welcome
[pgbouncer.git] / src / server.c
blob5088f533d29e9479129694db69415bd349a865ad
1 /*
2 * PgBouncer - Lightweight connection pooler for PostgreSQL.
3 *
4 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
5 *
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 * Handling of server connections
23 #include "bouncer.h"
25 static bool load_parameter(PgSocket *server, PktHdr *pkt, bool startup)
27 const char *key, *val;
28 PgSocket *client = server->link;
31 * Want to see complete packet. That means SMALL_PKT
32 * in sbuf.c must be larger than max param pkt.
34 if (incomplete_pkt(pkt))
35 return false;
37 key = mbuf_get_string(&pkt->data);
38 val = mbuf_get_string(&pkt->data);
39 if (!key || !val) {
40 disconnect_server(server, true, "broken ParameterStatus packet");
41 return false;
43 slog_debug(server, "S: param: %s = %s", key, val);
45 varcache_set(&server->vars, key, val);
47 if (client) {
48 slog_debug(client, "setting client var: %s='%s'", key, val);
49 varcache_set(&client->vars, key, val);
52 if (startup)
53 add_welcome_parameter(server->pool, key, val);
55 return true;
58 /* we cannot log in at all, notify clients */
59 static void kill_pool_logins(PgPool *pool, PktHdr *errpkt)
61 List *item, *tmp;
62 PgSocket *client;
63 const char *level, *msg;
65 parse_server_error(errpkt, &level, &msg);
67 log_warning("server login failed: %s %s", level, msg);
69 statlist_for_each_safe(item, &pool->waiting_client_list, tmp) {
70 client = container_of(item, PgSocket, head);
71 if (!client->wait_for_welcome)
72 continue;
74 disconnect_client(client, true, "%s", msg);
78 /* process packets on server auth phase */
79 static bool handle_server_startup(PgSocket *server, PktHdr *pkt)
81 SBuf *sbuf = &server->sbuf;
82 bool res = false;
84 if (incomplete_pkt(pkt)) {
85 disconnect_server(server, true, "partial pkt in login phase");
86 return false;
89 /* ignore most that happens during connect_query */
90 if (server->exec_on_connect) {
91 switch (pkt->type) {
92 case 'Z':
93 case 'S': /* handle them below */
94 break;
96 case 'E': /* log & ignore errors */
97 log_server_error("S: error while executing exec_on_query", pkt);
98 default: /* ignore rest */
99 sbuf_prepare_skip(sbuf, pkt->len);
100 return true;
104 switch (pkt->type) {
105 default:
106 slog_error(server, "unknown pkt from server: '%c'", pkt_desc(pkt));
107 disconnect_server(server, true, "unknown pkt from server");
108 break;
110 case 'E': /* ErrorResponse */
111 if (!server->pool->welcome_msg_ready)
112 kill_pool_logins(server->pool, pkt);
113 else
114 log_server_error("S: login failed", pkt);
116 disconnect_server(server, true, "login failed");
117 break;
119 /* packets that need closer look */
120 case 'R': /* AuthenticationXXX */
121 slog_debug(server, "calling login_answer");
122 res = answer_authreq(server, pkt);
123 if (!res)
124 disconnect_server(server, false, "failed to answer authreq");
125 break;
127 case 'S': /* ParameterStatus */
128 res = load_parameter(server, pkt, true);
129 break;
131 case 'Z': /* ReadyForQuery */
132 if (server->exec_on_connect) {
133 server->exec_on_connect = 0;
134 /* deliberately ignore transaction status */
135 } else if (server->pool->db->connect_query) {
136 server->exec_on_connect = 1;
137 slog_debug(server, "server conect ok, send exec_on_connect");
138 SEND_generic(res, server, 'Q', "s", server->pool->db->connect_query);
139 if (!res)
140 disconnect_server(server, false, "exec_on_connect query failed");
141 break;
144 /* login ok */
145 slog_debug(server, "server login ok, start accepting queries");
146 server->ready = 1;
148 /* got all params */
149 finish_welcome_msg(server);
151 /* need to notify sbuf if server was closed */
152 res = release_server(server);
154 /* let the takeover process handle it */
155 if (res && server->pool->db->admin)
156 res = takeover_login(server);
157 break;
159 /* ignorable packets */
160 case 'K': /* BackendKeyData */
161 if (mbuf_avail(&pkt->data) >= BACKENDKEY_LEN)
162 memcpy(server->cancel_key,
163 mbuf_get_bytes(&pkt->data, BACKENDKEY_LEN),
164 BACKENDKEY_LEN);
165 res = true;
166 break;
168 case 'N': /* NoticeResponse */
169 slog_noise(server, "skipping pkt: %c", pkt_desc(pkt));
170 res = true;
171 break;
174 if (res)
175 sbuf_prepare_skip(sbuf, pkt->len);
177 return res;
180 /* process packets on logged in connection */
181 static bool handle_server_work(PgSocket *server, PktHdr *pkt)
183 bool ready = 0;
184 char state;
185 SBuf *sbuf = &server->sbuf;
186 PgSocket *client = server->link;
188 Assert(!server->pool->db->admin);
190 switch (pkt->type) {
191 default:
192 slog_error(server, "unknown pkt: '%c'", pkt_desc(pkt));
193 disconnect_server(server, true, "unknown pkt");
194 return false;
196 /* pooling decisions will be based on this packet */
197 case 'Z': /* ReadyForQuery */
199 /* if partial pkt, wait */
200 if (mbuf_avail(&pkt->data) == 0)
201 return false;
202 state = mbuf_get_char(&pkt->data);
204 /* set ready only if no tx */
205 if (state == 'I')
206 ready = 1;
207 else if (cf_pool_mode == POOL_STMT) {
208 disconnect_server(server, true, "Long transactions not allowed");
209 return false;
211 break;
213 case 'S': /* ParameterStatus */
214 if (!load_parameter(server, pkt, false))
215 return false;
216 break;
219 * 'E' and 'N' packets currently set ->ready to 0. Correct would
220 * be to leave ->ready as-is, because overal TX state stays same.
221 * It matters for connections in IDLE or USED state which get dirty
222 * suddenly but should not as they are still usable.
224 * But the 'E' or 'N' packet between transactions signifies probably
225 * dying backend. This its better to tag server as dirty and drop
226 * it later.
228 case 'E': /* ErrorResponse */
229 if (server->setting_vars) {
231 * the SET and user query will be different TX
232 * so we cannot report SET error to user.
234 log_server_error("varcache_apply failed", pkt);
237 * client probably gave invalid values in startup pkt.
239 * no reason to keep such guys.
241 disconnect_server(server, true, "invalid server parameter");
242 return false;
245 case 'N': /* NoticeResponse */
246 break;
248 /* chat packets */
249 case '2': /* BindComplete */
250 case '3': /* CloseComplete */
251 case 'c': /* CopyDone(F/B) */
252 case 'f': /* CopyFail(F/B) */
253 case 'I': /* EmptyQueryResponse == CommandComplete */
254 case 'V': /* FunctionCallResponse */
255 case 'n': /* NoData */
256 case 'G': /* CopyInResponse */
257 case 'H': /* CopyOutResponse */
258 case '1': /* ParseComplete */
259 case 'A': /* NotificationResponse */
260 case 's': /* PortalSuspended */
261 case 'C': /* CommandComplete */
263 /* data packets, there will be more coming */
264 case 'd': /* CopyData(F/B) */
265 case 'D': /* DataRow */
266 case 't': /* ParameterDescription */
267 case 'T': /* RowDescription */
268 break;
270 server->ready = ready;
271 server->pool->stats.server_bytes += pkt->len;
273 if (server->setting_vars) {
274 Assert(client);
275 sbuf_prepare_skip(sbuf, pkt->len);
276 } else if (client) {
277 sbuf_prepare_send(sbuf, &client->sbuf, pkt->len);
278 if (ready && client->query_start) {
279 usec_t total;
280 total = get_cached_time() - client->query_start;
281 client->query_start = 0;
282 server->pool->stats.query_time += total;
283 slog_debug(client, "query time: %d us", (int)total);
284 } else if (ready) {
285 slog_warning(client, "FIXME: query end, but query_start == 0");
287 } else {
288 if (server->state != SV_TESTED)
289 slog_warning(server,
290 "got packet '%c' from server when not linked",
291 pkt_desc(pkt));
292 sbuf_prepare_skip(sbuf, pkt->len);
295 return true;
298 /* got connection, decide what to do */
299 static bool handle_connect(PgSocket *server)
301 bool res = false;
302 PgPool *pool = server->pool;
304 fill_local_addr(server, sbuf_socket(&server->sbuf), server->remote_addr.is_unix);
306 if (!statlist_empty(&pool->cancel_req_list)) {
307 slog_debug(server, "use it for pending cancel req");
308 /* if pending cancel req, send it */
309 forward_cancel_request(server);
310 /* notify disconnect_server() that connect did not fail */
311 server->ready = 1;
312 disconnect_server(server, false, "sent cancel req");
313 } else {
314 /* proceed with login */
315 res = send_startup_packet(server);
316 if (!res)
317 disconnect_server(server, false, "startup pkt failed");
319 return res;
322 /* callback from SBuf */
323 bool server_proto(SBuf *sbuf, SBufEvent evtype, MBuf *data)
325 bool res = false;
326 PgSocket *server = container_of(sbuf, PgSocket, sbuf);
327 PgPool *pool = server->pool;
328 PktHdr pkt;
330 Assert(is_server_socket(server));
331 Assert(server->state != SV_FREE);
333 /* may happen if close failed */
334 if (server->state == SV_JUSTFREE)
335 return false;
337 switch (evtype) {
338 case SBUF_EV_RECV_FAILED:
339 disconnect_server(server, false, "server conn crashed?");
340 break;
341 case SBUF_EV_SEND_FAILED:
342 disconnect_client(server->link, false, "unexpected eof");
343 break;
344 case SBUF_EV_READ:
345 if (mbuf_avail(data) < NEW_HEADER_LEN) {
346 slog_noise(server, "S: got partial header, trying to wait a bit");
347 break;
350 /* parse pkt header */
351 if (!get_header(data, &pkt)) {
352 disconnect_server(server, true, "bad pkt header");
353 break;
355 slog_noise(server, "S: pkt '%c', len=%d", pkt_desc(&pkt), pkt.len);
357 server->request_time = get_cached_time();
358 switch (server->state) {
359 case SV_LOGIN:
360 res = handle_server_startup(server, &pkt);
361 break;
362 case SV_TESTED:
363 case SV_USED:
364 case SV_ACTIVE:
365 case SV_IDLE:
366 res = handle_server_work(server, &pkt);
367 break;
368 default:
369 fatal("server_proto: server in bad state: %d", server->state);
371 break;
372 case SBUF_EV_CONNECT_FAILED:
373 Assert(server->state == SV_LOGIN);
374 disconnect_server(server, false, "connect failed");
375 break;
376 case SBUF_EV_CONNECT_OK:
377 slog_debug(server, "S: connect ok");
378 Assert(server->state == SV_LOGIN);
379 server->request_time = get_cached_time();
380 res = handle_connect(server);
381 break;
382 case SBUF_EV_FLUSH:
383 res = true;
384 if (!server->ready)
385 break;
387 if (server->setting_vars) {
388 PgSocket *client = server->link;
389 Assert(client);
391 server->setting_vars = 0;
392 sbuf_continue(&client->sbuf);
393 break;
396 if (cf_pool_mode != POOL_SESSION || server->state == SV_TESTED) {
397 switch (server->state) {
398 case SV_ACTIVE:
399 case SV_TESTED:
400 /* retval does not matter here */
401 release_server(server);
402 break;
403 default:
404 slog_warning(server, "EV_FLUSH with state=%d", server->state);
405 case SV_IDLE:
406 break;
409 break;
410 case SBUF_EV_PKT_CALLBACK:
411 slog_warning(server, "SBUF_EV_PKT_CALLBACK with state=%d", server->state);
412 break;
414 if (!res && pool->db->admin)
415 takeover_login_failed();
416 return res;