2 * PgBouncer - Lightweight connection pooler for PostgreSQL.
4 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 * Handling of server connections
/*
 * Handle a ParameterStatus packet received from a server.
 *
 * server  - server socket the packet arrived on
 * pkt     - parsed packet header; two strings are read from pkt->data
 * startup - passed as true from the login-phase handler and false from
 *           the logged-in handler
 *
 * The key/value pair is stored in the server variable cache; the same
 * pair is also given to the linked client variable cache and to
 * add_welcome_parameter() for the pool.  A packet flagged as broken
 * leads to disconnect_server().
 *
 * NOTE(review): this listing omits lines that carried no identifiers
 * (braces, returns, some conditions).  The guards around the disconnect,
 * the client update and the welcome-parameter update are not visible
 * here - presumably a failed string read, a linked client and `startup`
 * respectively; TODO confirm against the full file.
 */
25 static bool load_parameter(PgSocket
*server
, PktHdr
*pkt
, bool startup
)
27 const char *key
, *val
;
28 PgSocket
*client
= server
->link
;
31 * Want to see complete packet. That means SMALL_PKT
32 * in sbuf.c must be larger than max param pkt.
34 if (incomplete_pkt(pkt
))
/* payload is read as two strings: parameter name, then its value */
37 key
= mbuf_get_string(&pkt
->data
);
38 val
= mbuf_get_string(&pkt
->data
);
40 disconnect_server(server
, true, "broken ParameterStatus packet");
43 slog_debug(server
, "S: param: %s = %s", key
, val
);
/* remember the value on the server side */
45 varcache_set(&server
->vars
, key
, val
);
/* mirror the value into the variable cache of the linked client */
48 slog_debug(client
, "setting client var: %s='%s'", key
, val
);
49 varcache_set(&client
->vars
, key
, val
);
/* also register the pair as a welcome parameter of the pool */
53 add_welcome_parameter(server
->pool
, key
, val
);
/*
 * Report a server login failure to clients waiting on the pool.
 *
 * pool   - pool whose waiting_client_list is walked
 * errpkt - error packet from the server; parse_server_error() extracts
 *          the level and message strings from it
 *
 * The failure is logged as a warning, then the waiting clients are
 * visited with the removal-safe iterator and disconnected with the
 * message text taken from the server error.
 *
 * NOTE(review): this listing omits lines that carried no identifiers.
 * The statement controlled by the wait_for_welcome test is not visible -
 * presumably clients without that flag are skipped; TODO confirm.
 */
