fix
[libpgclient.git] / src / pgconn.c
blob09be132cca944cea3907b46fb804e8fabc171225
1 /*Copyright (c) Brian B.
3 This library is free software; you can redistribute it and/or
4 modify it under the terms of the GNU Lesser General Public
5 License as published by the Free Software Foundation; either
6 version 3 of the License, or (at your option) any later version.
7 See the file LICENSE included with this distribution for more
8 information.
9 */
10 #include "hmac.h"
11 #include "libpgcli/pgconn.h"
/* Presumably a stack size of 8 KiB (name-based guess; not used in the visible
 * code). Parenthesised so the expansion is a single operand wherever the macro
 * appears, e.g. `x / PG_STACKSIZE`. */
#define PG_STACKSIZE (1024 * 8)
/* TCP port installed as the default by pg_init(). */
#define PG_DEFAULT_PORT 5432

/* Process-wide default server address and port, both stored in network byte
 * order. pg_init()/pgconn_init() write them; pg_connect() copies them into
 * the sockaddr it connects to unless the connection string overrides them. */
static uint32_t pg_addr = 0;
static unsigned short pg_port = 0;
19 //******************************************************************************
20 // Init
21 //******************************************************************************
22 #ifdef __GNUC__
23 __attribute ((constructor))
24 static void pg_init () {
25 #else
26 void pg_init () {
27 #endif
28 struct in_addr addr;
29 inet_aton("127.0.0.1", &addr);
30 pg_addr = addr.s_addr;
31 pg_port = htons(PG_DEFAULT_PORT);
#ifdef __GNUC__
/* Destructor counterpart of pg_init(); there is currently nothing to undo. */
__attribute__ ((destructor))
static void _pg_done ()
{
}
#endif
/*
 * Translate a service name or a numeric port string into a TCP port.
 *
 * service: either a name known to getservbyname() for "tcp", or a number in
 *          any notation strtol() accepts with base 0.
 *
 * Returns the port in network byte order, or -1 when the string is neither a
 * known service nor a complete number in the range 1..USHRT_MAX.
 */
static int _pg_atoport (const char *service) {
    struct servent *servent = getservbyname(service, "tcp");
    char *tail;
    long port;

    if (servent)
        return servent->s_port;         /* already in network byte order */

    port = strtol(service, &tail, 0);
    /* Reject empty input, trailing garbage and out-of-range values. */
    if (tail == service || '\0' != *tail || port < 1 || port > USHRT_MAX)
        return -1;
    return htons((unsigned short)port);
}
52 void pgconn_init (const char *pg_srv_addr, const char *pg_srv_service) {
53 struct in_addr addr;
54 if (pg_srv_service)
55 pg_port = _pg_atoport(pg_srv_service);
56 if (pg_srv_addr)
57 inet_aton(pg_srv_addr, &addr);
58 pg_addr = addr.s_addr;
61 //******************************************************************************
62 // Data
63 //******************************************************************************
64 // Get
65 //******************************************************************************
66 #ifdef __GNUC__
67 const strptr_t pg_get (pgconn_t *conn, int row, int col) {
68 #else
69 strptr_t pg_get (pgconn_t *conn, int row, int col) {
70 #endif
71 strptr_t res = { .ptr = NULL, .len = 0 };
72 assert(CHECK_BOUNDS(conn, row, col));
73 res.ptr = (char*)PG_DATA(conn, row, col);
74 res.len = PG_FLDLEN(conn, row, col);
75 return res;
78 cstr_t *pg_getstr (pgconn_t *conn, int row, int col) {
79 strptr_t s = pg_get(conn, row, col);
80 cstr_t *res = NULL;
81 if (s.len > 0)
82 res = mkcstr(s.ptr, s.len);
83 return res;
86 uint32_t pg_getx32 (pgconn_t *conn, int row, int col) {
87 assert(CHECK_BOUNDS(conn, row, col));
88 pg_bit32_t *res = (pg_bit32_t*)PG_DATA(conn, row, col);
89 return be32toh(res->bit);
92 pg_intv_t pg_getintv (pgconn_t *conn, int row, int col) {
93 pg_intv_t *i, res = { .time = 0, .day = 0, .month = 0 };
94 assert(CHECK_BOUNDS(conn, row, col));
95 i = (pg_intv_t*)PG_DATA(conn, row, col),
96 res.time = be64toh(i->time);
97 res.day = be32toh(i->day);
98 res.month = be32toh(i->month);
99 return res;
/*
 * A 32-bit IPv4 address overlaid with its four octets. The integer member is
 * exactly 32 bits wide so that the octet array covers all of it; with
 * `unsigned long` half of the value stayed unwritten on LP64 targets.
 */
typedef union {
    uint32_t l;
    unsigned char a [4];
} pg_inet4_t;
106 int32_t pg_getinet4 (pgconn_t *conn, int row, int col) {
107 uint8_t *v = PG_DATA(conn, row, col);
108 if (2 == v[0] && 32 == v[1] && 4 == *(unsigned short*)(v+2)) {
109 pg_inet4_t i;
110 i.a[0] = v[7];
111 i.a[1] = v[6];
112 i.a[2] = v[5];
113 i.a[3] = v[4];
114 return be32toh(i.l);
116 return 0;
119 //******************************************************************************
121 //******************************************************************************
122 #ifdef __GNUC__
123 const int pg_error (pgconn_t *conn, pgerror_t *e) {
124 #else
125 int pg_error (pgconn_t *conn, pgerror_t *e) {
126 #endif
127 e->code = NULL;
128 e->text = NULL;
129 e->msg = NULL;
130 if ((conn->intr_error & EPG_PORT)) {
131 e->code = STR(EPG_PORT);
132 e->text = "ERROR";
133 e->msg = "Illegal port";
134 return -1;
136 if ((conn->intr_error & EPG_ERRNO)) {
137 e->code = STR(EPG_ERRNO);
138 e->text = "ERROR";
139 e->msg = strerror(conn->intr_error & EPG_HOST);
140 return -1;
142 if ((conn->intr_error & EPG_PROTO)) {
143 e->code = STR(EPG_PROTO);
144 e->text = "ERROR";
145 e->msg = "Protocol error";
146 return -1;
148 switch ((conn->intr_error & EPG_HOST)) {
149 case HOST_NOT_FOUND:
150 e->code = STR(HOST_NOT_FOUND);
151 e->text = "ERROR";
152 e->msg = "The specified host is unknown.";
153 return -1;
154 case NO_DATA:
155 e->code = STR(NO_DATA);
156 e->text = "ERROR";
157 e->msg = "The requested name is valid but does not have an IP address. Another type of request to the name server for this domain may return an answer.";
158 return -1;
159 case NO_RECOVERY:
160 e->code = STR(NO_RECOVERY);
161 e->text = "ERROR";
162 e->msg = "A nonrecoverable name server error occurred.";
163 return -1;
164 case TRY_AGAIN:
165 e->code = STR(TRY_AGAIN);
166 e->text = "ERROR";
167 e->msg = "A temporary error occurred on an authoritative name server. Try again later.";
168 return -1;
170 if (conn->error) {
171 e->code = conn->error->code;
172 e->text = conn->error->text;
173 e->msg = conn->error->message;
174 e->detail = conn->error->detail;
175 return -1;
177 e->code = STR(ERRCODE_SUCCESSFUL_COMPLETION);
178 e->text = "SUCCESS";
179 e->msg = "Success";
180 return 0;
183 static void _pg_startup (pgconn_t *conn, const char *conn_info, conninfo_t *cinfo) {
184 pgmsg_t *msg = pgmsg_create_startup_params(conn_info);
185 if (-1 == pgmsg_send(conn->fd, msg))
186 goto done;
187 free(msg);
188 msg = NULL;
189 while (!conn->ready && !conn->error && 0 == pgmsg_recv(conn->fd, &msg)) {
190 pgmsg_resp_t *resp = NULL;
191 switch (msg->body.type) {
192 case PG_READY:
193 pgmsg_list_add(&conn->msgs, msg);
194 resp = pgmsg_parse(msg);
195 conn->ready = &resp->msg_ready;
196 msg = NULL;
197 break;
198 case PG_ERROR:
199 pgmsg_list_add(&conn->msgs, msg);
200 resp = pgmsg_parse(msg);
201 conn->error = &resp->msg_error;
202 msg = NULL;
203 break;
204 case PG_PARAMSTATUS:
205 resp = pgmsg_parse(msg);
206 if (0 == strcmp(resp->msg_param_status.name, "integer_datetimes"))
207 conn->is_int_datetimes = 0 == strcmp(resp->msg_param_status.value, "on") ? 1 : 0;
208 else
209 if (0 == strcmp(resp->msg_param_status.name, "is_superuser"))
210 conn->is_superuser = 0 == strcmp(resp->msg_param_status.value, "on") ? 1 : 0;
211 else
212 if (0 == strcmp(resp->msg_param_status.name, "server_version")) {
213 char *ver = strdup(resp->msg_param_status.value),
214 *p = strchr(ver, '.');
215 if (p) {
216 *p++ = '\0';
217 conn->minor_ver = strtol(p, NULL, 0);
219 conn->major_ver = strtol(ver, NULL, 0);
220 free(ver);
221 } else
222 if (0 == strcmp(resp->msg_param_status.name, "client_encoding"))
223 conn->client_encoding = strdup(resp->msg_param_status.value);
224 else
225 if (0 == strcmp(resp->msg_param_status.name, "server_encoding"))
226 conn->server_encoding = strdup(resp->msg_param_status.value);
227 else
228 if (0 == strcmp(resp->msg_param_status.name, "session_authorization"))
229 conn->session_auth = strdup(resp->msg_param_status.value);
230 else
231 if (0 == strcmp(resp->msg_param_status.name, "DateStyle"))
232 conn->date_style = strdup(resp->msg_param_status.value);
233 else
234 if (0 == strcmp(resp->msg_param_status.name, "TimeZone"))
235 conn->timezone = strdup(resp->msg_param_status.value);
236 if (resp) free(resp);
237 resp = NULL;
238 free(msg);
239 msg = NULL;
240 break;
241 case PG_AUTHOK:
242 resp = pgmsg_parse(msg);
243 if (!resp) continue;
244 conn->authok = resp->msg_auth.success;
245 switch (conn->authok) {
246 case PG_REQMD5:
247 case PG_REQPASS:
248 memcpy(cinfo->salt, resp->msg_auth.kind.md5_auth, sizeof(uint8_t) * 4);
249 free(resp);
250 free(msg);
251 msg = pgmsg_create_pass(conn->authok, cinfo->salt, sizeof(uint8_t) * 4, cinfo->user, cinfo->pass);
252 pgmsg_send(conn->fd, msg);
253 free(msg);
254 break;
255 case PG_REQSASL:
256 free(msg);
257 msg = pgmsg_create_sasl_init(cinfo);
258 pgmsg_send(conn->fd, msg);
259 break;
260 case PG_SASLCON:
261 free(msg);
262 msg = pgmsg_create_sasl_fin(resp, cinfo);
263 pgmsg_send(conn->fd, msg);
264 break;
265 case PG_OK:
266 case PG_SASLCOMP:
267 break;
268 default:
269 conn->intr_error = EPG_PROTO;
270 goto done;
272 free(msg);
273 free(resp);
274 msg = NULL;
275 break;
276 default:
277 free(msg);
278 msg = NULL;
279 break;
282 done:
283 if (msg)
284 free(msg);
287 static void *_parse_param (void *data, const char *begin, const char *end, unsigned int *flags) {
288 conninfo_t *cinfo = (conninfo_t*)data;
289 const char *p = begin, *p1;
290 while (p < end && '=' != *p) ++p;
291 if (p == end)
292 return data;
293 p1 = p;
294 while (p1 > begin && isspace(*(p1 - 1))) --p1;
295 if (p == p1 - 1)
296 return data;
297 ++p;
298 while (p < end && isspace(*p)) ++p;
299 if (p == end)
300 return data;
301 if (0 == strncmp(begin, "host", (uintptr_t)p1 - (uintptr_t)begin)) {
302 char *addr = strndup(p, (uintptr_t)end - (uintptr_t)p);
303 struct in_addr in_addr;
304 memset(&in_addr, 0, sizeof in_addr);
305 if (0 == atoaddr(addr, (struct in_addr*)&in_addr))
306 cinfo->in_addr.sin_addr = in_addr;
307 free(addr);
308 } else
309 if (0 == strncmp(begin, "port", (uintptr_t)p1 - (uintptr_t)begin)) {
310 char *port_s = strndup(p, (uintptr_t)end - (uintptr_t)p), *tail;
311 int port = strtol(port_s, &tail, 0);
312 if ('\0' == *tail && ERANGE != errno)
313 cinfo->in_addr.sin_port = htons(port);
314 else
315 *flags |= EPG_PORT;
316 free(port_s);
317 } else
318 if (0 == strncmp(begin, "user", (uintptr_t)p1 - (uintptr_t)begin)) {
319 if (cinfo->user)
320 free(cinfo->user);
321 cinfo->user = strndup(p, (uintptr_t)end - (uintptr_t)p);
322 } else
323 if (0 == strncmp(begin, "password", (uintptr_t)p1 - (uintptr_t)begin)) {
324 if (cinfo->pass)
325 free(cinfo->pass);
326 cinfo->pass = strndup(p, (uintptr_t)end - (uintptr_t)p);
328 return data;
331 static str_t *_parse_url (pgconn_t *conn, const char *url) {
332 char *str = strdup(url);
333 str_t *conninfo = stralloc(64, 64);
334 char *s = strstr(str, "://"), *e, *p,
335 *user = NULL,
336 *password = NULL,
337 *host = NULL,
338 *port = NULL,
339 *dbname = NULL;
340 if (!s) {
341 free(str);
342 return conninfo;
344 s += 3;
345 if (!*s) {
346 free(str);
347 return conninfo;
349 if ((e = strchr(s, '/')))
350 *e = '\0';
351 if ((p = strchr(s, '@'))) {
352 char *q;
353 *p = '\0';
354 user = s;
355 if ((q = strchr(s, ':'))) {
356 *q = '\0';
357 password = ++q;
359 s = p + 1;
361 host = s;
362 if ((p = strchr(s, ':'))) {
363 *p = '\0';
364 port = p + 1;
366 if (host) {
367 strnadd(&conninfo, CONST_STR_LEN("host="));
368 strnadd(&conninfo, host, strlen(host));
370 if (port) {
371 if (conninfo->len > 0)
372 strnadd(&conninfo, CONST_STR_LEN(" "));
373 strnadd(&conninfo, CONST_STR_LEN("port="));
374 strnadd(&conninfo, port, strlen(port));
376 if (user) {
377 if (conninfo->len > 0)
378 strnadd(&conninfo, CONST_STR_LEN(" "));
379 strnadd(&conninfo, CONST_STR_LEN("user="));
380 strnadd(&conninfo, user, strlen(user));
382 if (password) {
383 if (conninfo->len > 0)
384 strnadd(&conninfo, CONST_STR_LEN(" "));
385 strnadd(&conninfo, CONST_STR_LEN("password="));
386 strnadd(&conninfo, password, strlen(password));
388 if (!e || !*(++e)) {
389 free(str);
390 return conninfo;
392 s = e;
393 if ((e = strchr(s, '?')))
394 *e = '\0';
395 dbname = s;
396 if (conn->dbname) free(conn->dbname);
397 conn->dbname = strdup(dbname);
398 if (conninfo->len > 0) {
399 if (conninfo->len > 0)
400 strnadd(&conninfo, CONST_STR_LEN(" "));
401 strnadd(&conninfo, CONST_STR_LEN("dbname="));
402 strnadd(&conninfo, dbname, strlen(dbname));
404 if (!e || !*(++e)) {
405 free(str);
406 return conninfo;
408 s = e;
409 str_t *params = strsplit(s, strlen(s), '&');
410 strptr_t entry = { .ptr = NULL, .len = 0 };
411 while (-1 != strnext(params, &entry)) {
412 if ((p = strnchr(entry.ptr, '=', entry.len))) {
413 if (conninfo->len > 0)
414 strnadd(&conninfo, CONST_STR_LEN(" "));
415 strnadd(&conninfo, entry.ptr, entry.len);
418 free(params);
419 free(str);
420 return conninfo;
423 pgconn_t *pg_connect (const char *url) {
424 pgconn_t *conn;
425 conninfo_t cinfo = { .in_addr = {
426 .sin_family = AF_INET,
427 .sin_port = pg_port,
428 .sin_addr.s_addr = pg_addr },
429 .user = NULL, .pass = NULL,
430 .srv_scram_msg = NULL, .fmsg_bare = NULL,
431 .fmsg_srv = NULL, .fmsg_wproof = NULL,
432 .nonce = NULL };
433 int fd;
434 uint32_t flags = 0;
435 conn = calloc(1, sizeof(pgconn_t));
436 str_t *conn_info = _parse_url(conn, url);
437 parse_conninfo((void*)&cinfo, conn_info->ptr, _parse_param, &flags);
438 if (0 != (conn->intr_error = flags)) {
439 free(conn_info);
440 return conn;
442 fd = socket(AF_INET, SOCK_STREAM, 0);
443 if (-1 == connect(fd, (struct sockaddr*)&cinfo.in_addr, sizeof cinfo.in_addr)) {
444 free(conn_info);
445 close(fd);
446 conn->intr_error = EPG_ERRNO | errno;
447 return conn;
449 conn->fd = fd;
450 _pg_startup(conn, conn_info->ptr, &cinfo);
451 if (cinfo.user)
452 free(cinfo.user);
453 if (cinfo.pass)
454 free(cinfo.pass);
455 if (cinfo.srv_scram_msg)
456 free(cinfo.srv_scram_msg);
457 if (cinfo.fmsg_bare)
458 free(cinfo.fmsg_bare);
459 if (cinfo.fmsg_srv)
460 free(cinfo.fmsg_srv);
461 if (cinfo.fmsg_wproof)
462 free(cinfo.fmsg_wproof);
463 if (cinfo.nonce)
464 free(cinfo.nonce);
465 free(conn_info);
466 return conn;
469 static void _pg_close (pgconn_t *conn) {
470 if (conn->rows) {
471 free(conn->rows);
472 conn->rows = NULL;
474 if (conn->ready) {
475 free(conn->ready);
476 conn->ready = NULL;
478 if (conn->error) {
479 free(conn->error);
480 conn->error = NULL;
482 if (conn->rowdesc) {
483 free(conn->rowdesc);
484 conn->rowdesc = NULL;
486 if (conn->complete) {
487 free(conn->complete);
488 conn->complete = NULL;
490 conn->suspended = NULL;
491 pgmsg_datarow_list_clear(&conn->row_list);
492 pgmsg_list_clear(&conn->msgs);
493 conn->nflds = 0;
496 void pg_close (pgconn_t *conn) {
497 _pg_close(conn);
500 void pg_disconnect (pgconn_t *conn) {
501 pgmsg_t *msg = pgmsg_create(PG_TERM);
502 pgmsg_send(conn->fd, msg);
503 free(msg);
504 _pg_close(conn);
505 if (conn->date_style)
506 free(conn->date_style);
507 if (conn->client_encoding)
508 free(conn->client_encoding);
509 if (conn->server_encoding)
510 free(conn->server_encoding);
511 if (conn->session_auth)
512 free(conn->session_auth);
513 if (conn->timezone)
514 free(conn->timezone);
515 if (conn->dbname)
516 free(conn->dbname);
517 if (conn->nonce)
518 free(conn->nonce);
519 pgmsg_datarow_list_clear(&conn->row_list);
520 pgmsg_list_clear(&conn->msgs);
521 if (conn->fd > 0)
522 close(conn->fd);
523 free(conn);
526 static void _set_rows (pgconn_t *conn) {
527 size_t i = 0;
528 pgmsg_datarow_list_t *list = &conn->row_list;
529 pgmsg_datarow_item_t *item = list->head;
530 if (item) {
531 conn->rows = malloc(conn->row_list.len * sizeof(void*));
532 do {
533 conn->rows[i++] = item->datarow;
534 item = item->next;
535 } while (item != list->head);
539 static void _wait_ready (pgconn_t *conn) {
540 pgmsg_t *msg;
541 int is_ready = 0;
542 while (!is_ready && 0 == pgmsg_recv(conn->fd, &msg)) {
543 is_ready = PG_READY == msg->body.type;
544 free(msg);
548 static int _simple_exec (pgconn_t *conn) {
549 int rc = 0;
550 pgmsg_t *msg;
551 while (0 == pgmsg_recv(conn->fd, &msg)) {
552 pgmsg_resp_t *resp;
553 switch (msg->body.type) {
554 case PG_READY:
555 pgmsg_list_add(&conn->msgs, msg);
556 resp = pgmsg_parse(msg);
557 conn->ready = &resp->msg_ready;
558 if (conn->row_list.len > 0)
559 _set_rows(conn);
560 goto done;
561 case PG_ERROR:
562 pgmsg_list_add(&conn->msgs, msg);
563 resp = pgmsg_parse(msg);
564 conn->error = &resp->msg_error;
565 rc = -1;
566 _wait_ready(conn);
567 goto done;
568 case PG_ROWDESC:
569 pgmsg_list_add(&conn->msgs, msg);
570 resp = pgmsg_parse(msg);
571 conn->rowdesc = &resp->msg_rowdesc;
572 conn->nflds = conn->rowdesc->nflds;
573 break;
574 case PG_CMDCOMPLETE:
575 pgmsg_list_add(&conn->msgs, msg);
576 resp = pgmsg_parse(msg);
577 conn->complete = &resp->msg_complete;
578 break;
579 case PG_DATAROW:
580 pgmsg_list_add(&conn->msgs, msg);
581 resp = pgmsg_parse(msg);
582 pgmsg_datarow_list_add(&conn->row_list, &resp->msg_datarow);
583 break;
584 case PG_NODATA:
585 free(msg);
586 conn->nflds = 0;
587 break;
588 case PG_COPYIN:
589 pgmsg_list_add(&conn->msgs, msg);
590 resp = pgmsg_parse(msg);
591 conn->cols = resp->msg_copyin.cols;
592 conn->fmt = resp->msg_copyin.fmt;
593 free(resp);
594 goto done;
595 default:
596 free(msg);
597 break;
600 done:
601 return rc;
604 static int _exec (pgconn_t *conn) {
605 int rc = 0;
606 pgmsg_t *msg;
607 _pg_close(conn);
608 while (0 == pgmsg_recv(conn->fd, &msg)) {
609 pgmsg_resp_t *resp;
610 switch (msg->body.type) {
611 case PG_READY:
612 pgmsg_list_add(&conn->msgs, msg);
613 resp = pgmsg_parse(msg);
614 conn->ready = &resp->msg_ready;
615 if (conn->row_list.len > 0)
616 _set_rows(conn);
617 goto done;
618 case PG_PARSECOMPLETE:
619 free(msg);
620 break;
621 case PG_BINDCOMPLETE:
622 free(msg);
623 break;
624 case PG_ROWDESC:
625 pgmsg_list_add(&conn->msgs, msg);
626 resp = pgmsg_parse(msg);
627 conn->rowdesc = &resp->msg_rowdesc;
628 conn->nflds = conn->rowdesc->nflds;
629 break;
630 case PG_PARAMDESC:
631 free(msg);
632 if (0 == pgmsg_recv(conn->fd, &msg)) {
633 switch (msg->body.type) {
634 case PG_ERROR:
635 pgmsg_list_add(&conn->msgs, msg);
636 resp = pgmsg_parse(msg);
637 conn->error = &resp->msg_error;
638 rc = -1;
639 _wait_ready(conn);
640 break;
641 case PG_ROWDESC:
642 pgmsg_list_add(&conn->msgs, msg);
643 resp = pgmsg_parse(msg);
644 conn->rowdesc = &resp->msg_rowdesc;
645 conn->nflds = conn->rowdesc->nflds;
646 break;
647 case PG_NODATA:
648 free(msg);
649 conn->nflds = 0;
650 break;
651 default:
652 free(msg);
653 break;
656 // ins
657 case PG_ERROR:
658 pgmsg_list_add(&conn->msgs, msg);
659 resp = pgmsg_parse(msg);
660 conn->error = &resp->msg_error;
661 rc = -1;
662 _wait_ready(conn);
663 goto done;
664 case PG_CMDCOMPLETE:
665 pgmsg_list_add(&conn->msgs, msg);
666 resp = pgmsg_parse(msg);
667 conn->complete = &resp->msg_complete;
668 break;
669 case PG_DATAROW:
670 pgmsg_list_add(&conn->msgs, msg);
671 resp = pgmsg_parse(msg);
672 pgmsg_datarow_list_add(&conn->row_list, &resp->msg_datarow);
673 break;
674 case PG_NODATA:
675 free(msg);
676 conn->nflds = 0;
677 break;
678 case PG_PORTALSUSPENDED:
679 pgmsg_list_add(&conn->msgs, msg);
680 conn->suspended = msg;
681 break;
682 case PG_EMPTYQUERY:
683 pgmsg_list_add(&conn->msgs, msg);
684 conn->emptyquery = msg;
685 break;
686 default:
687 free(msg);
688 break;
691 done:
692 return rc;
695 int pg_execsql (pgconn_t *conn, const char *sql, size_t sql_len) {
696 int rc;
697 pgmsg_t *msg;
698 _pg_close(conn);
699 if (0 == sql_len)
700 sql_len = strlen(sql);
701 msg = pgmsg_create_simple_query(sql, sql_len);
702 rc = pgmsg_send(conn->fd, msg);
703 free(msg);
704 if (0 == rc)
705 rc = _simple_exec(conn);
706 return rc;
709 int pg_copyin (pgconn_t *conn, const char *table_name) {
710 cstr_t *sql = cstrfmt("copy %s from stdin", table_name);
711 int rc = pg_execsql(conn, sql->ptr, sql->len);
712 _pg_close(conn);
713 free(sql);
714 if (!rc) rc = conn->cols;
715 return rc;
718 int pg_copyin_sendln (pgconn_t *conn, pgfld_t **flds) {
719 pgmsg_t *msg = pgmsg_copyin_flds(conn->cols, flds);
720 if (!msg) return -1;
721 _pg_close(conn);
722 int rc = pgmsg_send(conn->fd, msg);
723 free(msg);
724 return rc;
727 int pg_copyin_end (pgconn_t *conn) {
728 pgmsg_t *msg = pgmsg_create(PG_COPYEND);
729 _pg_close(conn);
730 int rc = pgmsg_send(conn->fd, msg);
731 free(msg);
732 if (0 == rc)
733 rc = _simple_exec(conn);
734 return rc;
737 static int _pg_sync (pgconn_t *conn) {
738 int rc;
739 pgmsg_t *msg = pgmsg_create(PG_SYNC);
740 rc = pgmsg_send(conn->fd, msg);
741 free(msg);
742 return rc;
745 int pg_prepareln (pgconn_t *conn, const char *name, size_t name_len, const char *sql, size_t sql_len, int fld_len, pgfld_t **flds) {
746 int rc;
747 pgmsg_t *msg;
748 _pg_close(conn);
749 msg = pgmsg_create_parse(name, name_len, sql, sql_len, fld_len, flds);
750 rc = pgmsg_send(conn->fd, msg);
751 free(msg);
752 if (0 == rc && name && 0 == (rc = _pg_sync(conn)))
753 rc = _exec(conn);
754 return rc;
// Instantiate the pgfld_array container of pgfld_ptr_t elements. Presumably
// these macros generate pgfld_array_t together with create_pgfld_array(),
// add_pgfld_array() and free_pgfld_array(), which the variadic prepare/exec
// helpers below use -- TODO confirm against the macro definitions.
757 DECLARE_ARRAY(pgfld_array,pgfld_ptr_t)
758 DECLARE_ARRAY_FUN(pgfld_array,pgfld_ptr_t)
760 static int pg_preparevn (pgconn_t *conn, const char *name, size_t name_len, const char *sql, size_t sql_len, pgfld_t *fld, va_list ap) {
761 int rc;
762 pgfld_array_t *flds = create_pgfld_array(16, 16, 0);
763 if (fld) {
764 add_pgfld_array(&flds, &fld);
765 while (NULL != (fld = va_arg(ap, pgfld_ptr_t)))
766 add_pgfld_array(&flds, &fld);
768 rc = pg_prepareln(conn, name, name_len, sql, sql_len, flds->len, flds->ptr);
769 free_pgfld_array(&flds);
770 return rc;
773 int pg_preparen (pgconn_t *conn, const char *name, size_t name_len, const char *sql, size_t sql_len, pgfld_t *fld, ...) {
774 int rc;
775 va_list ap;
776 va_start(ap, fld);
777 rc = pg_preparevn(conn, name, name_len, sql, sql_len, fld, ap);
778 va_end(ap);
779 return rc;
782 int pg_prepare (pgconn_t *conn, const char *sql, size_t sql_len, pgfld_t *fld, ...) {
783 int rc;
784 va_list ap;
785 va_start(ap, fld);
786 rc = pg_preparevn(conn, CONST_STR_NULL, sql, sql_len, fld, ap);
787 va_end(ap);
788 return rc;
791 static int _pg_bind (pgconn_t *conn, const char *portal, size_t portal_len, const char *stmt, size_t stmt_len,
792 int fld_len, pgfld_t **flds, int res_fmt_len, int *res_fmt) {
793 int rc;
794 pgmsg_t *msg;
795 _pg_close(conn);
796 msg = pgmsg_create_bind(portal, portal_len, stmt, stmt_len, fld_len, flds, res_fmt_len, res_fmt);
797 rc = pgmsg_send(conn->fd, msg);
798 free(msg);
799 return rc;
802 static int _pg_describe (pgconn_t *conn, int8_t op, const char *portal, size_t portal_len) {
803 int rc;
804 pgmsg_t *msg;
805 msg = pgmsg_create_describe(op, portal, portal_len);
806 rc = pgmsg_send(conn->fd, msg);
807 free(msg);
808 return rc;
811 static int _pg_execute (pgconn_t *conn, const char *portal, size_t portal_len, int max_rows) {
812 int rc;
813 pgmsg_t *msg;
814 msg = pgmsg_create_execute(portal, portal_len, max_rows);
815 rc = pgmsg_send(conn->fd, msg);
816 free(msg);
817 if (0 == rc && 0 == (rc = _pg_sync(conn)))
818 rc = _exec(conn);
819 return rc;
822 int pg_execln (pgconn_t *conn, const char *portal, size_t portal_len, const char *stmt, size_t stmt_len,
823 int fld_len, pgfld_t **flds, int res_fmt_len, int *res_fmt, int max_data) {
824 int rc;
825 if (0 == (rc = _pg_bind(conn, portal, portal_len, stmt, stmt_len, fld_len, flds, res_fmt_len, res_fmt)) &&
826 0 == (rc = _pg_describe(conn, PG_PREPARED_PORTAL, portal, portal_len)))
827 rc = _pg_execute(conn, portal, portal_len, max_data);
828 return rc;
831 int pg_nextn (pgconn_t *conn, const char *portal, size_t portal_len, int max_data) {
832 return _pg_execute(conn, portal, portal_len, max_data);
835 int pg_execvn (pgconn_t *conn, const char *portal, size_t portal_len, const char *stmt, size_t stmt_len, int max_data, int out_fmt, pgfld_t *fld, va_list ap) {
836 int rc;
837 pgfld_array_t *flds = create_pgfld_array(16, 16, 0);
838 if (fld) {
839 add_pgfld_array(&flds, &fld);
840 while (NULL != (fld = va_arg(ap, pgfld_ptr_t)))
841 add_pgfld_array(&flds, &fld);
843 rc = pg_execln(conn, portal, portal_len, stmt, stmt_len, flds->len, flds->ptr, 1, &out_fmt, max_data);
844 free_pgfld_array(&flds);
845 return rc;
848 int pg_execn (pgconn_t *conn, const char *portal, size_t portal_len, const char *stmt, size_t stmt_len, int max_data, int out_fmt, pgfld_t *fld, ...) {
849 int rc;
850 va_list ap;
851 va_start(ap, fld);
852 rc = pg_execvn(conn, portal, portal_len, stmt, stmt_len, max_data, out_fmt, fld, ap);
853 va_end(ap);
854 return rc;
857 int pg_exec (pgconn_t *conn, int max_data, int out_fmt, pgfld_t *fld, ...) {
858 int rc;
859 va_list ap;
860 va_start(ap, fld);
861 rc = pg_execvn(conn, CONST_STR_NULL, CONST_STR_NULL, max_data, out_fmt, fld, ap);
862 va_end(ap);
863 return rc;
866 int pg_release (pgconn_t *conn, const char *name, size_t name_len) {
867 pgmsg_t *msg = pgmsg_create_close(PG_OPNAME, name, 0 == name_len ? strlen(name) : name_len);
868 int rc = pgmsg_send(conn->fd, msg);
869 free(msg);
870 return rc;