weekly release 5.0dev
[moodle.git] / lib / dml / pgsql_native_moodle_database.php
blobf7d70c040df91dff015a7dcba6aa58b5373ffcf0
1 <?php
2 // This file is part of Moodle - http://moodle.org/
3 //
4 // Moodle is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // Moodle is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with Moodle. If not, see <http://www.gnu.org/licenses/>.
17 /**
18 * Native pgsql class representing moodle database interface.
20 * @package core_dml
21 * @copyright 2008 Petr Skoda (http://skodak.org)
22 * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
25 defined('MOODLE_INTERNAL') || die();
27 require_once(__DIR__.'/moodle_database.php');
28 require_once(__DIR__.'/moodle_read_slave_trait.php');
29 require_once(__DIR__.'/pgsql_native_moodle_recordset.php');
30 require_once(__DIR__.'/pgsql_native_moodle_temptables.php');
32 /**
33 * Native pgsql class representing moodle database interface.
35 * @package core_dml
36 * @copyright 2008 Petr Skoda (http://skodak.org)
37 * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
39 class pgsql_native_moodle_database extends moodle_database {
40 use moodle_read_slave_trait {
41 select_db_handle as read_slave_select_db_handle;
42 can_use_readonly as read_slave_can_use_readonly;
43 query_start as read_slave_query_start;
44 query_end as read_slave_query_end;
47 /** @var array $sslmodes */
48 private static $sslmodes = [
49 'disable',
50 'prefer',
51 'require',
52 'verify-full'
55 /** @var array $serverinfo cache */
56 private $serverinfo = [];
58 /** @var array $dbhcursor keep track of open cursors */
59 private $dbhcursor = [];
61 /** @var resource|PgSql\Connection|null $pgsql database resource */
62 protected $pgsql = null;
64 protected $last_error_reporting; // To handle pgsql driver default verbosity
66 /** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */
67 protected $savepointpresent = false;
69 /** @var int Number of cursors used (for constructing a unique ID) */
70 protected $cursorcount = 0;
72 /** @var int Default number of rows to fetch at a time when using recordsets with cursors */
73 const DEFAULT_FETCH_BUFFER_SIZE = 100000;
75 /**
76 * Detects if all needed PHP stuff installed.
77 * Note: can be used before connect()
78 * @return mixed true if ok, string if something
80 public function driver_installed() {
81 if (!extension_loaded('pgsql')) {
82 return get_string('pgsqlextensionisnotpresentinphp', 'install');
84 return true;
87 /**
88 * Returns database family type - describes SQL dialect
89 * Note: can be used before connect()
90 * @return string db family name (mysql, postgres, mssql, oracle, etc.)
92 public function get_dbfamily() {
93 return 'postgres';
96 /**
97 * Returns more specific database driver type
98 * Note: can be used before connect()
99 * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
101 protected function get_dbtype() {
102 return 'pgsql';
106 * Returns general database library name
107 * Note: can be used before connect()
108 * @return string db type pdo, native
110 protected function get_dblibrary() {
111 return 'native';
115 * Returns localised database type name
116 * Note: can be used before connect()
117 * @return string
119 public function get_name() {
120 return get_string('nativepgsql', 'install');
124 * Returns localised database configuration help.
125 * Note: can be used before connect()
126 * @return string
128 public function get_configuration_help() {
129 return get_string('nativepgsqlhelp', 'install');
133 * Connect to db
134 * @param string $dbhost The database host.
135 * @param string $dbuser The database username.
136 * @param string $dbpass The database username's password.
137 * @param string $dbname The name of the database being connected to.
138 * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
139 * @param array $dboptions driver specific options
140 * @return bool true
141 * @throws moodle_exception
142 * @throws dml_connection_exception if error
144 public function raw_connect(string $dbhost, string $dbuser, string $dbpass, string $dbname, $prefix, ?array $dboptions=null): bool {
145 if ($prefix == '' and !$this->external) {
146 //Enforce prefixes for everybody but mysql
147 throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
150 $driverstatus = $this->driver_installed();
152 if ($driverstatus !== true) {
153 throw new dml_exception('dbdriverproblem', $driverstatus);
156 $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
158 $pass = addcslashes($this->dbpass, "'\\");
160 // Unix socket connections should have lower overhead
161 if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
162 $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
163 if (strpos($this->dboptions['dbsocket'], '/') !== false) {
164 // A directory was specified as the socket location.
165 $connection .= " host='".$this->dboptions['dbsocket']."'";
167 if (!empty($this->dboptions['dbport'])) {
168 // A port as specified, add it to the connection as it's used as part of the socket path.
169 $connection .= " port ='".$this->dboptions['dbport']."'";
171 } else {
172 $this->dboptions['dbsocket'] = '';
173 if (empty($this->dbname)) {
174 // probably old style socket connection - do not add port
175 $port = "";
176 } else if (empty($this->dboptions['dbport'])) {
177 $port = "port ='5432'";
178 } else {
179 $port = "port ='".$this->dboptions['dbport']."'";
181 $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
184 if (!empty($this->dboptions['connecttimeout'])) {
185 $connection .= " connect_timeout=".$this->dboptions['connecttimeout'];
188 if (empty($this->dboptions['dbhandlesoptions'])) {
189 // ALTER USER and ALTER DATABASE are overridden by these settings.
190 $options = array('--client_encoding=utf8', '--standard_conforming_strings=on');
191 // Select schema if specified, otherwise the first one wins.
192 if (!empty($this->dboptions['dbschema'])) {
193 $options[] = "-c search_path=" . addcslashes($this->dboptions['dbschema'], "'\\");
196 $connection .= " options='" . implode(' ', $options) . "'";
199 if (isset($this->dboptions['ssl'])) {
200 $sslmode = $this->dboptions['ssl'];
201 if (!in_array($sslmode, self::$sslmodes, true)) {
202 throw new moodle_exception('validateerrorlist', 'admin', '', "'dboptions''ssl': $sslmode");
204 $connection .= " sslmode=$sslmode";
207 ob_start();
208 // It seems that pg_connect() handles some errors differently.
209 // For example, name resolution error will raise an exception, and non-existing
210 // database or wrong credentials will just return false.
211 // We need to cater for both.
212 try {
213 if (empty($this->dboptions['dbpersist'])) {
214 $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
215 } else {
216 $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
218 $dberr = ob_get_contents();
219 } catch (\Exception $e) {
220 $dberr = $e->getMessage();
222 ob_end_clean();
224 $status = $this->pgsql ? pg_connection_status($this->pgsql) : false;
226 if ($status === false or $status === PGSQL_CONNECTION_BAD) {
227 $this->pgsql = null;
228 throw new dml_connection_exception($dberr);
231 if (!empty($this->dboptions['dbpersist'])) {
232 // There are rare situations (such as PHP out of memory errors) when open cursors may
233 // not be closed at the end of a connection. When using persistent connections, the
234 // cursors remain open and 'get in the way' of future connections. To avoid this
235 // problem, close all cursors here.
236 $result = pg_query($this->pgsql, 'CLOSE ALL');
237 if ($result) {
238 pg_free_result($result);
242 if (!empty($this->dboptions['dbhandlesoptions'])) {
243 /* We don't trust people who just set the dbhandlesoptions, this code checks up on them.
244 * These functions do not talk to the server, they use the client library knowledge to determine state.
246 if (!empty($this->dboptions['dbschema'])) {
247 throw new dml_connection_exception('You cannot specify a schema with dbhandlesoptions, use the database to set it.');
249 if (pg_client_encoding($this->pgsql) != 'UTF8') {
250 throw new dml_connection_exception('client_encoding = UTF8 not set, it is: ' . pg_client_encoding($this->pgsql));
252 if (pg_escape_string($this->pgsql, '\\') != '\\') {
253 throw new dml_connection_exception('standard_conforming_strings = on, must be set at the database.');
257 // Connection stabilised and configured, going to instantiate the temptables controller
258 $this->temptables = new pgsql_native_moodle_temptables($this);
260 return true;
264 * Close database connection and release all resources
265 * and memory (especially circular memory references).
266 * Do NOT use connect() again, create a new instance if needed.
268 public function dispose() {
269 parent::dispose(); // Call parent dispose to write/close session and other common stuff before closing connection
270 if ($this->pgsql) {
271 pg_close($this->pgsql);
272 $this->pgsql = null;
277 * Gets db handle currently used with queries
278 * @return resource
280 protected function get_db_handle() {
281 return $this->pgsql;
285 * Sets db handle to be used with subsequent queries
286 * @param resource $dbh
287 * @return void
289 protected function set_db_handle($dbh): void {
290 $this->pgsql = $dbh;
294 * Select appropriate db handle - readwrite or readonly
295 * @param int $type type of query
296 * @param string $sql
297 * @return void
299 protected function select_db_handle(int $type, string $sql): void {
300 $this->read_slave_select_db_handle($type, $sql);
302 if (preg_match('/^DECLARE (crs\w*) NO SCROLL CURSOR/', $sql, $match)) {
303 $cursor = $match[1];
304 $this->dbhcursor[$cursor] = $this->pgsql;
306 if (preg_match('/^(?:FETCH \d+ FROM|CLOSE) (crs\w*)\b/', $sql, $match)) {
307 $cursor = $match[1];
308 $this->pgsql = $this->dbhcursor[$cursor];
313 * Check if The query qualifies for readonly connection execution
314 * Logging queries are exempt, those are write operations that circumvent
315 * standard query_start/query_end paths.
316 * @param int $type type of query
317 * @param string $sql
318 * @return bool
320 protected function can_use_readonly(int $type, string $sql): bool {
321 // ... pg_*lock queries always go to master.
322 if (preg_match('/\bpg_\w*lock/', $sql)) {
323 return false;
326 // ... a nuisance - temptables use this.
327 if (preg_match('/\bpg_catalog/', $sql) && $this->temptables->get_temptables()) {
328 return false;
331 return $this->read_slave_can_use_readonly($type, $sql);
336 * Called before each db query.
337 * @param string $sql
338 * @param array|null $params An array of parameters.
339 * @param int $type type of query
340 * @param mixed $extrainfo driver specific extra information
341 * @return void
343 protected function query_start($sql, ?array $params, $type, $extrainfo=null) {
344 $this->read_slave_query_start($sql, $params, $type, $extrainfo);
345 // pgsql driver tends to send debug to output, we do not need that.
346 $this->last_error_reporting = error_reporting(0);
350 * Called immediately after each db query.
351 * @param mixed db specific result
352 * @return void
354 protected function query_end($result) {
355 // reset original debug level
356 error_reporting($this->last_error_reporting);
357 try {
358 $this->read_slave_query_end($result);
359 if ($this->savepointpresent &&
360 !in_array(
361 $this->last_type,
362 [SQL_QUERY_AUX, SQL_QUERY_AUX_READONLY, SQL_QUERY_SELECT],
363 true
364 )) {
365 $res = @pg_query($this->pgsql, "RELEASE SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
366 if ($res) {
367 pg_free_result($res);
370 } catch (Exception $e) {
371 if ($this->savepointpresent) {
372 $res = @pg_query($this->pgsql, "ROLLBACK TO SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
373 if ($res) {
374 pg_free_result($res);
377 throw $e;
382 * Returns database server info array
383 * @return array Array containing 'description' and 'version' info
385 public function get_server_info(): array {
386 if (empty($this->serverinfo)) {
387 $this->query_start('--pg_version()', null, SQL_QUERY_AUX);
388 $this->serverinfo = pg_version($this->pgsql);
389 $this->query_end(true);
391 return [
392 'description' => $this->serverinfo['server'],
393 'version' => $this->serverinfo['server'],
398 * Returns supported query parameter types
399 * @return int bitmask of accepted SQL_PARAMS_*
401 protected function allowed_param_types() {
402 return SQL_PARAMS_DOLLAR;
406 * Returns last error reported by database engine.
407 * @return string error message
409 public function get_last_error() {
410 return pg_last_error($this->pgsql);
414 * Return tables in database WITHOUT current prefix.
415 * @param bool $usecache if true, returns list of cached tables.
416 * @return array of table names in lowercase and without prefix
418 public function get_tables($usecache=true) {
419 if ($usecache and $this->tables !== null) {
420 return $this->tables;
422 $this->tables = array();
423 $prefix = str_replace('_', '|_', $this->prefix);
424 $sql = "SELECT c.relname
425 FROM pg_catalog.pg_class c
426 JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
427 WHERE c.relname LIKE '$prefix%' ESCAPE '|'
428 AND c.relkind = 'r'
429 AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())";
430 $this->query_start($sql, null, SQL_QUERY_AUX_READONLY);
431 $result = pg_query($this->pgsql, $sql);
432 $this->query_end($result);
434 if ($result) {
435 while ($row = pg_fetch_row($result)) {
436 $tablename = reset($row);
437 if ($this->prefix !== false && $this->prefix !== '') {
438 if (strpos($tablename, $this->prefix) !== 0) {
439 continue;
441 $tablename = substr($tablename, strlen($this->prefix));
443 $this->tables[$tablename] = $tablename;
445 pg_free_result($result);
447 return $this->tables;
451 * Constructs 'IN()' or '=' sql fragment
453 * Method overriding {@see moodle_database::get_in_or_equal} to be able to use
454 * more than 65535 elements in $items array.
456 * @param mixed $items A single value or array of values for the expression.
457 * @param int $type Parameter bounding type : SQL_PARAMS_QM or SQL_PARAMS_NAMED.
458 * @param string $prefix Named parameter placeholder prefix (a unique counter value is appended to each parameter name).
459 * @param bool $equal True means we want to equate to the constructed expression, false means we don't want to equate to it.
460 * @param mixed $onemptyitems This defines the behavior when the array of items provided is empty. Defaults to false,
461 * meaning throw exceptions. Other values will become part of the returned SQL fragment.
462 * @throws coding_exception | dml_exception
463 * @return array A list containing the constructed sql fragment and an array of parameters.
465 public function get_in_or_equal($items, $type=SQL_PARAMS_QM, $prefix='param', $equal=true, $onemptyitems=false): array {
466 // We only interfere if number of items in expression exceeds 16 bit value.
467 if (!is_array($items) || count($items) < 65535) {
468 return parent::get_in_or_equal($items, $type, $prefix, $equal, $onemptyitems);
471 // Determine the type from the first value. We don't need to be very smart here,
472 // it is developer's responsibility to make sure that variable type is matching
473 // field type, if not the case, DB engine will hint. Also mixing types won't work
474 // here anyway, so we ignore NULL or boolean (unlikely you need 56k values of
475 // these types only).
476 $cast = is_string(current($items)) ? '::text' : '::bigint';
478 if ($type == SQL_PARAMS_QM) {
479 if ($equal) {
480 $sql = 'IN (VALUES ('.implode('),(', array_fill(0, count($items), '?'.$cast)).'))';
481 } else {
482 $sql = 'NOT IN (VALUES ('.implode('),(', array_fill(0, count($items), '?'.$cast)).'))';
484 $params = array_values($items);
485 } else if ($type == SQL_PARAMS_NAMED) {
486 if (empty($prefix)) {
487 $prefix = 'param';
489 $params = [];
490 $sql = [];
491 foreach ($items as $item) {
492 $param = $prefix.$this->inorequaluniqueindex++;
493 $params[$param] = $item;
494 $sql[] = ':'.$param.$cast;
496 if ($equal) {
497 $sql = 'IN (VALUES ('.implode('),(', $sql).'))';
498 } else {
499 $sql = 'NOT IN (VALUES ('.implode('),(', $sql).'))';
501 } else {
502 throw new dml_exception('typenotimplement');
504 return [$sql, $params];
508 * Return table indexes - everything lowercased.
509 * @param string $table The table we want to get indexes from.
510 * @return array of arrays
512 public function get_indexes($table) {
513 $indexes = array();
514 $tablename = $this->prefix.$table;
516 $sql = "SELECT i.*
517 FROM pg_catalog.pg_indexes i
518 JOIN pg_catalog.pg_namespace as ns ON ns.nspname = i.schemaname
519 WHERE i.tablename = '$tablename'
520 AND (i.schemaname = current_schema() OR ns.oid = pg_my_temp_schema())";
522 $this->query_start($sql, null, SQL_QUERY_AUX_READONLY);
523 $result = pg_query($this->pgsql, $sql);
524 $this->query_end($result);
526 if ($result) {
527 while ($row = pg_fetch_assoc($result)) {
528 // The index definition could be generated schema-qualifying the target table name
529 // for safety, depending on the pgsql version (CVE-2018-1058).
530 if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON (|'.$row['schemaname'].'\.)'.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
531 continue;
533 if ($matches[5] === 'id') {
534 continue;
536 $columns = explode(',', $matches[5]);
537 foreach ($columns as $k=>$column) {
538 $column = trim($column);
539 if ($pos = strpos($column, ' ')) {
540 // index type is separated by space
541 $column = substr($column, 0, $pos);
543 $columns[$k] = $this->trim_quotes($column);
545 $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
546 'columns'=>$columns);
548 pg_free_result($result);
550 return $indexes;
554 * Returns detailed information about columns in table.
556 * @param string $table name
557 * @return database_column_info[] array of database_column_info objects indexed with column names
559 protected function fetch_columns(string $table): array {
560 $structure = array();
562 $tablename = $this->prefix.$table;
564 $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef,
565 CASE WHEN a.atthasdef THEN pg_catalog.pg_get_expr(d.adbin, d.adrelid) ELSE '' END AS adsrc
566 FROM pg_catalog.pg_class c
567 JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
568 JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
569 JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
570 LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
571 WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
572 AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())
573 ORDER BY a.attnum";
575 $this->query_start($sql, null, SQL_QUERY_AUX_READONLY);
576 $result = pg_query($this->pgsql, $sql);
577 $this->query_end($result);
579 if (!$result) {
580 return array();
582 while ($rawcolumn = pg_fetch_object($result)) {
584 $info = new stdClass();
585 $info->name = $rawcolumn->field;
586 $matches = null;
588 if ($rawcolumn->type === 'varchar') {
589 $info->type = 'varchar';
590 $info->meta_type = 'C';
591 $info->max_length = $rawcolumn->atttypmod - 4;
592 $info->scale = null;
593 $info->not_null = ($rawcolumn->attnotnull === 't');
594 $info->has_default = ($rawcolumn->atthasdef === 't');
595 if ($info->has_default) {
596 $parts = explode('::', $rawcolumn->adsrc);
597 if (count($parts) > 1) {
598 $info->default_value = reset($parts);
599 $info->default_value = trim($info->default_value, "'");
600 } else {
601 $info->default_value = $rawcolumn->adsrc;
603 } else {
604 $info->default_value = null;
606 $info->primary_key = false;
607 $info->binary = false;
608 $info->unsigned = null;
609 $info->auto_increment= false;
610 $info->unique = null;
612 } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
613 $info->type = 'int';
614 if (strpos($rawcolumn->adsrc ?? '', 'nextval') === 0) {
615 $info->primary_key = true;
616 $info->meta_type = 'R';
617 $info->unique = true;
618 $info->auto_increment= true;
619 $info->has_default = false;
620 } else {
621 $info->primary_key = false;
622 $info->meta_type = 'I';
623 $info->unique = null;
624 $info->auto_increment= false;
625 $info->has_default = ($rawcolumn->atthasdef === 't');
627 // Return number of decimals, not bytes here.
628 if ($matches[1] >= 8) {
629 $info->max_length = 18;
630 } else if ($matches[1] >= 4) {
631 $info->max_length = 9;
632 } else if ($matches[1] >= 2) {
633 $info->max_length = 4;
634 } else if ($matches[1] >= 1) {
635 $info->max_length = 2;
636 } else {
637 $info->max_length = 0;
639 $info->scale = null;
640 $info->not_null = ($rawcolumn->attnotnull === 't');
641 if ($info->has_default) {
642 // PG 9.5+ uses ::<TYPE> syntax for some defaults.
643 $parts = explode('::', $rawcolumn->adsrc);
644 if (count($parts) > 1) {
645 $info->default_value = reset($parts);
646 } else {
647 $info->default_value = $rawcolumn->adsrc;
649 $info->default_value = trim($info->default_value, "()'");
650 } else {
651 $info->default_value = null;
653 $info->binary = false;
654 $info->unsigned = false;
656 } else if ($rawcolumn->type === 'numeric') {
657 $info->type = $rawcolumn->type;
658 $info->meta_type = 'N';
659 $info->primary_key = false;
660 $info->binary = false;
661 $info->unsigned = null;
662 $info->auto_increment= false;
663 $info->unique = null;
664 $info->not_null = ($rawcolumn->attnotnull === 't');
665 $info->has_default = ($rawcolumn->atthasdef === 't');
666 if ($info->has_default) {
667 // PG 9.5+ uses ::<TYPE> syntax for some defaults.
668 $parts = explode('::', $rawcolumn->adsrc);
669 if (count($parts) > 1) {
670 $info->default_value = reset($parts);
671 } else {
672 $info->default_value = $rawcolumn->adsrc;
674 $info->default_value = trim($info->default_value, "()'");
675 } else {
676 $info->default_value = null;
678 $info->max_length = $rawcolumn->atttypmod >> 16;
679 $info->scale = ($rawcolumn->atttypmod & 0xFFFF) - 4;
681 } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
682 $info->type = 'float';
683 $info->meta_type = 'N';
684 $info->primary_key = false;
685 $info->binary = false;
686 $info->unsigned = null;
687 $info->auto_increment= false;
688 $info->unique = null;
689 $info->not_null = ($rawcolumn->attnotnull === 't');
690 $info->has_default = ($rawcolumn->atthasdef === 't');
691 if ($info->has_default) {
692 // PG 9.5+ uses ::<TYPE> syntax for some defaults.
693 $parts = explode('::', $rawcolumn->adsrc);
694 if (count($parts) > 1) {
695 $info->default_value = reset($parts);
696 } else {
697 $info->default_value = $rawcolumn->adsrc;
699 $info->default_value = trim($info->default_value, "()'");
700 } else {
701 $info->default_value = null;
703 // just guess expected number of deciaml places :-(
704 if ($matches[1] == 8) {
705 // total 15 digits
706 $info->max_length = 8;
707 $info->scale = 7;
708 } else {
709 // total 6 digits
710 $info->max_length = 4;
711 $info->scale = 2;
714 } else if ($rawcolumn->type === 'text') {
715 $info->type = $rawcolumn->type;
716 $info->meta_type = 'X';
717 $info->max_length = -1;
718 $info->scale = null;
719 $info->not_null = ($rawcolumn->attnotnull === 't');
720 $info->has_default = ($rawcolumn->atthasdef === 't');
721 if ($info->has_default) {
722 $parts = explode('::', $rawcolumn->adsrc);
723 if (count($parts) > 1) {
724 $info->default_value = reset($parts);
725 $info->default_value = trim($info->default_value, "'");
726 } else {
727 $info->default_value = $rawcolumn->adsrc;
729 } else {
730 $info->default_value = null;
732 $info->primary_key = false;
733 $info->binary = false;
734 $info->unsigned = null;
735 $info->auto_increment= false;
736 $info->unique = null;
738 } else if ($rawcolumn->type === 'bytea') {
739 $info->type = $rawcolumn->type;
740 $info->meta_type = 'B';
741 $info->max_length = -1;
742 $info->scale = null;
743 $info->not_null = ($rawcolumn->attnotnull === 't');
744 $info->has_default = false;
745 $info->default_value = null;
746 $info->primary_key = false;
747 $info->binary = true;
748 $info->unsigned = null;
749 $info->auto_increment= false;
750 $info->unique = null;
754 $structure[$info->name] = new database_column_info($info);
757 pg_free_result($result);
759 return $structure;
763 * Normalise values based in RDBMS dependencies (booleans, LOBs...)
765 * @param database_column_info $column column metadata corresponding with the value we are going to normalise
766 * @param mixed $value value we are going to normalise
767 * @return mixed the normalised value
769 protected function normalise_value($column, $value) {
770 $this->detect_objects($value);
772 if (is_bool($value)) { // Always, convert boolean to int
773 $value = (int)$value;
775 } else if ($column->meta_type === 'B') {
776 if (!is_null($value)) {
777 // standard_conforming_strings must be enabled, otherwise pg_escape_bytea() will double escape
778 // \ and produce data errors. This is set on the connection.
779 $value = pg_escape_bytea($this->pgsql, $value);
782 } else if ($value === '') {
783 if ($column->meta_type === 'I' or $column->meta_type === 'F' or $column->meta_type === 'N') {
784 $value = 0; // prevent '' problems in numeric fields
787 return $value;
791 * Is db in unicode mode?
792 * @return bool
794 public function setup_is_unicodedb() {
795 // Get PostgreSQL server_encoding value
796 $sql = 'SHOW server_encoding';
797 $this->query_start($sql, null, SQL_QUERY_AUX_READONLY);
798 $result = pg_query($this->pgsql, $sql);
799 $this->query_end($result);
801 if (!$result) {
802 return false;
804 $rawcolumn = pg_fetch_object($result);
805 $encoding = $rawcolumn->server_encoding;
806 pg_free_result($result);
808 return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8');
812 * Do NOT use in code, to be used by database_manager only!
813 * @param string|array $sql query
814 * @param array|null $tablenames an array of xmldb table names affected by this request.
815 * @return bool true
816 * @throws ddl_change_structure_exception A DDL specific exception is thrown for any errors.
818 public function change_database_structure($sql, $tablenames = null) {
819 $this->get_manager(); // Includes DDL exceptions classes ;-)
820 if (is_array($sql)) {
821 $sql = implode("\n;\n", $sql);
823 if (!$this->is_transaction_started()) {
824 // It is better to do all or nothing, this helps with recovery...
825 $sql = "BEGIN ISOLATION LEVEL SERIALIZABLE;\n$sql\n; COMMIT";
828 try {
829 $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
830 $result = pg_query($this->pgsql, $sql);
831 $this->query_end($result);
832 pg_free_result($result);
833 } catch (ddl_change_structure_exception $e) {
834 if (!$this->is_transaction_started()) {
835 $result = @pg_query($this->pgsql, "ROLLBACK");
836 @pg_free_result($result);
838 $this->reset_caches($tablenames);
839 throw $e;
842 $this->reset_caches($tablenames);
843 return true;
847 * Execute general sql query. Should be used only when no other method suitable.
848 * Do NOT use this to make changes in db structure, use database_manager methods instead!
849 * @param string $sql query
850 * @param array $params query parameters
851 * @return bool true
852 * @throws dml_exception A DML specific exception is thrown for any errors.
854 public function execute($sql, ?array $params=null) {
855 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
857 if (strpos($sql, ';') !== false) {
858 throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
861 $this->query_start($sql, $params, SQL_QUERY_UPDATE);
862 $result = pg_query_params($this->pgsql, $sql, $params);
863 $this->query_end($result);
865 pg_free_result($result);
866 return true;
870 * Get a number of records as a moodle_recordset using a SQL statement.
872 * Since this method is a little less readable, use of it should be restricted to
873 * code where it's possible there might be large datasets being returned. For known
874 * small datasets use get_records_sql - it leads to simpler code.
876 * The return type is like:
877 * @see function get_recordset.
879 * @param string $sql the SQL select query to execute.
880 * @param array $params array of sql parameters
881 * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
882 * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
883 * @return moodle_recordset instance
884 * @throws dml_exception A DML specific exception is thrown for any errors.
886 public function get_recordset_sql($sql, ?array $params=null, $limitfrom=0, $limitnum=0) {
888 list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
890 if ($limitnum) {
891 $sql .= " LIMIT $limitnum";
893 if ($limitfrom) {
894 $sql .= " OFFSET $limitfrom";
897 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
899 // For any query that doesn't explicitly specify a limit, we must use cursors to stop it
900 // loading the entire thing (unless the config setting is turned off).
901 $usecursors = !$limitnum && ($this->get_fetch_buffer_size() > 0);
902 if ($usecursors) {
903 // Work out the cursor unique identifer. This is based on a simple count used which
904 // should be OK because the identifiers only need to be unique within the current
905 // transaction.
906 $this->cursorcount++;
907 $cursorname = 'crs' . $this->cursorcount;
909 // Do the query to a cursor.
910 $sql = 'DECLARE ' . $cursorname . ' NO SCROLL CURSOR WITH HOLD FOR ' . $sql;
911 } else {
912 $cursorname = '';
915 $this->query_start($sql, $params, SQL_QUERY_SELECT);
917 $result = pg_query_params($this->pgsql, $sql, $params);
919 $this->query_end($result);
920 if ($usecursors) {
921 pg_free_result($result);
922 $result = null;
925 return new pgsql_native_moodle_recordset($result, $this, $cursorname);
929 * Gets size of fetch buffer used for recordset queries.
931 * If this returns 0 then cursors will not be used, meaning recordset queries will occupy enough
932 * memory as needed for the Postgres library to hold the entire query results in memory.
934 * @return int Fetch buffer size or 0 indicating not to use cursors
936 protected function get_fetch_buffer_size() {
937 if (array_key_exists('fetchbuffersize', $this->dboptions)) {
938 return (int)$this->dboptions['fetchbuffersize'];
939 } else {
940 return self::DEFAULT_FETCH_BUFFER_SIZE;
945 * Retrieves data from cursor. For use by recordset only; do not call directly.
947 * Return value contains the next batch of Postgres data, and a boolean indicating if this is
948 * definitely the last batch (if false, there may be more)
950 * @param string $cursorname Name of cursor to read from
951 * @return array Array with 2 elements (next data batch and boolean indicating last batch)
953 public function fetch_from_cursor($cursorname) {
954 $count = $this->get_fetch_buffer_size();
956 $sql = 'FETCH ' . $count . ' FROM ' . $cursorname;
958 $this->query_start($sql, [], SQL_QUERY_AUX);
959 $result = pg_query($this->pgsql, $sql);
960 $last = pg_num_rows($result) !== $count;
962 $this->query_end($result);
964 return [$result, $last];
968 * Closes a cursor. For use by recordset only; do not call directly.
970 * @param string $cursorname Name of cursor to close
971 * @return bool True if we actually closed one, false if the transaction was cancelled
973 public function close_cursor($cursorname) {
974 // If the transaction got cancelled, then ignore this request.
975 $sql = 'CLOSE ' . $cursorname;
976 $this->query_start($sql, [], SQL_QUERY_AUX);
977 $result = pg_query($this->pgsql, $sql);
978 $this->query_end($result);
979 if ($result) {
980 pg_free_result($result);
982 return true;
986 * A faster version of pg_field_type
988 * The pg_field_type function in the php postgres driver internally makes an sql call
989 * to get the list of field types which it statically caches only for a single request.
990 * This wraps it in a cache keyed by oid to avoid these DB calls on every request.
992 * @param resource|PgSql\Result $result
993 * @param int $fieldnumber
994 * @return string Field type
996 public function pg_field_type($result, int $fieldnumber) {
997 static $map;
998 $cache = $this->get_metacache();
1000 // Getting the oid doesn't make an internal query.
1001 $oid = pg_field_type_oid($result, $fieldnumber);
1002 if (!$map) {
1003 $map = $cache->get('oid2typname');
1005 if ($map === false) {
1006 $map = [];
1008 if (isset($map[$oid])) {
1009 return $map[$oid];
1011 $map[$oid] = pg_field_type($result, $fieldnumber);
1012 $cache->set('oid2typname', $map);
1013 return $map[$oid];
1017 * Get a number of records as an array of objects using a SQL statement.
1019 * Return value is like:
1020 * @see function get_records.
1022 * @param string $sql the SQL select query to execute. The first column of this SELECT statement
1023 * must be a unique value (usually the 'id' field), as it will be used as the key of the
1024 * returned array.
1025 * @param array $params array of sql parameters
1026 * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
1027 * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
1028 * @return array of objects, or empty array if no records were found
1029 * @throws dml_exception A DML specific exception is thrown for any errors.
1031 public function get_records_sql($sql, ?array $params = null, $limitfrom = 0, $limitnum = 0) {
1032 list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
1034 if ($limitnum) {
1035 $sql .= " LIMIT $limitnum";
1037 if ($limitfrom) {
1038 $sql .= " OFFSET $limitfrom";
1041 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1042 $this->query_start($sql, $params, SQL_QUERY_SELECT);
1043 $result = pg_query_params($this->pgsql, $sql, $params);
1044 $this->query_end($result);
1046 // find out if there are any blobs
1047 $numfields = pg_num_fields($result);
1048 $blobs = array();
1049 for ($i = 0; $i < $numfields; $i++) {
1050 $type = $this->pg_field_type($result, $i);
1051 if ($type == 'bytea') {
1052 $blobs[] = pg_field_name($result, $i);
1056 $return = [];
1057 while ($row = pg_fetch_assoc($result)) {
1058 $id = reset($row);
1059 if ($blobs) {
1060 foreach ($blobs as $blob) {
1061 $row[$blob] = ($row[$blob] !== null ? pg_unescape_bytea($row[$blob]) : null);
1064 if (isset($return[$id])) {
1065 $colname = key($row);
1066 debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
1068 $return[$id] = (object) $row;
1071 return $return;
1075 * Selects records and return values (first field) as an array using a SQL statement.
1077 * @param string $sql The SQL query
1078 * @param array $params array of sql parameters
1079 * @return array of values
1080 * @throws dml_exception A DML specific exception is thrown for any errors.
1082 public function get_fieldset_sql($sql, ?array $params=null) {
1083 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1085 $this->query_start($sql, $params, SQL_QUERY_SELECT);
1086 $result = pg_query_params($this->pgsql, $sql, $params);
1087 $this->query_end($result);
1089 $return = pg_fetch_all_columns($result, 0);
1091 if ($this->pg_field_type($result, 0) == 'bytea') {
1092 foreach ($return as $key => $value) {
1093 $return[$key] = ($value === null ? $value : pg_unescape_bytea($value));
1097 pg_free_result($result);
1099 return $return;
1103 * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
1104 * @param string $table name
1105 * @param mixed $params data record as object or array
1106 * @param bool $returnit return it of inserted record
1107 * @param bool $bulk true means repeated inserts expected
1108 * @param bool $customsequence true if 'id' included in $params, disables $returnid
1109 * @return bool|int true or new id
1110 * @throws dml_exception A DML specific exception is thrown for any errors.
1112 public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
1113 if (!is_array($params)) {
1114 $params = (array)$params;
1117 $returning = "";
1119 if ($customsequence) {
1120 if (!isset($params['id'])) {
1121 throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
1123 $returnid = false;
1124 } else {
1125 if ($returnid) {
1126 $returning = "RETURNING id";
1127 unset($params['id']);
1128 } else {
1129 unset($params['id']);
1133 if (empty($params)) {
1134 throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
1137 $fields = implode(',', array_keys($params));
1138 $values = array();
1139 $i = 1;
1140 foreach ($params as $value) {
1141 $this->detect_objects($value);
1142 $values[] = "\$".$i++;
1144 $values = implode(',', $values);
1146 $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
1147 $this->query_start($sql, $params, SQL_QUERY_INSERT);
1148 $result = pg_query_params($this->pgsql, $sql, $params);
1149 $this->query_end($result);
1151 if ($returning !== "") {
1152 $row = pg_fetch_assoc($result);
1153 $params['id'] = reset($row);
1155 pg_free_result($result);
1157 if (!$returnid) {
1158 return true;
1161 return (int)$params['id'];
1165 * Insert a record into a table and return the "id" field if required.
1167 * Some conversions and safety checks are carried out. Lobs are supported.
1168 * If the return ID isn't required, then this just reports success as true/false.
1169 * $data is an object containing needed data
1170 * @param string $table The database table to be inserted into
1171 * @param object|array $dataobject A data object with values for one or more fields in the record
1172 * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
1173 * @return bool|int true or new id
1174 * @throws dml_exception A DML specific exception is thrown for any errors.
1176 public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
1177 $dataobject = (array)$dataobject;
1179 $columns = $this->get_columns($table);
1180 if (empty($columns)) {
1181 throw new dml_exception('ddltablenotexist', $table);
1184 $cleaned = array();
1186 foreach ($dataobject as $field=>$value) {
1187 if ($field === 'id') {
1188 continue;
1190 if (!isset($columns[$field])) {
1191 continue;
1193 $column = $columns[$field];
1194 $cleaned[$field] = $this->normalise_value($column, $value);
1197 return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
1202 * Insert multiple records into database as fast as possible.
1204 * Order of inserts is maintained, but the operation is not atomic,
1205 * use transactions if necessary.
1207 * This method is intended for inserting of large number of small objects,
1208 * do not use for huge objects with text or binary fields.
1210 * @since Moodle 2.7
1212 * @param string $table The database table to be inserted into
1213 * @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
1214 * @return void does not return new record ids
1216 * @throws coding_exception if data objects have different structure
1217 * @throws dml_exception A DML specific exception is thrown for any errors.
1219 public function insert_records($table, $dataobjects) {
1220 if (!is_array($dataobjects) and !($dataobjects instanceof Traversable)) {
1221 throw new coding_exception('insert_records() passed non-traversable object');
1224 // PostgreSQL does not seem to have problems with huge queries.
1225 $chunksize = 500;
1226 if (!empty($this->dboptions['bulkinsertsize'])) {
1227 $chunksize = (int)$this->dboptions['bulkinsertsize'];
1230 $columns = $this->get_columns($table, true);
1232 $fields = null;
1233 $count = 0;
1234 $chunk = array();
1235 foreach ($dataobjects as $dataobject) {
1236 if (!is_array($dataobject) and !is_object($dataobject)) {
1237 throw new coding_exception('insert_records() passed invalid record object');
1239 $dataobject = (array)$dataobject;
1240 if ($fields === null) {
1241 $fields = array_keys($dataobject);
1242 $columns = array_intersect_key($columns, $dataobject);
1243 unset($columns['id']);
1244 } else if ($fields !== array_keys($dataobject)) {
1245 throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
1248 $count++;
1249 $chunk[] = $dataobject;
1251 if ($count === $chunksize) {
1252 $this->insert_chunk($table, $chunk, $columns);
1253 $chunk = array();
1254 $count = 0;
1258 if ($count) {
1259 $this->insert_chunk($table, $chunk, $columns);
1264 * Insert records in chunks, strict param types...
1266 * Note: can be used only from insert_records().
1268 * @param string $table
1269 * @param array $chunk
1270 * @param database_column_info[] $columns
1272 protected function insert_chunk($table, array $chunk, array $columns) {
1273 $i = 1;
1274 $params = array();
1275 $values = array();
1276 foreach ($chunk as $dataobject) {
1277 $vals = array();
1278 foreach ($columns as $field => $column) {
1279 $params[] = $this->normalise_value($column, $dataobject[$field]);
1280 $vals[] = "\$".$i++;
1282 $values[] = '('.implode(',', $vals).')';
1285 $fieldssql = '('.implode(',', array_keys($columns)).')';
1286 $valuessql = implode(',', $values);
1288 $sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql";
1289 $this->query_start($sql, $params, SQL_QUERY_INSERT);
1290 $result = pg_query_params($this->pgsql, $sql, $params);
1291 $this->query_end($result);
1292 pg_free_result($result);
1296 * Import a record into a table, id field is required.
1297 * Safety checks are NOT carried out. Lobs are supported.
1299 * @param string $table name of database table to be inserted into
1300 * @param object $dataobject A data object with values for one or more fields in the record
1301 * @return bool true
1302 * @throws dml_exception A DML specific exception is thrown for any errors.
1304 public function import_record($table, $dataobject) {
1305 $dataobject = (array)$dataobject;
1307 $columns = $this->get_columns($table);
1308 $cleaned = array();
1310 foreach ($dataobject as $field=>$value) {
1311 $this->detect_objects($value);
1312 if (!isset($columns[$field])) {
1313 continue;
1315 $column = $columns[$field];
1316 $cleaned[$field] = $this->normalise_value($column, $value);
1319 return $this->insert_record_raw($table, $cleaned, false, true, true);
1323 * Update record in database, as fast as possible, no safety checks, lobs not supported.
1324 * @param string $table name
1325 * @param stdClass|array $params data record as object or array
1326 * @param bool true means repeated updates expected
1327 * @return bool true
1328 * @throws dml_exception A DML specific exception is thrown for any errors.
1330 public function update_record_raw($table, $params, $bulk=false) {
1331 $params = (array)$params;
1333 if (!isset($params['id'])) {
1334 throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
1336 $id = $params['id'];
1337 unset($params['id']);
1339 if (empty($params)) {
1340 throw new coding_exception('moodle_database::update_record_raw() no fields found.');
1343 $i = 1;
1345 $sets = array();
1346 foreach ($params as $field=>$value) {
1347 $this->detect_objects($value);
1348 $sets[] = "$field = \$".$i++;
1351 $params[] = $id; // last ? in WHERE condition
1353 $sets = implode(',', $sets);
1354 $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;
1356 $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1357 $result = pg_query_params($this->pgsql, $sql, $params);
1358 $this->query_end($result);
1360 pg_free_result($result);
1361 return true;
1365 * Update a record in a table
1367 * $dataobject is an object containing needed data
1368 * Relies on $dataobject having a variable "id" to
1369 * specify the record to update
1371 * @param string $table The database table to be checked against.
1372 * @param stdClass|array $dataobject An object with contents equal to fieldname=>fieldvalue.
1373 * Must have an entry for 'id' to map to the table specified.
1374 * @param bool true means repeated updates expected
1375 * @return bool true
1376 * @throws dml_exception A DML specific exception is thrown for any errors.
1378 public function update_record($table, $dataobject, $bulk=false) {
1379 $dataobject = (array)$dataobject;
1381 $columns = $this->get_columns($table);
1382 $cleaned = array();
1384 foreach ($dataobject as $field=>$value) {
1385 if (!isset($columns[$field])) {
1386 continue;
1388 $column = $columns[$field];
1389 $cleaned[$field] = $this->normalise_value($column, $value);
1392 $this->update_record_raw($table, $cleaned, $bulk);
1394 return true;
1398 * Set a single field in every table record which match a particular WHERE clause.
1400 * @param string $table The database table to be checked against.
1401 * @param string $newfield the field to set.
1402 * @param string $newvalue the value to set the field to.
1403 * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
1404 * @param array $params array of sql parameters
1405 * @return bool true
1406 * @throws dml_exception A DML specific exception is thrown for any errors.
1408 public function set_field_select($table, $newfield, $newvalue, $select, ?array $params=null) {
1410 if ($select) {
1411 $select = "WHERE $select";
1413 if (is_null($params)) {
1414 $params = array();
1416 list($select, $params, $type) = $this->fix_sql_params($select, $params);
1417 $i = count($params)+1;
1419 // Get column metadata
1420 $columns = $this->get_columns($table);
1421 $column = $columns[$newfield];
1423 $normalisedvalue = $this->normalise_value($column, $newvalue);
1425 $newfield = "$newfield = \$" . $i;
1426 $params[] = $normalisedvalue;
1427 $sql = "UPDATE {$this->prefix}$table SET $newfield $select";
1429 $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1430 $result = pg_query_params($this->pgsql, $sql, $params);
1431 $this->query_end($result);
1433 pg_free_result($result);
1435 return true;
1439 * Delete one or more records from a table which match a particular WHERE clause, lobs not supported.
1441 * @param string $table The database table to be checked against.
1442 * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
1443 * @param array $params array of sql parameters
1444 * @return bool true
1445 * @throws dml_exception A DML specific exception is thrown for any errors.
1447 public function delete_records_select($table, $select, ?array $params=null) {
1448 if ($select) {
1449 $select = "WHERE $select";
1451 $sql = "DELETE FROM {$this->prefix}$table $select";
1453 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1455 $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1456 $result = pg_query_params($this->pgsql, $sql, $params);
1457 $this->query_end($result);
1459 pg_free_result($result);
1461 return true;
1465 * Returns 'LIKE' part of a query.
1467 * @param string $fieldname usually name of the table column
1468 * @param string $param usually bound query parameter (?, :named)
1469 * @param bool $casesensitive use case sensitive search
1470 * @param bool $accensensitive use accent sensitive search (not all databases support accent insensitive)
1471 * @param bool $notlike true means "NOT LIKE"
1472 * @param string $escapechar escape char for '%' and '_'
1473 * @return string SQL code fragment
1475 public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') {
1476 if (strpos($param, '%') !== false) {
1477 debugging('Potential SQL injection detected, sql_like() expects bound parameters (? or :named)');
1480 // postgresql does not support accent insensitive text comparisons, sorry
1481 if ($casesensitive) {
1482 $LIKE = $notlike ? 'NOT LIKE' : 'LIKE';
1483 } else {
1484 $LIKE = $notlike ? 'NOT ILIKE' : 'ILIKE';
1486 return "$fieldname $LIKE $param ESCAPE '$escapechar'";
1489 public function sql_bitxor($int1, $int2) {
1490 return '((' . $int1 . ') # (' . $int2 . '))';
1494 * Return SQL for casting to char of given field/expression
1496 * @param string $field Table field or SQL expression to be cast
1497 * @return string
1499 public function sql_cast_to_char(string $field): string {
1500 return "CAST({$field} AS VARCHAR)";
1503 public function sql_cast_char2int($fieldname, $text=false) {
1504 return ' CAST(' . $fieldname . ' AS INT) ';
1507 public function sql_cast_char2real($fieldname, $text=false) {
1508 return " $fieldname::real ";
1511 public function sql_concat(...$arr) {
1512 $s = implode(' || ', $arr);
1513 if ($s === '') {
1514 return " '' ";
1516 // Add always empty string element so integer-exclusive concats
1517 // will work without needing to cast each element explicitly
1518 return " '' || $s ";
1521 public function sql_concat_join($separator="' '", $elements=array()) {
1522 for ($n=count($elements)-1; $n > 0 ; $n--) {
1523 array_splice($elements, $n, 0, $separator);
1525 $s = implode(' || ', $elements);
1526 if ($s === '') {
1527 return " '' ";
1529 return " $s ";
1533 * Return SQL for performing group concatenation on given field/expression
1535 * @param string $field
1536 * @param string $separator
1537 * @param string $sort
1538 * @return string
1540 public function sql_group_concat(string $field, string $separator = ', ', string $sort = ''): string {
1541 $fieldsort = $sort ? "ORDER BY {$sort}" : '';
1542 return "STRING_AGG(" . $this->sql_cast_to_char($field) . ", '{$separator}' {$fieldsort})";
1546 * Returns the SQL text to be used to order by columns, standardising the return
1547 * pattern of null values across database types to sort nulls first when ascending
1548 * and last when descending.
1550 * @param string $fieldname The name of the field we need to sort by.
1551 * @param int $sort An order to sort the results in.
1552 * @return string The piece of SQL code to be used in your statement.
1554 public function sql_order_by_null(string $fieldname, int $sort = SORT_ASC): string {
1555 return parent::sql_order_by_null($fieldname, $sort) . ' NULLS ' . ($sort == SORT_ASC ? 'FIRST' : 'LAST');
1558 public function sql_regex_supported() {
1559 return true;
1562 public function sql_regex($positivematch = true, $casesensitive = false) {
1563 if ($casesensitive) {
1564 return $positivematch ? '~' : '!~';
1565 } else {
1566 return $positivematch ? '~*' : '!~*';
1571 * Does this driver support tool_replace?
1573 * @since Moodle 2.6.1
1574 * @return bool
1576 public function replace_all_text_supported() {
1577 return true;
1580 public function session_lock_supported() {
1581 return true;
1585 * Obtain session lock
1586 * @param int $rowid id of the row with session record
1587 * @param int $timeout max allowed time to wait for the lock in seconds
1588 * @return bool success
1590 public function get_session_lock($rowid, $timeout) {
1591 // NOTE: there is a potential locking problem for database running
1592 // multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
1593 // luckily there is not a big chance that they would collide
1594 if (!$this->session_lock_supported()) {
1595 return;
1598 parent::get_session_lock($rowid, $timeout);
1600 $timeoutmilli = $timeout * 1000;
1602 $sql = "SET statement_timeout TO $timeoutmilli";
1603 $this->query_start($sql, null, SQL_QUERY_AUX);
1604 $result = pg_query($this->pgsql, $sql);
1605 $this->query_end($result);
1607 if ($result) {
1608 pg_free_result($result);
1611 $sql = "SELECT pg_advisory_lock($rowid)";
1612 $this->query_start($sql, null, SQL_QUERY_AUX);
1613 $start = time();
1614 $result = pg_query($this->pgsql, $sql);
1615 $end = time();
1616 try {
1617 $this->query_end($result);
1618 } catch (dml_exception $ex) {
1619 if ($end - $start >= $timeout) {
1620 throw new dml_sessionwait_exception();
1621 } else {
1622 throw $ex;
1626 if ($result) {
1627 pg_free_result($result);
1630 $sql = "SET statement_timeout TO DEFAULT";
1631 $this->query_start($sql, null, SQL_QUERY_AUX);
1632 $result = pg_query($this->pgsql, $sql);
1633 $this->query_end($result);
1635 if ($result) {
1636 pg_free_result($result);
1640 public function release_session_lock($rowid) {
1641 if (!$this->session_lock_supported()) {
1642 return;
1644 if (!$this->used_for_db_sessions) {
1645 return;
1648 parent::release_session_lock($rowid);
1650 $sql = "SELECT pg_advisory_unlock($rowid)";
1651 $this->query_start($sql, null, SQL_QUERY_AUX);
1652 $result = pg_query($this->pgsql, $sql);
1653 $this->query_end($result);
1655 if ($result) {
1656 pg_free_result($result);
1661 * Driver specific start of real database transaction,
1662 * this can not be used directly in code.
1663 * @return void
1665 protected function begin_transaction() {
1666 $this->savepointpresent = true;
1667 $sql = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint";
1668 $this->query_start($sql, null, SQL_QUERY_AUX);
1669 $result = pg_query($this->pgsql, $sql);
1670 $this->query_end($result);
1672 pg_free_result($result);
1676 * Driver specific commit of real database transaction,
1677 * this can not be used directly in code.
1678 * @return void
1680 protected function commit_transaction() {
1681 $this->savepointpresent = false;
1682 $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT";
1683 $this->query_start($sql, null, SQL_QUERY_AUX);
1684 $result = pg_query($this->pgsql, $sql);
1685 $this->query_end($result);
1687 pg_free_result($result);
1691 * Driver specific abort of real database transaction,
1692 * this can not be used directly in code.
1693 * @return void
1695 protected function rollback_transaction() {
1696 $this->savepointpresent = false;
1697 $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK";
1698 $this->query_start($sql, null, SQL_QUERY_AUX);
1699 $result = pg_query($this->pgsql, $sql);
1700 $this->query_end($result);
1702 pg_free_result($result);
1706 * Helper function trimming (whitespace + quotes) any string
1707 * needed because PG uses to enclose with double quotes some
1708 * fields in indexes definition and others
1710 * @param string $str string to apply whitespace + quotes trim
1711 * @return string trimmed string
1713 private function trim_quotes($str) {
1714 return trim(trim($str), "'\"");
1718 * Postgresql supports full-text search indexes.
1720 * @return bool
1722 public function is_fulltext_search_supported() {
1723 return true;
1727 * Postgresql supports the COUNT() window function and provides a performance improvement.
1729 * @return bool
1731 public function is_count_window_function_supported(): bool {
1732 return true;