58 /* we cannot log in at all, notify clients */
59 static void kill_pool_logins(PgPool
*pool
, PktHdr
*errpkt
)
63 const char *level
, *msg
;
65 parse_server_error(errpkt
, &level
, &msg
);
67 log_warning("server login failed: %s %s", level
, msg
);
/* the _safe iterator variant is used because the loop body disconnects clients */
69 statlist_for_each_safe(item
, &pool
->waiting_client_list
, tmp
) {
70 client
= container_of(item
, PgSocket
, head
);
71 if (!client
->wait_for_welcome
)
/* pass the server message through "%s" so it is never used as a format string */
74 disconnect_client(client
, true, "%s", msg
);
/*
 * Handle one packet from a server that is still in its login phase.
 *
 * server - server socket being logged in
 * pkt    - parsed header of the packet to process
 *
 * Visible behaviour, by packet type:
 *   incomplete packet - disconnect ("partial pkt in login phase")
 *   while server->exec_on_connect is set - 'S' is handled by the main
 *       dispatch, 'E' is logged, other types are skipped
 *   'E' - kill_pool_logins() when the pool has no welcome message ready,
 *         log_server_error(), then disconnect ("login failed")
 *   'R' - answer_authreq()
 *   'S' - load_parameter() with startup = true
 *   'Z' - ends or starts the connect_query step, otherwise finishes the
 *         welcome message and releases the server
 *   'K' - copy the cancel key into server->cancel_key
 *   'N' - logged at noise level and skipped
 *   anything else - logged as unknown, server disconnected
 *
 * NOTE(review): this listing omits lines that carried no identifiers
 * (switch heads, braces, break/return, else keywords), so the exact
 * linkage between the statements below must be checked in the full file.
 */
78 /* process packets on server auth phase */
79 static bool handle_server_startup(PgSocket
*server
, PktHdr
*pkt
)
81 SBuf
*sbuf
= &server
->sbuf
;
84 if (incomplete_pkt(pkt
)) {
85 disconnect_server(server
, true, "partial pkt in login phase");
89 /* ignore most that happens during connect_query */
90 if (server
->exec_on_connect
) {
93 case 'S': /* handle them below */
96 case 'E': /* log & ignore errors */
97 log_server_error("S: error while executing exec_on_query", pkt
);
98 default: /* ignore rest */
99 sbuf_prepare_skip(sbuf
, pkt
->len
);
/* packet type not recognised by the login-phase dispatch */
106 slog_error(server
, "unknown pkt from server: '%c'", pkt_desc(pkt
));
107 disconnect_server(server
, true, "unknown pkt from server");
110 case 'E': /* ErrorResponse */
/* no welcome message yet: tell the waiting clients about the failure */
111 if (!server
->pool
->welcome_msg_ready
)
112 kill_pool_logins(server
->pool
, pkt
);
114 log_server_error("S: login failed", pkt
);
116 disconnect_server(server
, true, "login failed");
119 /* packets that need closer look */
120 case 'R': /* AuthenticationXXX */
121 slog_debug(server
, "calling login_answer");
122 res
= answer_authreq(server
, pkt
);
124 disconnect_server(server
, false, "failed to answer authreq");
127 case 'S': /* ParameterStatus */
128 res
= load_parameter(server
, pkt
, true);
131 case 'Z': /* ReadyForQuery */
/* first branch: this ReadyForQuery ends the connect_query step */
132 if (server
->exec_on_connect
) {
133 server
->exec_on_connect
= 0;
134 /* deliberately ignore transaction status */
/* second branch: a connect_query is configured, send it as a 'Q' packet */
135 } else if (server
->pool
->db
->connect_query
) {
136 server
->exec_on_connect
= 1;
137 slog_debug(server
, "server conect ok, send exec_on_connect");
138 SEND_generic(res
, server
, 'Q', "s", server
->pool
->db
->connect_query
);
140 disconnect_server(server
, false, "exec_on_connect query failed");
145 slog_debug(server
, "server login ok, start accepting queries");
149 finish_welcome_msg(server
);
151 /* need to notify sbuf if server was closed */
152 res
= release_server(server
);
154 /* let the takeover process handle it */
155 if (res
&& server
->pool
->db
->admin
)
156 res
= takeover_login(server
);
159 /* ignorable packets */
160 case 'K': /* BackendKeyData */
/* copy the key only when the payload holds at least BACKENDKEY_LEN bytes */
161 if (mbuf_avail(&pkt
->data
) >= BACKENDKEY_LEN
)
162 memcpy(server
->cancel_key
,
163 mbuf_get_bytes(&pkt
->data
, BACKENDKEY_LEN
),
168 case 'N': /* NoticeResponse */
169 slog_noise(server
, "skipping pkt: %c", pkt_desc(pkt
));
/* the packet is skipped in the sbuf rather than forwarded */
175 sbuf_prepare_skip(sbuf
, pkt
->len
);
/*
 * Handle one packet from a server that has completed login.
 *
 * server - server socket the packet arrived on; must not belong to the
 *          admin database (asserted)
 * pkt    - parsed header of the packet to process
 *
 * Visible behaviour:
 *   - unknown packet types are logged and the server is disconnected
 *   - 'Z' reads one state character from the payload; in POOL_STMT mode
 *     a branch disconnects with "Long transactions not allowed"
 *   - 'S' goes through load_parameter() with startup = false
 *   - 'E' while server->setting_vars is set is logged as
 *     "varcache_apply failed" and the server is disconnected
 *   - the remaining listed types fall into two groups: reply/status
 *     packets, and data packets after which more will follow
 *   - afterwards server->ready and the pool byte counter are updated and
 *     the packet is either skipped or forwarded to the linked client
 *
 * NOTE(review): this listing omits lines that carried no identifiers
 * (switch head, braces, break/return, declarations of state/ready/total),
 * so how `ready` is assigned per packet type cannot be confirmed here.
 */
180 /* process packets on logged in connection */
181 static bool handle_server_work(PgSocket
*server
, PktHdr
*pkt
)
185 SBuf
*sbuf
= &server
->sbuf
;
186 PgSocket
*client
= server
->link
;
188 Assert(!server
->pool
->db
->admin
);
192 slog_error(server
, "unknown pkt: '%c'", pkt_desc(pkt
));
193 disconnect_server(server
, true, "unknown pkt");
196 /* pooling decisions will be based on this packet */
197 case 'Z': /* ReadyForQuery */
199 /* if partial pkt, wait */
200 if (mbuf_avail(&pkt
->data
) == 0)
202 state
= mbuf_get_char(&pkt
->data
);
204 /* set ready only if no tx */
207 else if (cf_pool_mode
== POOL_STMT
) {
208 disconnect_server(server
, true, "Long transactions not allowed");
213 case 'S': /* ParameterStatus */
214 if (!load_parameter(server
, pkt
, false))
219 * 'E' and 'N' packets currently set ->ready to 0. Correct would
220 * be to leave ->ready as-is, because overal TX state stays same.
221 * It matters for connections in IDLE or USED state which get dirty
222 * suddenly but should not as they are still usable.
224 * But the 'E' or 'N' packet between transactions signifies probably
225 * dying backend. This its better to tag server as dirty and drop
228 case 'E': /* ErrorResponse */
229 if (server
->setting_vars
) {
231 * the SET and user query will be different TX
232 * so we cannot report SET error to user.
234 log_server_error("varcache_apply failed", pkt
);
237 * client probably gave invalid values in startup pkt.
239 * no reason to keep such guys.
241 disconnect_server(server
, true, "invalid server parameter");
245 case 'N': /* NoticeResponse */
249 case '2': /* BindComplete */
250 case '3': /* CloseComplete */
251 case 'c': /* CopyDone(F/B) */
252 case 'f': /* CopyFail(F/B) */
253 case 'I': /* EmptyQueryResponse == CommandComplete */
254 case 'V': /* FunctionCallResponse */
255 case 'n': /* NoData */
256 case 'G': /* CopyInResponse */
257 case 'H': /* CopyOutResponse */
258 case '1': /* ParseComplete */
259 case 'A': /* NotificationResponse */
260 case 's': /* PortalSuspended */
261 case 'C': /* CommandComplete */
263 /* data packets, there will be more coming */
264 case 'd': /* CopyData(F/B) */
265 case 'D': /* DataRow */
266 case 't': /* ParameterDescription */
267 case 'T': /* RowDescription */
/* record the readiness decided above and account the packet size in pool stats */
270 server
->ready
= ready
;
271 server
->pool
->stats
.server_bytes
+= pkt
->len
;
/* packets received while variables are being set are skipped, not forwarded */
273 if (server
->setting_vars
) {
275 sbuf_prepare_skip(sbuf
, pkt
->len
);
/* forward the packet into the sbuf of the linked client */
277 sbuf_prepare_send(sbuf
, &client
->sbuf
, pkt
->len
);
/* query finished: add elapsed time since query_start to pool stats, reset the timer */
278 if (ready
&& client
->query_start
) {
280 total
= get_cached_time() - client
->query_start
;
281 client
->query_start
= 0;
282 server
->pool
->stats
.query_time
+= total
;
283 slog_debug(client
, "query time: %d us", (int)total
);
285 slog_warning(client
, "FIXME: query end, but query_start == 0");
/* remaining path: message about a packet arriving while not linked, then skip it */
288 if (server
->state
!= SV_TESTED
)
290 "got packet '%c' from server when not linked",
292 sbuf_prepare_skip(sbuf
, pkt
->len
);
/*
 * Decide what to do with a server socket whose connect has completed.
 *
 * server - freshly connected server socket
 *
 * The local address of the socket is filled in first.  If the pool has
 * entries in cancel_req_list, the connection is used to forward a cancel
 * request and is then disconnected ("sent cancel req").  Otherwise the
 * startup packet is sent; the "startup pkt failed" disconnect belongs to
 * that path.
 *
 * NOTE(review): this listing omits lines that carried no identifiers
 * (braces, else, return, the statement announced by the comment about
 * notifying disconnect_server()), so the returned value per path is not
 * visible here.
 */
298 /* got connection, decide what to do */
299 static bool handle_connect(PgSocket
*server
)
302 PgPool
*pool
= server
->pool
;
304 fill_local_addr(server
, sbuf_socket(&server
->sbuf
), server
->remote_addr
.is_unix
);
/* pending cancel requests take priority over a normal login */
306 if (!statlist_empty(&pool
->cancel_req_list
)) {
307 slog_debug(server
, "use it for pending cancel req");
308 /* if pending cancel req, send it */
309 forward_cancel_request(server
);
310 /* notify disconnect_server() that connect did not fail */
312 disconnect_server(server
, false, "sent cancel req");
314 /* proceed with login */
315 res
= send_startup_packet(server
);
317 disconnect_server(server
, false, "startup pkt failed");
/*
 * SBuf event callback for server sockets.
 *
 * sbuf   - buffer embedded in the PgSocket; the owning socket is
 *          recovered with container_of()
 * evtype - event being reported
 * data   - buffer with received bytes, used on the packet-parsing path
 *
 * Visible behaviour, by event:
 *   SBUF_EV_RECV_FAILED    - disconnect the server
 *   SBUF_EV_SEND_FAILED    - disconnect the linked client
 *   data received          - wait for NEW_HEADER_LEN bytes, parse the
 *                            header, stamp request_time, then dispatch on
 *                            server->state to handle_server_startup() or
 *                            handle_server_work(); other states are fatal
 *   SBUF_EV_CONNECT_FAILED - disconnect the server
 *   SBUF_EV_CONNECT_OK     - stamp request_time and run handle_connect()
 *   flush                  - finish a pending setting_vars step and
 *                            possibly release the server
 *   SBUF_EV_PKT_CALLBACK   - unexpected here, logged as a warning
 * Finally, a false result on an admin-database pool triggers
 * takeover_login_failed().
 *
 * NOTE(review): this listing omits lines that carried no identifiers.
 * The case labels for the data-received and flush paths, the state case
 * labels, and all break/return lines are not visible; TODO confirm them
 * against the full file.
 */
322 /* callback from SBuf */
323 bool server_proto(SBuf
*sbuf
, SBufEvent evtype
, MBuf
*data
)
326 PgSocket
*server
= container_of(sbuf
, PgSocket
, sbuf
);
327 PgPool
*pool
= server
->pool
;
330 Assert(is_server_socket(server
));
331 Assert(server
->state
!= SV_FREE
);
333 /* may happen if close failed */
334 if (server
->state
== SV_JUSTFREE
)
338 case SBUF_EV_RECV_FAILED
:
339 disconnect_server(server
, false, "server conn crashed?");
341 case SBUF_EV_SEND_FAILED
:
/* here the linked client is dropped, not the server itself */
342 disconnect_client(server
->link
, false, "unexpected eof");
/* data path: a full packet header must be buffered before parsing */
345 if (mbuf_avail(data
) < NEW_HEADER_LEN
) {
346 slog_noise(server
, "S: got partial header, trying to wait a bit");
350 /* parse pkt header */
351 if (!get_header(data
, &pkt
)) {
352 disconnect_server(server
, true, "bad pkt header");
355 slog_noise(server
, "S: pkt '%c', len=%d", pkt_desc(&pkt
), pkt
.len
);
/* stamp the time of this packet, then dispatch on the server state */
357 server
->request_time
= get_cached_time();
358 switch (server
->state
) {
360 res
= handle_server_startup(server
, &pkt
);
366 res
= handle_server_work(server
, &pkt
);
369 fatal("server_proto: server in bad state: %d", server
->state
);
372 case SBUF_EV_CONNECT_FAILED
:
373 Assert(server
->state
== SV_LOGIN
);
374 disconnect_server(server
, false, "connect failed");
376 case SBUF_EV_CONNECT_OK
:
377 slog_debug(server
, "S: connect ok");
378 Assert(server
->state
== SV_LOGIN
);
379 server
->request_time
= get_cached_time();
380 res
= handle_connect(server
);
/* a pending setting_vars step is over: clear the flag and resume the client sbuf */
387 if (server
->setting_vars
) {
388 PgSocket
*client
= server
->link
;
391 server
->setting_vars
= 0;
392 sbuf_continue(&client
->sbuf
);
/* outside session pooling, or for a server in SV_TESTED state, consider releasing it */
396 if (cf_pool_mode
!= POOL_SESSION
|| server
->state
== SV_TESTED
) {
397 switch (server
->state
) {
400 /* retval does not matter here */
401 release_server(server
);
404 slog_warning(server
, "EV_FLUSH with state=%d", server
->state
);
410 case SBUF_EV_PKT_CALLBACK
:
411 slog_warning(server
, "SBUF_EV_PKT_CALLBACK with state=%d", server
->state
);
/* a failure on an admin-database pool is reported to the takeover logic */
414 if (!res
&& pool
->db
->admin
)
415 takeover_login_failed();