Add support for karma threshold.
[lw2-viewer.git] / src / lmdb.lisp
blob2fb956b82e129e9f15f536cb73c72bf2884074cf
1 (uiop:define-package #:lw2.lmdb
2 (:use #:cl #:sb-ext #:sb-thread #:alexandria #:iterate #:lw2.raw-memory-streams #:lw2.conditions #:lw2.sites #:lw2.context #:lw2.backend-modules #:lw2-viewer.config #:lw2.hash-utils)
3 (:import-from #:lw2.rwlock #:rwlock #:make-rwlock #:with-read-lock #:with-write-lock #:with-rwlock-protect)
4 (:export
5 #:close-unused-environments
6 #:define-cache-database #:with-cache-mutex #:with-cache-transaction #:with-cache-readonly-transaction
7 #:cache-put #:cache-get #:cache-exists #:cache-del
8 #:count-database-entries #:truncate-database
9 #:call-with-cursor #:cursor-get
10 #:existence
11 #:binary-stream
12 #:simple-cacheable #:define-lmdb-memoized #:current-memo-hash #:*memoized-output-stream* #:*memoized-output-without-hyphens*)
13 (:unintern #:lmdb-clear-db #:lmdb-put-string #:*db-mutex* #:*cache-environment-databases-list* #:*db-environments-lock*))
15 (in-package #:lw2.lmdb)
;; Counter bumped by DEFINE-CACHE-DATABASE whenever a class's database list
;; changes; CLASS-DATABASES compares against it to invalidate its cache.
(defglobal *cache-databases-epoch* 0)

;; Read/write lock taken around accesses to the environment bookkeeping below.
(defglobal *db-environments-rwlock* (make-rwlock))

;; List of ENVIRONMENT-CONTAINER structures for the currently open environments.
(defglobal *db-environments* nil)

;; The value of *SITES* that was current when the environments were last set up.
(defglobal *environments-sites* nil)
(defun define-cache-database (class-name &rest names)
  "Add the database specs NAMES to the databases owned by the class named
CLASS-NAME. Each spec is either a database name or a list whose first element
is the name. When the owned list actually changes, the global databases epoch
is incremented so that cached per-class lists are recomputed."
  (flet ((spec-name (spec)
           (if (atom spec) spec (first spec))))
    (with-write-lock (*db-environments-rwlock*)
      (let* ((class (find-class class-name))
             (previous (class-own-databases class))
             (merged (union previous names :test #'string= :key #'spec-name)))
        (unless (equal previous merged)
          (incf *cache-databases-epoch*)
          (setf (class-own-databases class) merged))))))
(defmethod class-databases ((class t))
  "Fallback method: any class without a more specific method contributes no databases."
  nil)
(defmethod class-databases ((class backend-class))
  "Return the database specs of CLASS followed by those of its direct
superclasses (recursively). The result is cached on the class and reused for
as long as the class's recorded epoch is EQ to *CACHE-DATABASES-EPOCH*."
  (cond
    ((eq *cache-databases-epoch* (class-databases-epoch class))
     (class-cached-databases class))
    (t
     (let* ((inherited (mappend #'class-databases
                                (closer-mop:class-direct-superclasses class)))
            (combined (append (class-own-databases class) inherited)))
       (setf (class-cached-databases class) combined
             (class-databases-epoch class) *cache-databases-epoch*)
       combined))))
(defun backend-databases (backend)
  "Return the database specs that apply to BACKEND, looked up through its class."
  (let ((backend-class (class-of backend)))
    (class-databases backend-class)))
(defstruct environment-container
  "Bookkeeping for one LMDB environment."
  ;; Lock held (read) around transactions and (write) while closing the environment.
  (rwlock nil :type rwlock)
  ;; The underlying LMDB environment object.
  (environment nil :type lmdb:environment)
  ;; Maps database name -> opened database object.
  (open-databases (make-hash-table :test 'equal) :type hash-table)
  ;; The backend database list this container was last prepared for.
  (databases-list nil))
(defun call-with-environment-transaction (fn environment &key read-only)
  "Call FN with LMDB:*TRANSACTION* bound to a transaction on ENVIRONMENT and
return FN's values. If a transaction is already active, FN simply runs inside
it. Otherwise a new transaction is begun (read-only when READ-ONLY is true),
committed after FN returns, and aborted if control leaves before the commit
completes."
  (when lmdb:*transaction*
    (return-from call-with-environment-transaction (funcall fn)))
  (let* ((flags (if read-only liblmdb:+rdonly+ 0))
         (pending (lmdb:make-transaction environment :flags flags)))
    (unwind-protect
         (progn
           (lmdb:begin-transaction pending)
           (let ((lmdb:*transaction* pending))
             (multiple-value-prog1 (funcall fn)
               (lmdb:commit-transaction pending)
               ;; Mark as finished so the cleanup form does not abort it.
               (setf pending nil))))
      (when pending
        (lmdb:abort-transaction pending)))))
(defmacro with-environment-transaction ((environment) &body body)
  "Evaluate BODY inside a (read-write) transaction on ENVIRONMENT, via
CALL-WITH-ENVIRONMENT-TRANSACTION."
  (let ((thunk `(lambda () ,@body)))
    `(call-with-environment-transaction ,thunk ,environment)))
(defun close-environment (environment open-databases)
  "Close every database stored as a value in the hash table OPEN-DATABASES
within one transaction, then close ENVIRONMENT itself."
  (with-environment-transaction (environment)
    (loop for database being the hash-values of open-databases
          do (lmdb:close-database database :transaction lmdb:*transaction*)))
  (lmdb:close-environment environment))
(defun prepare-environment (environment-container backend)
  "Open (creating if necessary) every database listed for BACKEND that is not
yet present in ENVIRONMENT-CONTAINER's table of open databases, then record
BACKEND's database list on the container. Must not be called while a
transaction is active."
  (let ((environment (environment-container-environment environment-container))
        (open-databases (environment-container-open-databases environment-container)))
    (assert (not lmdb:*transaction*) () "The transaction in which a database is created must be closed before that database may be used in another thread.")
    (with-environment-transaction (environment)
      (loop for spec in (backend-databases backend)
            do (destructuring-bind (db-name &key (flags 0)) (ensure-list spec)
                 (when (null (gethash db-name open-databases))
                   (let ((database (lmdb:make-database db-name :flags flags)))
                     (lmdb:open-database database :create t)
                     (setf (gethash db-name open-databases) database))))))
    (setf (environment-container-databases-list environment-container)
          (backend-databases backend))))
(defun find-environment-with-path (path environment-list)
  "Return the first container in ENVIRONMENT-LIST whose LMDB environment
directory is STRING= to PATH, or NIL if there is none."
  (flet ((container-directory (container)
           (lmdb:environment-directory (environment-container-environment container))))
    (find path environment-list :test #'string= :key #'container-directory)))
(defun find-site-with-environment-path (path site-list)
  "Return the first site in SITE-LIST whose backend's cache database path is
STRING= to PATH, or NIL if there is none."
  (find-if (lambda (site)
             (string= path (backend-cache-db-path (site-backend site))))
           site-list))
(defun close-unused-environments ()
  "Rebuild *DB-ENVIRONMENTS*, keeping only the environments whose directory is
the cache path of some site in *SITES*. Every other environment is closed
while holding its own write lock."
  (with-write-lock (*db-environments-rwlock*)
    (let ((candidates (shiftf *db-environments* nil)))
      (dolist (container candidates)
        (let ((raw-environment (environment-container-environment container)))
          (cond
            ((find-site-with-environment-path
              (lmdb:environment-directory raw-environment) *sites*)
             (push container *db-environments*))
            (t
             (with-write-lock ((environment-container-rwlock container))
               (close-environment raw-environment
                                  (environment-container-open-databases container))))))))))
(define-backend-function get-current-environment ())

;; Returns the environment container of the current backend.
;; NOTE(review): WITH-RWLOCK-PROTECT appears to evaluate the first form as an
;; "already up to date" test, run the second form under the lock when the test
;; fails, and return the last form -- confirm against lw2.rwlock.
(define-backend-operation get-current-environment backend-lmdb-cache ()
  (with-rwlock-protect *db-environments-rwlock*
    (and (backend-lmdb-environment backend)
         (eq *sites* *environments-sites*)
         (eq (backend-databases backend)
             (environment-container-databases-list (backend-lmdb-environment backend))))
    (progn
      (setf *environments-sites* *sites*)
      (let ((lmdb-sites (remove-if-not (lambda (site)
                                         (typep (site-backend site) 'backend-lmdb-cache))
                                       *environments-sites*)))
        (uiop:ensure-all-directories-exist
         (mapcar (lambda (site) (backend-cache-db-path (site-backend site)))
                 lmdb-sites))
        (dolist (site lmdb-sites)
          (let* ((cache-backend (site-backend site))
                 (path (backend-cache-db-path cache-backend)))
            (if-let (existing (find-environment-with-path path *db-environments*))
              ;; Reuse an environment that is already open for this path.
              (progn
                (setf (backend-lmdb-environment cache-backend) existing)
                (prepare-environment existing cache-backend))
              ;; Otherwise open a new environment and register it.
              (let ((fresh (make-environment-container
                            :rwlock (make-rwlock)
                            :environment (lmdb:make-environment
                                          path
                                          :max-databases 1024 :max-readers 512
                                          :open-flags 0 :mapsize *lmdb-mapsize*))))
                (lmdb:open-environment (environment-container-environment fresh) :create t)
                (prepare-environment fresh cache-backend)
                (setf (backend-lmdb-environment cache-backend) fresh)
                (push fresh *db-environments*)))))))
    (backend-lmdb-environment backend)))
;; At load time, change the working directory to the lw2-viewer system's source directory.
(let ((system-root (asdf:system-source-directory "lw2-viewer")))
  (uiop:chdir system-root))
(defun get-open-database (db-name)
  "Return the open database object named DB-NAME in the current environment.
If the current backend's database list is not the one the environment was
prepared for, the lock is upgraded and the environment is prepared again
first. Signals an error when no database with that name is defined."
  (let ((container (get-current-environment)))
    (with-read-lock (*db-environments-rwlock* :upgrade-fn upgrade-lock)
      (when (not (eq (backend-databases *current-backend*)
                     (environment-container-databases-list container)))
        (upgrade-lock)
        (prepare-environment container *current-backend*))
      (let ((database (gethash db-name (environment-container-open-databases container))))
        (if database
            database
            (error "The database '~A' is not defined." db-name))))))
(defun call-with-cache-transaction (fn &key read-only)
  "Call FN inside a transaction on the current cache environment. When a
transaction is already active FN runs in it directly; otherwise the
environment's read lock is held around a new transaction (read-only when
READ-ONLY is true)."
  (cond
    (lmdb:*transaction*
     (funcall fn))
    (t
     (let ((container (get-current-environment)))
       (with-read-lock ((environment-container-rwlock container))
         (call-with-environment-transaction
          fn (environment-container-environment container)
          :read-only read-only))))))
(defmacro with-cache-transaction (&body body)
  "Evaluate BODY inside a read-write cache transaction."
  (let ((thunk `(lambda () ,@body)))
    `(call-with-cache-transaction ,thunk)))
(defmacro with-cache-readonly-transaction (&body body)
  "Evaluate BODY inside a cache transaction requested as read-only."
  (let ((thunk `(lambda () ,@body)))
    `(call-with-cache-transaction ,thunk :read-only t)))
(defmacro with-db ((db db-name &key read-only) &body body)
  "Bind DB to the open database named by DB-NAME and evaluate BODY inside a
cache transaction; READ-ONLY is passed through to the transaction."
  (let ((thunk `(lambda () ,@body)))
    `(let ((,db (get-open-database ,db-name)))
       (call-with-cache-transaction ,thunk :read-only ,read-only))))
;; These definitions are needed at macroexpansion time by TYPE-ACCESSOR-CASE,
;; hence the EVAL-WHEN.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (defparameter *type-accessors*
    ;; Each entry: (key lisp-type encoder-form decoder-form lmdb-return-type-form).
    ;; The encoder and decoder forms refer to the value through the variable X.
    '((:byte-vector (vector (unsigned-byte 8))
       x
       x
       :byte-vector)
      (:string string
       (and x (string-to-octets x :external-format :utf-8))
       x
       :string)
      (:json t
       (and x (string-to-octets (json:encode-json-to-string x) :external-format :utf-8))
       (and x (json:decode-json (flex:make-flexi-stream x :external-format :utf-8)))
       'binary-stream)
      ;; NOTE(review): the :lisp decoder uses READ on stored data -- confirm
      ;; that only data written by this application ends up in such databases.
      (:lisp t
       (and x (string-to-octets (write-to-string x :pretty nil :readably t :circle nil) :external-format :utf-8))
       (and x (read (flex:make-flexi-stream x :external-format :utf-8)))
       'binary-stream)))

  (defun map-type-accessors (fn)
    "Apply FN to the five elements of each entry of *TYPE-ACCESSORS* and
collect the results in a list."
    (loop for accessor in *type-accessors*
          collect (apply fn accessor)))

  (defun type-accessor-fn (form)
    "Return a function of (KEY TYPE ENCODER DECODER LMDB-TYPE) whose body is FORM."
    (let ((lambda-form `(lambda (key type encoder decoder lmdb-type)
                          (declare (ignorable key type encoder decoder lmdb-type))
                          ,form)))
      (coerce lambda-form 'function))))
(defmacro type-accessor-case (type form)
  "Expand into a CASE on TYPE with one clause per entry of *TYPE-ACCESSORS*.
FORM is evaluated at macroexpansion time once per entry, with KEY, TYPE,
ENCODER, DECODER and LMDB-TYPE bound to that entry's elements; its value
becomes the body of the clause for that entry's key."
  ;; (list key <form>) builds the clause (key-value form-value) for each entry.
  (let ((clauses (map-type-accessors (type-accessor-fn (list 'list 'key form)))))
    `(case ,type
       ,@clauses)))
(defun encode-value (x type)
  "Encode X for storage using the encoder form registered for TYPE in
*TYPE-ACCESSORS*. Returns NIL when TYPE matches no entry."
  ;; The encoder forms spliced in by the macro refer to the parameter X.
  (type-accessor-case type
                      encoder))
(defun accessor-decoder (return-type value-type)
  "Return two values: the return type to request from lmdb, and a function
that decodes what lmdb returns. An explicit RETURN-TYPE is used unchanged
with IDENTITY as the decoder; otherwise both come from the *TYPE-ACCESSORS*
entry for VALUE-TYPE."
  (cond
    (return-type
     (values return-type #'identity))
    (t
     (type-accessor-case value-type
                         `(values ,lmdb-type (lambda (x) ,decoder))))))
(defun cache-put (db-name key value &key (key-type :string) (value-type :string))
  "Store VALUE under KEY in the database DB-NAME, encoding both according to
KEY-TYPE and VALUE-TYPE. Returns VALUE when lmdb:put returns true, else NIL."
  (with-db (db db-name)
    (let ((encoded-key (encode-value key key-type))
          (encoded-value (encode-value value value-type)))
      (and (lmdb:put db encoded-key encoded-value)
           value))))
(defun cache-get (db-name key &key (key-type :string) (value-type :string) return-type)
  "Look up KEY (encoded per KEY-TYPE) in the database DB-NAME within a
read-only transaction. The result is decoded per VALUE-TYPE, unless
RETURN-TYPE is given, in which case it is passed to lmdb as the return type
and the result is returned as is."
  (multiple-value-bind (lmdb-return-type decode) (accessor-decoder return-type value-type)
    (with-db (db db-name :read-only t)
      (let ((raw (lmdb:get db (encode-value key key-type) :return-type lmdb-return-type)))
        (funcall decode raw)))))
(defun cache-exists (db-name key &key (key-type :string))
  "Look up KEY in the database DB-NAME using the EXISTENCE return type, so
the stored data itself is not converted."
  (with-db (db db-name :read-only t)
    (let ((encoded-key (encode-value key key-type)))
      (lmdb:get db encoded-key :return-type 'existence))))
(defun cache-del (db-name key &key value (key-type :string) (value-type :string))
  "Delete KEY from the database DB-NAME. When VALUE is non-NIL its encoded
form is passed to lmdb:del as well; otherwise NIL is passed in its place."
  (with-db (db db-name)
    (let ((encoded-key (encode-value key key-type))
          (encoded-value (when value
                           (encode-value value value-type))))
      (lmdb:del db encoded-key encoded-value))))
(defun count-database-entries (db-name)
  "Return the :ENTRIES value from the statistics of the database DB-NAME."
  (with-db (db db-name :read-only t)
    (let ((statistics (lmdb:database-statistics db)))
      (getf statistics :entries))))
(defun truncate-database (db-name)
  "Call lmdb:drop-database on the database DB-NAME with :DELETE 0, inside a
cache transaction."
  (with-db (database db-name)
    (lmdb:drop-database database :delete 0)))
(defun call-with-cursor (db-name fn &key read-only)
  "Call FN with the database named DB-NAME and a cursor on it, inside a cache
transaction (READ-ONLY is passed through). Returns FN's values."
  (with-db (database db-name :read-only read-only)
    (let ((open-cursor (lmdb:make-cursor database)))
      (lmdb:with-cursor (open-cursor)
        (funcall fn database open-cursor)))))
(defun cursor-get (cursor operation &key key value (key-type :string) (value-type :string) return-type)
  "Perform the cursor OPERATION on CURSOR, passing KEY and VALUE encoded per
KEY-TYPE and VALUE-TYPE. Returns two values: the decoded value and the
decoded key. When RETURN-TYPE is given it is used for both the key and the
value, and neither is decoded further."
  (multiple-value-bind (key-return-type decode-key)
      (accessor-decoder return-type key-type)
    (multiple-value-bind (value-return-type decode-value)
        (accessor-decoder return-type value-type)
      (multiple-value-bind (raw-value raw-key)
          (lmdb:cursor-get cursor operation
                           (encode-value key key-type)
                           (encode-value value value-type)
                           :return-type value-return-type
                           :key-return-type key-return-type)
        (values (funcall decode-value raw-value)
                (funcall decode-key raw-key))))))
(defun truncate-database-nicely (db-name)
  "Empty the database DB-NAME in batches: each cursor session deletes at most
100 entries, and sessions are repeated until one finds the database empty."
  (flet ((delete-batch (db cursor)
           (declare (ignore db))
           ;; Returns T once no first entry exists, NIL if the batch limit was hit.
           (loop repeat 100
                 do (if (cursor-get cursor :first :return-type 'existence)
                        (lmdb:cursor-del cursor)
                        (return t)))))
    (loop until (call-with-cursor db-name #'delete-batch))))
(defun existence (array size)
  "Return-type function for lmdb reads that only need to know whether an
entry was found: the data pointer ARRAY and its SIZE are ignored and T is
returned. Callers such as TRUNCATE-DATABASE-NICELY test the result for
truth."
  (declare (ignore array size))
  t)
(defun binary-stream (array size)
  "Return-type function for lmdb reads: wrap the SIZE bytes at the pointer
ARRAY in a RAW-MEMORY-STREAM."
  (let ((pointer array)
        (byte-count size))
    (make-instance 'raw-memory-stream :pointer pointer :length byte-count)))
(defun make-simple-cache (cache-db)
  "Return a function of (KEY VALUE) that stores VALUE under KEY in the
database CACHE-DB with CACHE-PUT."
  (flet ((store (key value)
           (cache-put cache-db key value)))
    #'store))
(defun wrap-handler (fn)
  "Return a function of one argument KEY that calls FN on KEY and returns its
result, or a fixed error string if the call signals a serious condition.

Only SERIOUS-CONDITION (errors, and other conditions serious enough to need
intervention) is intercepted. Handling every condition type would also catch
warnings and plain SIGNALs, which are not failures: the call would be
abandoned and the error string returned even though FN could have completed."
  (lambda (key)
    (handler-case
        (funcall fn key)
      (serious-condition () "[Error communicating with LW2 server]"))))
(defun make-simple-get (cache-db cache-fn get-real-fn get-wrapper-fn)
  "Return a function of one argument KEY that returns the value cached under
KEY in CACHE-DB or, when nothing is cached, obtains it from GET-REAL-FN,
asserts it is non-NIL and returns the result of storing it with CACHE-FN.
When GET-WRAPPER-FN is non-NIL it is called with KEY and the lookup function
instead, and decides how the lookup is performed."
  (flet ((lookup (key)
           (or (cache-get cache-db key)
               (let ((data (funcall get-real-fn key)))
                 (assert data)
                 (funcall cache-fn key data)))))
    (lambda (key)
      (if get-wrapper-fn
          (funcall get-wrapper-fn key #'lookup)
          (lookup key)))))
(defmacro simple-cacheable ((base-name class-name cache-db key &key (catch-errors t) get-wrapper) &body body)
  "Define three functions named after BASE-NAME:
  GET-<base>-REAL  a function of KEY whose body is BODY;
  CACHE-<base>     stores a value under a key in the database CACHE-DB;
  GET-<base>       returns the cached value, falling back to GET-<base>-REAL.
CACHE-DB is registered as a database of CLASS-NAME. Unless CATCH-ERRORS is
NIL, GET-<base> is wrapped with WRAP-HANDLER. GET-WRAPPER, if supplied, is
passed to MAKE-SIMPLE-GET as its wrapper function."
  (flet ((derived-symbol (control)
           (intern (format nil control base-name))))
    (let ((real-fn-name (derived-symbol "~:@(get-~A-real~)"))
          (cache-fn-name (derived-symbol "~:@(cache-~A~)"))
          (getter-name (derived-symbol "~:@(get-~A~)"))
          (outer-wrapper (if catch-errors 'wrap-handler 'identity)))
      `(progn
         (define-cache-database ,class-name ,cache-db)
         (declaim (ftype (function (string) string) ,real-fn-name ,getter-name)
                  (ftype (function (string string) string) ,cache-fn-name))
         (setf (fdefinition (quote ,real-fn-name)) (lambda (,key) ,@body)
               (fdefinition (quote ,cache-fn-name)) (make-simple-cache ,cache-db)
               (fdefinition (quote ,getter-name))
               (,outer-wrapper
                (make-simple-get ,cache-db
                                 (fdefinition (quote ,cache-fn-name))
                                 (fdefinition (quote ,real-fn-name))
                                 ,get-wrapper)))))))
;; Stream that WRITE-MEMOIZED-DATA writes bytes to. Unbound by default.
(defvar *memoized-output-stream*)

;; When true, WRITE-MEMOIZED-DATA filters soft hyphens out of its output.
;; Unbound by default.
;; TODO: there's probably a better way to do this...
(defvar *memoized-output-without-hyphens*)

;; List of (start end fn . args) entries consulted by WRITE-MEMOIZED-DATA.
(defparameter *memoized-output-dynamic-blocks* nil)
(defun write-memoized-data (array size)
  "Copy the SIZE bytes at the foreign pointer ARRAY to
*MEMOIZED-OUTPUT-STREAM* and return T.

Each entry (START END FN . ARGS) of *MEMOIZED-OUTPUT-DYNAMIC-BLOCKS* splits
the copy: the bytes up to START are written, then FN is applied to ARGS. If
FN returns normally, copying resumes at END; otherwise it resumes at START.
When *MEMOIZED-OUTPUT-WITHOUT-HYPHENS* is true, UTF-8 encoded soft hyphens are
dropped from the copied bytes.

The true return value matters: MAKE-LMDB-MEMOIZED-WRAPPER treats a NIL result
from its cached-value reader as a cache miss."
  ;; This is unsafe anyway thanks to mem-aref, and it's pretty speed-critical
  (declare (optimize (safety 0) (debug 0))
           (type (and fixnum (integer 0)) size)
           (type cffi:foreign-pointer array))
  (let ((out-stream *memoized-output-stream*)
        (dynamic-blocks *memoized-output-dynamic-blocks*)
        (index 0)
        (buffer (make-array 2048 :element-type '(unsigned-byte 8) :initial-element 0))
        (buffer-index 0))
    (declare (dynamic-extent buffer)
             (type (and fixnum (integer 0)) index)
             (type (integer 0 2048) buffer-index))
    (labels ((flush-buffer ()
               ;; Write out whatever has accumulated in BUFFER.
               (when (> buffer-index 0)
                 (write-sequence buffer out-stream :end buffer-index)
                 (setf buffer-index 0)))
             (output-byte (byte)
               ;; Append BYTE to BUFFER, flushing first when it is full.
               (when (= buffer-index 2048)
                 (flush-buffer))
               (setf (aref buffer buffer-index) byte)
               (incf buffer-index))
             (process-span (start end)
               ;; Copy the bytes in [START, END) to the output stream.
               (declare (type (and fixnum (integer 0)) start end))
               (if *memoized-output-without-hyphens*
                   ;; Filter out soft hyphens while writing to the output stream.
                   ;; Thanks to UTF-8's prefix-free property, we don't need to decode characters to
                   ;; do this, just search for the soft-hyphen byte sequence.
                   (let ((hyphen-bytes (load-time-value (string-to-octets (string #\SOFT_HYPHEN) :external-format :utf-8)))
                         ;; Number of leading soft-hyphen bytes matched so far.
                         (hi 0))
                     (declare (type (unsigned-byte 3) hi))
                     (iter (for i from start below end)
                           (let ((in-byte (cffi:mem-aref array :unsigned-char i)))
                             (if (= in-byte (aref hyphen-bytes hi))
                                 (if (= (1+ hi) (length hyphen-bytes))
                                     ;; Complete soft hyphen: drop it.
                                     (setf hi 0)
                                     (incf hi))
                                 (progn
                                   ;; Partial match failed: emit the withheld prefix bytes.
                                   (when (/= hi 0)
                                     (iter (for i from 0 below hi) (output-byte (aref hyphen-bytes i)))
                                     (setf hi 0))
                                   (output-byte in-byte))))))
                   ;; In this case, keep the soft hyphens.
                   (iter (for i from start below end)
                         (output-byte (cffi:mem-aref array :unsigned-char i))))
               (flush-buffer)))
      (declare (dynamic-extent #'output-byte #'flush-buffer #'process-span))
      (iter (for dynamic-block in dynamic-blocks)
            (destructuring-bind (start end fn &rest args) dynamic-block
              (process-span index start)
              (setf index start)
              (log-and-ignore-errors
               (handler-case
                   (progn (apply fn args) (values))
                 ;; Skip the stored span only when FN completed normally.
                 (:no-error () (setf index end)))))
            (finally
             (process-span index size)))))
  t)
(declaim (ftype (function (cffi:foreign-pointer fixnum (simple-array (unsigned-byte 8) (*))) boolean) version-equal)
         (inline version-equal))

(defun version-equal (array size version)
  "Return T when the SIZE bytes at the foreign pointer ARRAY begin with the
bytes of VERSION, and NIL otherwise (including when SIZE is smaller than the
length of VERSION)."
  (declare (type (unsigned-byte 32) size)
           (type (simple-array (unsigned-byte 8) (*)) version))
  (let ((version-length (length version)))
    (and (>= size version-length)
         (dotimes (i version-length t)
           (unless (= (aref version i)
                      (cffi:mem-aref array :unsigned-char i))
             (return nil))))))
(defstruct scoreboard
  "A table of in-progress computations, used by CALL-WITH-SCORECARD."
  ;; Maps key -> SCORECARD; keys are compared with EQUALP.
  (table (make-hash-table :test #'equalp))
  ;; Held while TABLE is read or modified.
  (mutex (sb-thread:make-mutex)))
(defstruct scorecard
  "State of one in-progress computation tracked on a SCOREBOARD."
  ;; Set to T once the creating thread has finished.
  (ready nil)
  ;; Mutex used together with WAITQUEUE.
  (mutex (sb-thread:make-mutex))
  ;; Threads waiting for READY block here.
  (waitqueue (sb-thread:make-waitqueue)))
(defun call-with-scorecard (scoreboard key created-fn existing-fn)
  "Coordinate threads working on the same KEY. The thread that finds no
scorecard for KEY on SCOREBOARD creates one and calls CREATED-FN; when that
call exits (normally or not) the scorecard is marked ready, waiting threads
are woken and the scorecard is removed from the table. A thread that finds an
existing scorecard waits until it is ready. In both cases EXISTING-FN is then
called and its values are returned."
  (declare (type (function ()) created-fn existing-fn))
  (let* ((table (scoreboard-table scoreboard))
         (created nil)
         (scorecard
           (sb-thread:with-mutex ((scoreboard-mutex scoreboard))
             (or (gethash key table)
                 (progn
                   (setf created t)
                   (setf (gethash key table) (make-scorecard)))))))
    (if created
        ;; This thread owns the computation.
        (unwind-protect
             (funcall created-fn)
          (setf (scorecard-ready scorecard) t)
          (sb-thread:with-mutex ((scorecard-mutex scorecard))
            (sb-thread:condition-broadcast (scorecard-waitqueue scorecard)))
          (sb-thread:with-mutex ((scoreboard-mutex scoreboard))
            (remhash key table)))
        ;; Another thread owns it: wait for it to finish.
        (unless (scorecard-ready scorecard)
          (sb-thread:with-mutex ((scorecard-mutex scorecard))
            (loop until (scorecard-ready scorecard)
                  do (or (sb-thread:condition-wait (scorecard-waitqueue scorecard)
                                                   (scorecard-mutex scorecard))
                         (error "Waitqueue error."))))))
    (funcall existing-fn)))
(defmacro scorecard-protect (scoreboard key when-not-ready when-ready)
  "Protect multiple threads from simultaneous execution. When multiple threads
evaluate SCORECARD-PROTECT with the same SCOREBOARD and KEYs that are EQUALP,
only the first thread evaluates WHEN-NOT-READY, and other threads wait for it
to finish. Finally, all threads evaluate WHEN-READY, and its values are returned."
  ;; Both forms are wrapped in local functions so they can be handed to
  ;; CALL-WITH-SCORECARD without being evaluated up front.
  `(flet ((when-not-ready () ,when-not-ready)
          (when-ready () ,when-ready))
     (declare (dynamic-extent #'when-not-ready #'when-ready))
     (call-with-scorecard ,scoreboard ,key #'when-not-ready #'when-ready)))
(defun make-lmdb-memoized-wrapper (db-name version scoreboard fn return-type)
  "Return a function that memoizes FN in the database DB-NAME.

The returned function hashes its argument list and looks the hash up in
DB-NAME. Stored values consist of the bytes of VERSION followed by the UTF-8
encoded result; a stored value whose prefix does not match VERSION counts as a
miss. On a miss, FN is applied to the hash followed by the arguments under
SCORECARD-PROTECT on SCOREBOARD (keyed by the hash), its string result is
stored, and the freshly stored value is read back.

RETURN-TYPE selects how a stored value is delivered:
  :STRING               the data is decoded and returned as a Lisp string;
  WRITE-MEMOIZED-DATA   the data is passed to WRITE-MEMOIZED-DATA.
Any other RETURN-TYPE signals an error when a value is read."
  (declare (type (simple-array (unsigned-byte 8) (*)) version))
  (labels ((unchecked-return-type (array size)
             ;; Deliver the data that follows the version prefix.
             (declare (type (unsigned-byte 32) size))
             (let* ((version-size (length version))
                    (data-size (- size version-size)))
               ;; ECASE keys are not evaluated, so the key is written as a bare
               ;; symbol; a quoted key would be read as the key list
               ;; (QUOTE WRITE-MEMOIZED-DATA) and also match the symbol QUOTE.
               (ecase return-type
                 (:string (cffi:foreign-string-to-lisp array :offset version-size :count data-size))
                 (write-memoized-data (write-memoized-data (cffi:inc-pointer array version-size) data-size)))))
           (cached-return-type (array size)
             ;; Only accept stored values written under the current VERSION.
             (when (version-equal array size version)
               (unchecked-return-type array size))))
    (lambda (&rest args)
      (let* ((hash (hash-printable-object args))
             (*memoized-output-dynamic-blocks* (cache-get "dynamic-content-blocks" hash :key-type :byte-vector :value-type :lisp))
             (cached-value (cache-get db-name hash :key-type :byte-vector :return-type #'cached-return-type)))
        (if cached-value
            cached-value
            (scorecard-protect scoreboard hash
                               (let* ((new-value (apply fn hash args))
                                      (octets-value (concatenate '(simple-array (unsigned-byte 8) (*))
                                                                 version
                                                                 (string-to-octets new-value :external-format :utf-8))))
                                 (with-cache-transaction
                                   (cache-put db-name hash octets-value :key-type :byte-vector :value-type :byte-vector)
                                   ;; Re-read the dynamic blocks after storing the new value.
                                   (setf *memoized-output-dynamic-blocks* (cache-get "dynamic-content-blocks" hash :key-type :byte-vector :value-type :lisp))))
                               (cache-get db-name hash :key-type :byte-vector :return-type #'unchecked-return-type)))))))
(defmacro define-lmdb-memoized (name class-name (&key sources) lambda &body body)
  "Define NAME as a memoized function with lambda list LAMBDA and body BODY,
plus a variant named NAME* that delivers its result through
WRITE-MEMOIZED-DATA. Results are stored in the database \"<name>-memo\",
which is registered for CLASS-NAME. The stored version tag is computed at
macroexpansion time by HASH-FILE-LIST from \"src/hash-utils.lisp\" and
SOURCES. Inside BODY, CURRENT-MEMO-HASH is bound to the hash of the
arguments."
  (let ((memo-db (concatenate 'string (string-downcase name) "-memo"))
        (streaming-name (intern (format nil "~A*" name)))
        (scoreboard-var (intern (format nil "*~A-SCOREBOARD*" name)))
        (source-hash (hash-file-list (list* "src/hash-utils.lisp" sources))))
    (alexandria:once-only (memo-db source-hash)
      `(progn
         (define-cache-database ,class-name ,memo-db)
         (declaim (ftype (function * string) ,name)
                  (ftype (function * (values &optional t)) ,streaming-name))
         (eval-when (:compile-toplevel :load-toplevel :execute)
           (defglobal ,scoreboard-var (make-scoreboard)))
         (let ((real-fn (lambda (current-memo-hash ,@lambda)
                          (declare (ignorable current-memo-hash))
                          ,@body)))
           (setf (fdefinition (quote ,name))
                 (make-lmdb-memoized-wrapper ,memo-db ,source-hash ,scoreboard-var real-fn :string)
                 (fdefinition (quote ,streaming-name))
                 (make-lmdb-memoized-wrapper ,memo-db ,source-hash ,scoreboard-var real-fn 'write-memoized-data)))))))