use oid constants
[postmodern.git] / cl-postgres / protocol.lisp
blob773e3344644b8cab20c565ad9753ec708ae2ff6f
1 (in-package :cl-postgres)
3 ;; For more information about the PostgreSQL socket protocol, see
4 ;; http://www.postgresql.org/docs/current/interactive/protocol.html
(define-condition protocol-error (error)
  ((message :initarg :message))
  (:documentation "Signalled when something really unexpected happens
in the communication with the server. This should only occur because
of a bug, or because the peer is not a supported PostgreSQL server at
all. The MESSAGE slot holds the text that is shown in the report.")
  (:report (lambda (condition stream)
             (format stream "PostgreSQL protocol error: ~A"
                     (slot-value condition 'message)))))
(defmacro message-case (socket &body clauses)
  "Read one message from SOCKET and dispatch on its type byte.

Each clause starts with a character, a list of characters, or T, and
is followed either by a body that handles the message or by the
keyword :skip, which discards the rest of the message. Clauses for
notifications, errors, parameter updates and warnings are always
added; apart from the error clause they handle their message and then
read the next one. When no T clause is given, any other message closes
the socket and signals a protocol-error.

The clauses may be preceded by :LENGTH-SYM SYMBOL, in which case
SYMBOL is bound, inside every clause body, to the number of bytes of
the message that still have to be read from the socket."
  (let* ((stream-var (gensym))
         (length-var (gensym))
         (type-var (gensym))
         (reader-fn (gensym))
         ;; Strip a leading :length-sym SYMBOL pair off the clauses.
         (size-sym (when (eq (first clauses) :length-sym)
                     (prog1 (second clauses)
                       (setf clauses (cddr clauses)))))
         (catch-all-p nil))
    (labels ((clause-keys (chars)
               ;; Turn the head of a clause into CASE keys (character
               ;; codes), noting whether a catch-all clause was given.
               (cond ((eq chars t) (setf catch-all-p t) t)
                     ((consp chars) (mapcar #'char-code chars))
                     (t (char-code chars))))
             (clause-body (forms)
               ;; The length read from the socket is reduced by 4 to
               ;; get the number of bytes that remain to be read.
               (cond ((eq (first forms) :skip)
                      `(skip-bytes ,stream-var (- ,length-var 4)))
                     (size-sym
                      `(let ((,size-sym (- ,length-var 4)))
                         ,@forms))
                     (t
                      `(progn ,@forms))))
             (expand-clause (clause)
               (list (clause-keys (first clause))
                     (clause-body (rest clause)))))
      ;; The user clauses are expanded first, because that is what
      ;; determines whether a default clause has to be generated.
      (let* ((user-cases (mapcar #'expand-clause clauses))
             (fallback
               (unless catch-all-p
                 `((t (ensure-socket-is-closed ,stream-var)
                      (error 'protocol-error
                             :message (format nil "Unexpected message received: ~A"
                                              (code-char ,type-var))))))))
        `(let* ((,stream-var ,socket))
           (declare (type stream ,stream-var))
           (labels ((,reader-fn ()
                      (let ((,type-var (read-uint1 ,stream-var))
                            (,length-var (read-uint4 ,stream-var)))
                        (declare (type (unsigned-byte 8) ,type-var)
                                 (type (unsigned-byte 32) ,length-var)
                                 (ignorable ,length-var))
                        (case ,type-var
                          ;; Asynchronous notification: signal it, read on
                          (,(char-code #\A)
                           (get-notification ,stream-var)
                           (,reader-fn))
                          ;; Error message
                          (,(char-code #\E)
                           (get-error ,stream-var))
                          ;; ParameterStatus: read and continue
                          (,(char-code #\S)
                           (update-parameter ,stream-var)
                           (,reader-fn))
                          ;; A warning
                          (,(char-code #\N)
                           (get-warning ,stream-var)
                           (,reader-fn))
                          ,@user-cases
                          ,@fallback))))
             (,reader-fn)))))))
(defparameter *connection-params* nil
  "The parameter table of the current connection. It is meant to be
bound while a query is being executed; the global value is NIL.")
(defun update-parameter (socket)
  "Read a parameter name and then its value from SOCKET, and store the
value under that name in the table held by *connection-params*."
  (let* ((parameter (read-str socket))
         (setting (read-str socket)))
    (setf (gethash parameter *connection-params*) setting)))
(defun read-byte-delimited (socket)
  "Read a list of byte + string fields from SOCKET, up to and including
the zero byte that ends the list. Returns an alist, in the order in
which the fields were read, that maps the character for each byte to
the string that followed it."
  (let ((fields '()))
    (loop
      (let ((tag (read-uint1 socket)))
        ;; A zero byte terminates the list.
        (when (zerop tag)
          (return (nreverse fields)))
        (push (cons (code-char tag) (read-simple-str socket))
              fields)))))
(define-condition postgresql-notification (simple-warning)
  ((pid
    :initarg :pid
    :accessor postgresql-notification-pid)
   (channel
    :initarg :channel
    :accessor postgresql-notification-channel)
   (payload
    :initarg :payload
    :accessor postgresql-notification-payload))
  (:documentation "Warning that get-notification signals for an
asynchronous notification message. The slots hold the process id, the
channel and the payload that were read from that message."))
(defun get-notification (socket)
  "Read an asynchronous notification message from SOCKET and signal a
postgresql-notification warning for it. The condition carries the
process id, the channel and the payload exactly as they were read; an
empty payload is left out of the printed message."
  (let* ((pid (read-int4 socket))
         (channel (read-str socket))
         (payload (read-str socket))
         ;; The ~@[ ... ~] directive below only omits its contents for
         ;; NIL. The payload is read as a string, so an empty one has
         ;; to be turned into NIL to keep it out of the message.
         (shown-payload (and (plusp (length payload)) payload)))
    (warn 'postgresql-notification
          :pid pid
          :channel channel
          :payload payload
          :format-control "Asynchronous notification ~S~@[ (payload: ~S)~] received from ~
                           server process with PID ~D."
          :format-arguments (list channel shown-payload pid))))
(defun get-error (socket)
  "Read the fields of an error message from SOCKET and signal the
corresponding database-error condition. The condition type is chosen
by cl-postgres-error::get-error-type from the error code. This
function never returns normally."
  (let ((data (read-byte-delimited socket)))
    (flet ((get-field (char)
             (cdr (assoc char data))))
      (let ((code (get-field #\C))
            ;; In the protocol the cursor position into the submitted
            ;; query is field P; field p is the position inside an
            ;; internally generated query. Prefer the former, and fall
            ;; back on the latter.
            (position (or (get-field #\P) (get-field #\p))))
        ;; These are the errors "ADMIN SHUTDOWN" and "CRASH SHUTDOWN",
        ;; in which case the server will close the connection right
        ;; away.
        (when (member code '("57P01" "57P02") :test #'string=)
          (ensure-socket-is-closed socket))
        (error (cl-postgres-error::get-error-type code)
               :code code
               :message (get-field #\M)
               :detail (get-field #\D)
               :hint (get-field #\H)
               :context (get-field #\W)
               ;; :junk-allowed keeps a malformed position from
               ;; masking the database error with a parse error.
               :position (when position
                           (parse-integer position :junk-allowed t)))))))
(define-condition postgresql-warning (simple-warning)
  ()
  (:documentation "Warning that get-warning signals for a warning
message sent by the server. It adds no slots of its own; the text is
carried by the format control and arguments of simple-warning."))
(defun get-warning (socket)
  "Read the fields of a warning message from SOCKET and signal a
postgresql-warning for it. The text is taken from field M, followed on
a new line by field D or, when there is no D, field H, if present."
  (let* ((fields (read-byte-delimited socket))
         (message (cdr (assoc #\M fields)))
         (extra (or (cdr (assoc #\D fields))
                    (cdr (assoc #\H fields)))))
    (warn 'postgresql-warning
          :format-control "PostgreSQL warning: ~A~@[~%~A~]"
          :format-arguments (list message extra))))
(defparameter *ssl-certificate-file* nil
  "Name of the file to use as client certificate for SSL connections.
The default, NIL, means that none is set.")
(defparameter *ssl-key-file* nil
  "Name of the file to use as client key for SSL connections. The
default, NIL, means that none is set.")
;; The let is used to remember that we have found the
;; cl+ssl:make-ssl-client-stream function before, so that the lookup
;; only has to happen on the first call.
(let ((make-ssl-stream nil))
  (defun initiate-ssl (socket required)
    "Send an SSL request to the PostgreSQL server over SOCKET. When the
server accepts, the socket is wrapped in an SSL stream, using
*ssl-key-file* and *ssl-certificate-file*. When the server declines
and REQUIRED is true, a database-error is signalled.

Returns the stream to use from here on: the SSL stream when SSL was
set up, and SOCKET itself when the server declined and SSL was not
required. Signals a database-error when CL+SSL is not loaded."
    (unless make-ssl-stream
      (unless (find-package :cl+ssl)
        (error 'database-error :message "CL+SSL is not loaded. Load it to enable SSL."))
      (setf make-ssl-stream (intern (string '#:make-ssl-client-stream) :cl+ssl)))
    (ssl-request-message socket)
    (force-output socket)
    ;; The server answers with a single byte: S to accept, N to decline.
    (ecase (read-byte socket)
      (#.(char-code #\S)
       (setf socket (funcall make-ssl-stream socket
                             :key *ssl-key-file*
                             :certificate *ssl-certificate-file*)))
      (#.(char-code #\N)
       (when required
         (error 'database-error :message "Server does not support SSL encryption."))))
    ;; Return the socket explicitly. The value of the ECASE is NIL
    ;; when the server declines and SSL is optional, and the caller
    ;; stores whatever is returned here as its socket.
    socket))
(defun authenticate (socket conn)
  "Try to initiate a connection over SOCKET, using the user, password,
database and SSL setting of CONN. Returns the socket to use for the
rest of the session, which is the value returned by initiate-ssl when
SSL was attempted. Caller should close the socket if this raises a
condition.

A database-error is signalled when the server requests an
authentication method that is not supported. A plain error is
signalled when a password is requested but none was given."
  (let ((gss-context nil)
        (gss-init-function nil)
        (user (connection-user conn))
        (password (connection-password conn))
        (database (connection-db conn))
        (use-ssl (connection-use-ssl conn)))
    (unless (eq use-ssl :no)
      (setf socket (initiate-ssl socket (eq use-ssl :yes))))
    (startup-message socket user database)
    (force-output socket)
    (labels ((init-gss-msg (in-buffer)
               ;; Look up CL-GSS:INIT-SEC the first time it is needed,
               ;; so that CL-GSS only has to be loaded when GSS
               ;; authentication is actually requested.
               (when (null gss-init-function)
                 (when (null (find-package "CL-GSS"))
                   (error 'database-error :message "To use GSS authentication, make sure the CL-GSS package is loaded."))
                 (setq gss-init-function (find-symbol "INIT-SEC" "CL-GSS"))
                 (unless gss-init-function
                   (error 'database-error :message "INIT-SEC not found in CL-GSS package")))
               (multiple-value-bind (continue-needed context buffer flags)
                   (funcall gss-init-function
                            (format nil "~a@~a" (connection-service conn) (connection-host conn))
                            :flags '(:mutual)
                            :context gss-context
                            :input-token in-buffer)
                 (declare (ignore flags))
                 (setq gss-context context)
                 (when buffer
                   (gss-auth-buffer-message socket buffer))
                 (force-output socket)
                 continue-needed)))
      (loop
        (message-case socket :length-sym size
          ;; Authentication message
          (#\R (let ((type (read-uint4 socket)))
                 (case type
                   ;; Type 0 ends the authentication exchange.
                   (0 (return))
                   (2 (error 'database-error :message "Unsupported Kerberos authentication requested."))
                   (3 (unless password (error "Server requested plain-password authentication, but no password was given."))
                      (plain-password-message socket password)
                      (force-output socket))
                   (4 (error 'database-error :message "Unsupported crypt authentication requested."))
                   ;; The 4 bytes that follow the type are handed to
                   ;; md5-password-message (presumably the salt).
                   (5 (unless password (error "Server requested md5-password authentication, but no password was given."))
                      (md5-password-message socket password user (read-bytes socket 4))
                      (force-output socket))
                   (6 (error 'database-error :message "Unsupported SCM authentication requested."))
                   (7 (when gss-context
                        (error 'database-error :message "Got GSS init message when a context was already established"))
                      (init-gss-msg nil))
                   ;; SIZE counts the bytes after the length word; 4 of
                   ;; those were the type, the rest is the GSS token.
                   (8 (unless gss-context
                        (error 'database-error :message "Got GSS continuation message without a context"))
                      (init-gss-msg (read-bytes socket (- size 4))))
                   ;; Any other method is reported in the same way as
                   ;; the unsupported ones above, rather than through
                   ;; the generic fall-through error of ECASE.
                   (t (error 'database-error
                             :message (format nil "Unsupported authentication type ~D requested."
                                              type)))))))))
    (loop
      (message-case socket
        ;; BackendKeyData - ignore
        (#\K :skip)
        ;; ReadyForQuery
        (#\Z (read-uint1 socket)
             (return))))
    socket))
(defclass field-description ()
  ((name
    :initarg :name
    :accessor field-name)
   (type-id
    :initarg :type-id
    :accessor field-type)
   (interpreter
    :initarg :interpreter
    :accessor field-interpreter)
   (receive-binary-p
    :initarg :receive-binary-p
    :reader field-binary-p))
  (:documentation "Description of a field in a query result: its name,
the id of its type, the function that reads a value of that type, and
whether values for it are received in binary form."))
(defun read-field-descriptions (socket)
  "Read the field descriptions for a query result from SOCKET and
return them as an array of field-description objects, one per field
and in the order in which they were read."
  (declare (type stream socket)
           #.*optimize*)
  (let* ((field-count (read-uint2 socket))
         (result (make-array field-count)))
    (declare (type fixnum field-count)
             (type (simple-array field-description) result))
    (dotimes (index field-count result)
      ;; Every value has to be read to stay in step with the stream,
      ;; but only the name and the type id are used.
      (let* ((name (read-str socket))
             (table-oid (read-uint4 socket))
             (column (read-uint2 socket))
             (type-id (read-uint4 socket))
             (size (read-uint2 socket))
             (type-modifier (read-uint4 socket))
             (format (read-uint2 socket))
             (interpreter (get-type-interpreter type-id))
             (binary-p (interpreter-binary-p interpreter)))
        (declare (ignore table-oid column size type-modifier format)
                 (type string name)
                 (type (unsigned-byte 32) type-id))
        (setf (elt result index)
              (make-instance 'field-description
                             :name name
                             :type-id type-id
                             :interpreter (if binary-p
                                              (type-interpreter-binary-reader interpreter)
                                              (type-interpreter-text-reader interpreter))
                             :receive-binary-p (if binary-p t nil)))))))
(defun terminate-connection (socket)
  "Close a connection, notifying the server. The socket is closed even
when sending the termination message fails; in that case it is closed
with :abort t and the original condition is allowed to propagate."
  (let ((notified nil))
    (unwind-protect
         (progn
           (terminate-message socket)
           (setf notified t)
           (close socket))
      ;; Only reached with NOTIFIED still false when terminate-message
      ;; made a non-local exit. Do not leave the socket open then.
      (unless notified
        (close socket :abort t)))))
;; A somewhat hacky channel for passing the number of effected rows
;; up from look-for-row to the send-execute or send-query call that
;; (directly or indirectly) invoked it.
(defparameter *effected-rows* nil
  "Set by look-for-row to the row count found at the end of a command
tag, when there is one. Rebound to NIL by returning-effected-rows.")
(defun look-for-row (socket)
  "Read server messages from SOCKET until either a new row can be
read, or there are no more results. Returns T when a row is available,
and NIL when the results are exhausted or the query was empty. When a
command tag ends in a number, that number is stored in
*effected-rows*. Also handles getting out of copy-in/copy-out states,
which are not supported: a database-error is signalled for those."
  (declare (type stream socket)
           #.*optimize*)
  (loop
    (message-case socket
      ;; CommandComplete
      (#\C (let* ((command-tag (read-str socket))
                  (space (position #\Space command-tag :from-end t)))
             ;; The count, when present, is whatever follows the last
             ;; space in the tag.
             (when space
               (setf *effected-rows* (parse-integer command-tag :junk-allowed t
                                                                :start (1+ space))))
             (return-from look-for-row nil)))
      ;; CopyInResponse
      (#\G (read-uint1 socket)
           (skip-bytes socket (* 2 (read-uint2 socket))) ; The field formats
           ;; Leave copy-in mode before signalling. The backend
           ;; ignores Sync messages while in copy-in mode, so one that
           ;; was sent earlier does not count: a new Sync has to
           ;; follow CopyDone for ReadyForQuery to arrive, and both
           ;; have to be flushed because nothing else will do that
           ;; before the connection is re-synchronized.
           (copy-done-message socket)
           (sync-message socket)
           (force-output socket)
           (error 'database-error :message "Copy-in not supported."))
      ;; CopyOutResponse
      (#\H (read-uint1 socket)
           (skip-bytes socket (* 2 (read-uint2 socket))) ; The field formats
           (error 'database-error :message "Copy-out not supported."))
      ;; DataRow: skip the two bytes in front of the field values
      (#\D (skip-bytes socket 2)
           (return-from look-for-row t))
      ;; EmptyQueryResponse
      (#\I (warn "Empty query sent.")
           (return-from look-for-row nil)))))
(defun try-to-sync (socket sync-sent)
  "Try to re-synchronize the connection on SOCKET. A sync message is
sent unless SYNC-SENT says that this has already happened, after which
messages are skipped until a ReadyForQuery message is found. When that
does not succeed, the socket is closed. Nothing is done for a socket
that is not open."
  (when (open-stream-p socket)
    (let ((synced nil))
      (unwind-protect
           (progn
             (unless sync-sent
               (sync-message socket)
               (force-output socket))
             ;; TODO initiate timeout on the socket read, signal timeout error
             (loop
               (when (or synced (not (open-stream-p socket)))
                 (return))
               (message-case socket
                 (#\Z (read-uint1 socket)
                      (setf synced t))
                 (t :skip))))
        ;; If we can't sync, make sure the socket is shot (e.g. a
        ;; timeout, or aborting execution with a restart from sldb).
        (unless synced
          (ensure-socket-is-closed socket :abort t))))))
(defmacro with-syncing (&body body)
  "Run BODY, and try to re-sync the connection when BODY is left
without completing normally. An end-of-file condition closes the
socket and is then signalled again.

Deliberately not hygienic: the expansion refers to a variable named
SOCKET from the surrounding code, and binds SYNC-SENT (which BODY is
expected to set once it has sent a sync message) and OK."
  `(let ((sync-sent nil)
         (ok nil))
     (handler-case
         (unwind-protect
              ;; OK only becomes true when the body returned normally.
              (multiple-value-prog1
                  (progn ,@body)
                (setf ok t))
           (or ok
               (try-to-sync socket sync-sent)))
       (end-of-file (condition)
         (ensure-socket-is-closed socket :abort t)
         (error condition)))))
(defmacro returning-effected-rows (value &body body)
  "Evaluate VALUE, then run BODY, with *effected-rows* bound to NIL
around both. Returns the result of VALUE and, as a second value, the
contents of *effected-rows* if that was set in the meantime. When it
was not, only the result of VALUE is returned."
  (let ((result (gensym)))
    `(let* ((*effected-rows* nil)
            (,result ,value))
       ,@body
       (cond (*effected-rows* (values ,result *effected-rows*))
             (t ,result)))))
(defun send-query (socket query row-reader)
  "Send QUERY to the server over SOCKET. When the query produces rows,
ROW-READER is called with the socket and the field descriptions, and
its result is returned; otherwise the result of look-for-row is. The
number of effected rows, if known, is returned as a second value."
  (declare (type stream socket)
           (type string query)
           #.*optimize*)
  (with-syncing
    (with-query (query)
      (let ((fields nil))
        ;; First round: parse and describe the statement.
        (simple-parse-message socket query)
        (simple-describe-message socket)
        (flush-message socket)
        (force-output socket)
        (message-case socket
          (#\1))                        ; ParseComplete
        (message-case socket
          (#\t :skip))                  ; ParameterDescription
        (message-case socket
          ;; RowDescription
          (#\T (setf fields (read-field-descriptions socket)))
          ;; NoData
          (#\n))
        ;; Second round: bind, execute and sync.
        (simple-bind-message socket (map 'vector 'field-binary-p fields))
        (simple-execute-message socket)
        (sync-message socket)
        (setf sync-sent t)
        (force-output socket)
        (message-case socket
          (#\2))                        ; BindComplete
        (returning-effected-rows
            (if fields
                (funcall row-reader socket fields)
                (look-for-row socket))
          (message-case socket
            ;; ReadyForQuery, skipping transaction status
            (#\Z (read-uint1 socket))))))))
(defun send-parse (socket name query)
  "Have the server parse QUERY under the name NAME, and wait for it to
confirm that the parse completed."
  (declare (type stream socket)
           (type string name query)
           #.*optimize*)
  (with-syncing
    (with-query (query)
      (parse-message socket name query)
      (flush-message socket)
      (force-output socket)
      (message-case socket
        (#\1)))))                       ; ParseComplete
(defun send-close (socket name)
  "Have the server close the prepared statement called NAME, and wait
for it to confirm that the close completed."
  (declare (type stream socket)
           (type string name)
           #.*optimize*)
  (with-syncing
    (close-prepared-message socket name)
    (flush-message socket)
    (force-output socket)
    (message-case socket
      (#\3))))                          ; CloseComplete
(defun send-execute (socket name parameters row-reader)
  "Execute the previously parsed query called NAME with the list
PARAMETERS as its arguments. When the query produces rows, ROW-READER
is called with the socket and the field descriptions, and its result
is returned; otherwise the result of look-for-row is. The number of
effected rows, if known, is returned as a second value. Signals a
database-error when the number of parameters does not match what the
server reports for the statement."
  (declare (type stream socket)
           (type string name)
           (type list parameters)
           #.*optimize*)
  (with-syncing
    (let ((fields nil)
          (expected-count 0))
      (declare (type (unsigned-byte 16) expected-count))
      ;; First round: ask the server to describe the statement.
      (describe-prepared-message socket name)
      (flush-message socket)
      (force-output socket)
      (message-case socket
        ;; ParameterDescription: a count, followed by 4 bytes for
        ;; each parameter, which are skipped.
        (#\t (setf expected-count (read-uint2 socket))
             (skip-bytes socket (* 4 expected-count))))
      (message-case socket
        ;; RowDescription
        (#\T (setf fields (read-field-descriptions socket)))
        ;; NoData
        (#\n))
      (when (/= (length parameters) expected-count)
        (error 'database-error
               :message (format nil "Incorrect number of parameters given for prepared statement ~A." name)))
      ;; Second round: bind, execute and sync.
      (bind-message socket name (map 'vector 'field-binary-p fields)
                    parameters)
      (simple-execute-message socket)
      (sync-message socket)
      (setf sync-sent t)
      (force-output socket)
      (message-case socket
        (#\2))                          ; BindComplete
      (returning-effected-rows
          (if fields
              (funcall row-reader socket fields)
              (look-for-row socket))
        (message-case socket
          ;; CommandComplete
          (#\C (read-str socket)
               (message-case socket
                 (#\Z (read-uint1 socket))))
          ;; ReadyForQuery, skipping transaction status
          (#\Z (read-uint1 socket)))))))
(defun build-row-reader (function-form fields body)
  "Helper for the row-reader and def-row-reader macros. Returns a
function definition form that starts with the elements of
FUNCTION-FORM, for example (lambda) or (defun name), and takes two
arguments: a socket and the field descriptions, the latter bound to
the symbol FIELDS. BODY is placed inside local definitions of next-row
and next-field.

next-row calls look-for-row on the socket. next-field reads a size
from the socket and returns :null when that size is -1; otherwise it
calls the interpreter of the given field with the socket and the
size."
  (let ((socket (gensym)))
    `(,@function-form (,socket ,fields)
       (declare (type stream ,socket)
                (type (simple-array field-description) ,fields))
       (flet ((next-row ()
                (look-for-row ,socket))
              (next-field (field)
                (declare (type field-description field))
                (let ((size (read-int4 ,socket)))
                  (declare (type (signed-byte 32) size))
                  ;; Numbers are compared with =; the result of EQ on
                  ;; numbers is implementation-dependent.
                  (if (= size -1)
                      :null
                      (funcall (field-interpreter field)
                               ,socket size)))))
         ,@body))))
(defmacro row-reader ((fields) &body body)
  "Create an anonymous row reader: a function that does something with
the results of a query. FIELDS names the argument that holds the
vector of field descriptions, and BODY reads the rows.

Two local functions are available in BODY. next-row should be called
once per row, and returns a boolean that tells whether there are any
more rows. next-field should be called once for every element of the
fields vector, with that element as argument, to read a single value
of the row. See list-row-reader in public.lisp for an example."
  (build-row-reader (list 'lambda) fields body))
(defmacro def-row-reader (name (fields) &body body)
  "Define a row reader, as the row-reader macro does, as a function
called NAME."
  (build-row-reader (list 'defun name) fields body))