1 (in-package :cl-postgres
)
;; For more information about the PostgreSQL socket protocol, see
;; http://www.postgresql.org/docs/current/interactive/protocol.html
(define-condition protocol-error (error)
  ;; Human-readable description of what went wrong; shown by the report
  ;; function below.
  ((message :initarg :message))
  (:documentation
   "Signalled when something really unexpected happens in the
communication with the server. This should only occur because of a bug,
or because the peer is not a \(supported) PostgreSQL server at all.")
  (:report
   (lambda (condition out)
     (format out "PostgreSQL protocol error: ~A"
             (slot-value condition 'message)))))
16 (defmacro message-case
(socket &body clauses
)
17 "Helper macro for reading messages from the server. A list of cases
18 \(characters that identify the message) can be given, each with a body
19 that handles the message, or the keyword :skip to skip the message.
20 Cases for error and warning messages are always added.
22 The body may contain an initial parameter of the form :LENGTH-SYM SYMBOL
23 where SYMBOL is a symbol to which the remaining length of the packet is
24 bound. This value indicates the number of bytes that have to be read
26 (let ((socket-name (gensym))
31 (size-sym (and (eq (car clauses
) :length-sym
) (progn (pop clauses
) (pop clauses
)))))
32 (flet ((expand-characters (chars)
33 (cond ((eq chars t
) (setf t-found t
) t
)
34 ((consp chars
) (mapcar #'char-code chars
))
35 (t (char-code chars
)))))
36 `(let* ((,socket-name
,socket
))
37 (declare (type stream
,socket-name
))
38 (labels ((,iter-name
()
39 (let ((,char-name
(read-uint1 ,socket-name
))
40 (,size-name
(read-uint4 ,socket-name
)))
41 (declare (type (unsigned-byte 8) ,char-name
)
42 (type (unsigned-byte 32) ,size-name
)
43 (ignorable ,size-name
))
46 (get-notification ,socket-name
)
48 (#.
(char-code #\E
) (get-error ,socket-name
))
49 (#.
(char-code #\S
) ;; ParameterStatus: read and continue
50 (update-parameter ,socket-name
)
52 (#.
(char-code #\N
) ;; A warning
53 (get-warning ,socket-name
)
55 ,@(mapcar (lambda (clause)
56 `(,(expand-characters (first clause
))
57 ,(if (eq (second clause
) :skip
)
58 `(skip-bytes ,socket-name
(- ,size-name
4))
60 `(let ((,size-sym
(- ,size-name
4)))
62 `(progn ,@(cdr clause
))))))
65 `((t (ensure-socket-is-closed ,socket-name
)
66 (error 'protocol-error
67 :message
(format nil
"Unexpected message received: ~A"
68 (code-char ,char-name
))))))))))
(defparameter *connection-params* nil
  "Bound to the current connection's parameter table while operations
are executed on that connection. UPDATE-PARAMETER stores the name/value
pairs it reads into this table with GETHASH, so it has to be a hash
table whenever a parameter update can be received. Globally NIL.")
(defun update-parameter (socket)
  "Read two strings from SOCKET with READ-STR, first a parameter name
and then its value, and record the pair in the *CONNECTION-PARAMS*
table. Returns the value that was stored."
  (let* ((key (read-str socket))
         (new-value (read-str socket)))
    (setf (gethash key *connection-params*) new-value)))
(defun read-byte-delimited (socket)
  "Read the fields of a null-terminated list of byte + string values
and put them in an alist. Every entry has the form (CHARACTER . STRING):
the character is made from the leading byte with CODE-CHAR, the string
is read with READ-SIMPLE-STR. Reading stops at the first zero byte,
which is consumed but does not produce an entry."
  (loop :for type = (read-uint1 socket)
        ;; A zero type byte terminates the list. Without this test the
        ;; loop would never stop at the end of the list and would keep
        ;; consuming whatever follows it on the socket.
        :until (zerop type)
        :collect (cons (code-char type) (read-simple-str socket))))
(define-condition postgresql-notification (simple-warning)
  ((pid :accessor postgresql-notification-pid
        :initarg :pid)
   (channel :accessor postgresql-notification-channel
            :initarg :channel)
   (payload :accessor postgresql-notification-payload
            :initarg :payload))
  (:documentation
   "Warning condition describing an asynchronous notification. Besides
the usual simple-warning format control and arguments it has PID,
CHANNEL and PAYLOAD slots, each with an initarg and an accessor."))
(defun get-notification (socket)
  "Read an asynchronous notification message from the socket and
signal a condition for it. Three values are read in order: the PID
\(READ-INT4), the channel and the payload \(both READ-STR). They are
reported through WARN as a POSTGRESQL-NOTIFICATION whose PID, CHANNEL
and PAYLOAD slots are filled in, so handlers can use the accessors
instead of parsing the message text."
  (let ((pid (read-int4 socket))
        (channel (read-str socket))
        (payload (read-str socket)))
    (warn 'postgresql-notification
          ;; Initialise the condition's own slots; leaving these out
          ;; makes every postgresql-notification-* accessor fail with
          ;; an unbound-slot error.
          :pid pid
          :channel channel
          :payload payload
          :format-control
          "Asynchronous notification ~S~@[ (payload: ~S)~] received from ~
           server process with PID ~D."
          :format-arguments (list channel payload pid))))
(defun get-error (socket)
  "Read an error message from SOCKET and raise the matching condition.
The message fields are read with READ-BYTE-DELIMITED. The #\\C field is
handed to CL-POSTGRES-ERROR::GET-ERROR-TYPE to pick the condition type,
and the #\\M, #\\D, #\\H, #\\W and #\\p fields become the :MESSAGE,
:DETAIL, :HINT, :CONTEXT and :POSITION initargs \(the position is parsed
as an integer when present). Never returns normally."
  (let* ((fields (read-byte-delimited socket))
         (code (cdr (assoc #\C fields))))
    (flet ((field (key)
             (cdr (assoc key fields))))
      ;; 57P01 and 57P02 are the "ADMIN SHUTDOWN" and "CRASH SHUTDOWN"
      ;; errors, in which case the server will close the connection, so
      ;; make sure our side of the socket is closed too.
      (when (member code '("57P01" "57P02") :test #'string=)
        (ensure-socket-is-closed socket))
      (error (cl-postgres-error::get-error-type code)
             :message (field #\M)
             :detail (field #\D)
             :hint (field #\H)
             :context (field #\W)
             :position (let ((raw (field #\p)))
                         (and raw (parse-integer raw)))))))
(define-condition postgresql-warning (simple-warning)
  ;; No slots of its own: GET-WARNING only supplies the format control
  ;; and format arguments inherited from SIMPLE-WARNING.
  ()
  (:documentation
   "Warning signalled by GET-WARNING for a warning message read from
the server."))
(defun get-warning (socket)
  "Read a warning from the socket and emit it. The fields are read with
READ-BYTE-DELIMITED; the #\\M field is reported as the warning text,
followed on a new line by the #\\D field or, when that is absent, the
#\\H field. The warning is signalled as a POSTGRESQL-WARNING."
  (let* ((fields (read-byte-delimited socket))
         (text (cdr (assoc #\M fields)))
         (extra (or (cdr (assoc #\D fields))
                    (cdr (assoc #\H fields)))))
    (warn 'postgresql-warning
          :format-control "PostgreSQL warning: ~A~@[~%~A~]"
          :format-arguments (list text extra))))
(defparameter *ssl-certificate-file* nil
  "When set to a filename, this file will be used as client
certificate for SSL connections.")

(defparameter *ssl-key-file* nil
  "When set to a filename, this file will be used as client key for
SSL connections.")
;; The let is used to remember that we have found the
;; cl+ssl:make-ssl-client-stream function before.
(let ((make-ssl-stream nil))
  (defun initiate-ssl (socket required)
    "Initiate SSL handshake with the PostgreSQL server, and wrap the
socket in an SSL stream. When REQUIRED is true, an error will be raised
when the server does not support SSL.

Returns the stream to use from here on: the SSL stream when the server
accepted the request, or SOCKET itself when the server declined and SSL
was not required. Signals a DATABASE-ERROR when the CL+SSL package is
not loaded."
    (unless make-ssl-stream
      (unless (find-package :cl+ssl)
        (error 'database-error
               :message "CL+SSL is not loaded. Load it to enable SSL."))
      ;; Looked up at run time so that this file does not need CL+SSL
      ;; to be present when it is compiled.
      (setf make-ssl-stream
            (intern (string '#:make-ssl-client-stream) :cl+ssl)))
    (ssl-request-message socket)
    (force-output socket)
    ;; The server answers an SSLRequest with a single byte: S when it
    ;; is willing to do SSL, N when it is not.
    (ecase (read-byte socket)
      (#.(char-code #\S)
       (setf socket (funcall make-ssl-stream socket
                             :key *ssl-key-file*
                             :certificate *ssl-certificate-file*)))
      (#.(char-code #\N)
       (when required
         (error 'database-error
                :message "Server does not support SSL encryption."))))
    ;; Callers assign the result back to their socket variable, so the
    ;; plain socket must be returned when SSL was declined but optional.
    socket))
167 (defun authenticate (socket conn
)
168 "Try to initiate a connection. Caller should close the socket if
169 this raises a condition."
171 (let ((gss-context nil
)
172 (gss-init-function nil
)
173 (user (connection-user conn
))
174 (password (connection-password conn
))
175 (database (connection-db conn
))
176 (use-ssl (connection-use-ssl conn
)))
178 (unless (eq use-ssl
:no
)
179 (setf socket
(initiate-ssl socket
(eq use-ssl
:yes
))))
180 (startup-message socket user database
)
181 (force-output socket
)
183 (labels ((init-gss-msg (in-buffer)
184 (when (null gss-init-function
)
185 (when (null (find-package "CL-GSS"))
186 (error 'database-error
:message
"To use GSS authentication, make sure the CL-GSS package is loaded."))
187 (setq gss-init-function
(find-symbol "INIT-SEC" "CL-GSS"))
188 (unless gss-init-function
189 (error 'database-error
:message
"INIT-SEC not found in CL-GSS package")))
190 (multiple-value-bind (continue-needed context buffer flags
)
191 (funcall gss-init-function
192 (format nil
"~a@~a" (connection-service conn
) (connection-host conn
))
195 :input-token in-buffer
)
196 (declare (ignore flags
))
197 (setq gss-context context
)
199 (gss-auth-buffer-message socket buffer
))
200 (force-output socket
)
204 (message-case socket
:length-sym size
205 ;; Authentication message
206 (#\R
(let ((type (read-uint4 socket
)))
209 (2 (error 'database-error
:message
"Unsupported Kerberos authentication requested."))
210 (3 (unless password
(error "Server requested plain-password authentication, but no password was given."))
211 (plain-password-message socket password
)
212 (force-output socket
))
213 (4 (error 'database-error
:message
"Unsupported crypt authentication requested."))
214 (5 (unless password
(error "Server requested md5-password authentication, but no password was given."))
215 (md5-password-message socket password user
(read-bytes socket
4))
216 (force-output socket
))
217 (6 (error 'database-error
:message
"Unsupported SCM authentication requested."))
219 (error 'database-error
:message
"Got GSS init message when a context was already established"))
221 (8 (unless gss-context
222 (error 'database-error
:message
"Got GSS continuation message without a context"))
223 (init-gss-msg (read-bytes socket
(- size
4)))))))))))
226 ;; BackendKeyData - ignore
229 (#\Z
(read-uint1 socket
)
(defclass field-description ()
  ((name :accessor field-name
         :initarg :name)
   (type-id :accessor field-type
            :initarg :type-id)
   (interpreter :accessor field-interpreter
                :initarg :interpreter)
   ;; Read-only, unlike the other slots: only a reader is defined.
   (receive-binary-p :reader field-binary-p
                     :initarg :receive-binary-p))
  (:documentation "Description of a field in a query result."))
(defun read-field-descriptions (socket)
  "Read the field descriptions for a query result and put them into an
array of field-description objects. The field count is read first
\(READ-UINT2); then, for every field, its name, table oid, column
number, type id, size, type modifier and format code are read in that
order. Only the name and type id are kept. The interpreter found with
GET-TYPE-INTERPRETER decides whether the field is described as binary
\(binary reader, RECEIVE-BINARY-P true) or as text \(text reader,
RECEIVE-BINARY-P false). Returns the array."
  (declare (type stream socket))
  (let* ((number (read-uint2 socket))
         (descriptions (make-array number)))
    (declare (type fixnum number)
             (type (simple-array field-description) descriptions))
    (dotimes (i number)
      ;; Every value has to be read to stay in step with the stream,
      ;; even though most of them are ignored.
      (let* ((name (read-str socket))
             (table-oid (read-uint4 socket))
             (column (read-uint2 socket))
             (type-id (read-uint4 socket))
             (size (read-uint2 socket))
             (type-modifier (read-uint4 socket))
             (format (read-uint2 socket))
             (interpreter (get-type-interpreter type-id)))
        (declare (ignore table-oid column size type-modifier format)
                 (type (unsigned-byte 32) type-id))
        (setf (elt descriptions i)
              (if (interpreter-binary-p interpreter)
                  (make-instance 'field-description
                                 :name name
                                 :type-id type-id
                                 :interpreter
                                 (type-interpreter-binary-reader interpreter)
                                 :receive-binary-p t)
                  (make-instance 'field-description
                                 :name name
                                 :type-id type-id
                                 :interpreter
                                 (type-interpreter-text-reader interpreter)
                                 :receive-binary-p nil)))))
    descriptions))
(defun terminate-connection (socket)
  "Close a connection, notifying the server: the terminate message is
written to SOCKET first, and then the socket stream itself is closed."
  (terminate-message socket)
  (close socket))
(defparameter *effected-rows* nil
  "A hacky way to communicate the number of affected rows up from
look-for-row to the send-execute or send-query call that, directly or
indirectly, called it.")
281 (defun look-for-row (socket)
282 "Read server messages until either a new row can be read, or there
283 are no more results. Return a boolean indicating whether any more
284 results are available, and, if available, stores the amount of
285 effected rows in *effected-rows*. Also handle getting out of
286 copy-in/copy-out states \(which are not supported)."
287 (declare (type stream socket
)
292 (#\C
(let* ((command-tag (read-str socket
))
293 (space (position #\Space command-tag
:from-end t
)))
295 (setf *effected-rows
* (parse-integer command-tag
:junk-allowed t
297 (return-from look-for-row nil
)))
299 (#\G
(read-uint1 socket
)
300 (skip-bytes socket
(* 2 (read-uint2 socket
))) ;; The field formats
301 (copy-done-message socket
)
302 (error 'database-error
:message
"Copy-in not supported."))
304 (#\H
(read-uint1 socket
)
305 (skip-bytes socket
(* 2 (read-uint2 socket
))) ;; The field formats
306 (error 'database-error
:message
"Copy-out not supported."))
308 (#\D
(skip-bytes socket
2)
309 (return-from look-for-row t
))
310 ;; EmptyQueryResponse
311 (#\I
(warn "Empty query sent.")
312 (return-from look-for-row nil
)))))
314 (defun try-to-sync (socket sync-sent
)
315 "Try to re-synchronize a connection by sending a sync message if it
316 hasn't already been sent, and then looking for a ReadyForQuery
318 (when (open-stream-p socket
)
323 (sync-message socket
)
324 (force-output socket
))
325 ;; TODO initiate timeout on the socket read, signal timeout error
326 (loop :while
(and (not ok
) (open-stream-p socket
))
327 :do
(message-case socket
328 (#\Z
(read-uint1 socket
)
332 ;; if we can't sync, make sure the socket is shot
333 ;; (e.g. a timeout, or aborting execution with a restart from sldb)
334 (ensure-socket-is-closed socket
:abort t
))))))
336 (defmacro with-syncing
(&body body
)
337 "Macro to wrap a block in a handler that will try to re-sync the
338 connection if something in the block raises a condition. Not hygienic
339 at all, only used right below here."
340 `(let ((sync-sent nil
)
344 (multiple-value-prog1
348 (try-to-sync socket sync-sent
)))
350 (ensure-socket-is-closed socket
:abort t
)
(defmacro returning-effected-rows (value &body body)
  "Computes a value, then runs a body, then returns, as multiple
values, that value and the amount of effected rows, if any (see
*effected-rows*).

*EFFECTED-ROWS* is rebound to NIL around both VALUE and BODY. When it
is still NIL after BODY has run, only the computed value is returned."
  (let ((value-name (gensym)))
    `(let* ((*effected-rows* nil)
            (,value-name ,value))
       ,@body
       ;; Only add the second return value when a row count was
       ;; actually recorded while VALUE or BODY ran.
       (if *effected-rows*
           (values ,value-name *effected-rows*)
           ,value-name))))
365 (defun send-query (socket query row-reader
)
366 "Send a query to the server, and apply the given row-reader to the
368 (declare (type stream socket
)
373 (let ((row-description nil
))
374 (simple-parse-message socket query
)
375 (simple-describe-message socket
)
376 (flush-message socket
)
377 (force-output socket
)
382 ;; ParameterDescription
386 (#\T
(setf row-description
(read-field-descriptions socket
)))
389 (simple-bind-message socket
(map 'vector
'field-binary-p row-description
))
390 (simple-execute-message socket
)
391 (sync-message socket
)
393 (force-output socket
)
397 (returning-effected-rows
399 (funcall row-reader socket row-description
)
400 (look-for-row socket
))
402 ;; ReadyForQuery, skipping transaction status
403 (#\Z
(read-uint1 socket
))))))))
405 (defun send-parse (socket name query
)
406 "Send a parse command to the server, giving it a name."
407 (declare (type stream socket
)
408 (type string name query
)
412 (parse-message socket name query
)
413 (flush-message socket
)
414 (force-output socket
)
419 (defun send-close (socket name
)
420 "Send a close command to the server, giving it a name."
421 (declare (type stream socket
)
425 (close-prepared-message socket name
)
426 (flush-message socket
)
427 (force-output socket
)
432 (defun send-execute (socket name parameters row-reader
)
433 "Execute a previously parsed query, and apply the given row-reader
435 (declare (type stream socket
)
437 (type list parameters
)
440 (let ((row-description nil
)
442 (declare (type (unsigned-byte 16) n-parameters
))
443 (describe-prepared-message socket name
)
444 (flush-message socket
)
445 (force-output socket
)
447 ;; ParameterDescription
448 (#\t (setf n-parameters
(read-uint2 socket
))
449 (skip-bytes socket
(* 4 n-parameters
))))
452 (#\T
(setf row-description
(read-field-descriptions socket
)))
455 (unless (= (length parameters
) n-parameters
)
456 (error 'database-error
457 :message
(format nil
"Incorrect number of parameters given for prepared statement ~A." name
)))
458 (bind-message socket name
(map 'vector
'field-binary-p row-description
)
460 (simple-execute-message socket
)
461 (sync-message socket
)
463 (force-output socket
)
467 (returning-effected-rows
469 (funcall row-reader socket row-description
)
470 (look-for-row socket
))
473 (#\C
(read-str socket
)
475 (#\Z
(read-uint1 socket
))))
476 ;; ReadyForQuery, skipping transaction status
477 (#\Z
(read-uint1 socket
)))))))
479 (defun build-row-reader (function-form fields body
)
480 "Helper for the following two macros."
481 (let ((socket (gensym)))
482 `(,@function-form
(,socket
,fields
)
483 (declare (type stream
,socket
)
484 (type (simple-array field-description
) ,fields
))
486 (look-for-row ,socket
))
488 (declare (type field-description field
))
489 (let ((size (read-int4 ,socket
)))
490 (declare (type (signed-byte 32) size
))
493 (funcall (field-interpreter field
)
(defmacro row-reader ((fields) &body body)
  "Build an anonymous row reader: a function that does something with
the results of a query. FIELDS names the argument that holds the fields
vector, and BODY is the code that reads the rows. Inside BODY two local
functions are available. NEXT-ROW should be called once per row and
returns a boolean telling whether there are any more rows. NEXT-FIELD
should be called once for every element of the fields vector, with that
field as its argument, to read a single value of the row. See
list-row-reader in public.lisp for an example."
  (build-row-reader (list 'lambda) fields body))
(defmacro def-row-reader (name (fields) &body body)
  "Create a row reader, as in the row-reader macro, and assign a name
to it. FIELDS and BODY are handed to BUILD-ROW-READER unchanged, with
\(DEFUN NAME) as the function form instead of \(LAMBDA)."
  (build-row-reader `(defun ,name) fields body))