Merge branch 'MDL-58454-master' of git://github.com/junpataleta/moodle
[moodle.git] / lib / dml / pgsql_native_moodle_database.php
blob97d94905ff9b75ee61d4b5fcd849877a24f011e6
1 <?php
2 // This file is part of Moodle - http://moodle.org/
3 //
4 // Moodle is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // Moodle is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with Moodle. If not, see <http://www.gnu.org/licenses/>.
17 /**
18 * Native pgsql class representing moodle database interface.
20 * @package core_dml
21 * @copyright 2008 Petr Skoda (http://skodak.org)
22 * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
25 defined('MOODLE_INTERNAL') || die();
27 require_once(__DIR__.'/moodle_database.php');
28 require_once(__DIR__.'/pgsql_native_moodle_recordset.php');
29 require_once(__DIR__.'/pgsql_native_moodle_temptables.php');
31 /**
32 * Native pgsql class representing moodle database interface.
34 * @package core_dml
35 * @copyright 2008 Petr Skoda (http://skodak.org)
36 * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
38 class pgsql_native_moodle_database extends moodle_database {
40 /** @var resource $pgsql database resource */
41 protected $pgsql = null;
43 protected $last_error_reporting; // To handle pgsql driver default verbosity
45 /** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */
46 protected $savepointpresent = false;
48 /** @var int Number of cursors used (for constructing a unique ID) */
49 protected $cursorcount = 0;
51 /** @var int Default number of rows to fetch at a time when using recordsets with cursors */
52 const DEFAULT_FETCH_BUFFER_SIZE = 100000;
/**
 * Checks whether the PHP extension this driver depends on is loaded.
 * Note: can be used before connect()
 *
 * @return mixed true when the 'pgsql' extension is loaded, otherwise a localised error string
 */
public function driver_installed() {
    $available = extension_loaded('pgsql');

    return $available ? true : get_string('pgsqlextensionisnotpresentinphp', 'install');
}
/**
 * Names the database family, i.e. the SQL dialect spoken by this driver.
 * Note: can be used before connect()
 *
 * @return string db family name (mysql, postgres, mssql, oracle, etc.)
 */
public function get_dbfamily() {
    $family = 'postgres';

    return $family;
}
/**
 * Names the specific database driver type.
 * Note: can be used before connect()
 *
 * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
 */
protected function get_dbtype() {
    $dbtype = 'pgsql';

    return $dbtype;
}
/**
 * Names the general database library used by this driver.
 * Note: can be used before connect()
 *
 * @return string db type pdo, native
 */
protected function get_dblibrary() {
    $library = 'native';

    return $library;
}
/**
 * Gives the localised, human readable name of this database type.
 * Note: can be used before connect()
 *
 * @return string
 */
public function get_name() {
    $name = get_string('nativepgsql', 'install');

    return $name;
}
/**
 * Gives the localised configuration help text for this database type.
 * Note: can be used before connect()
 *
 * @return string
 */
public function get_configuration_help() {
    $help = get_string('nativepgsqlhelp', 'install');

    return $help;
}
/**
 * Connect to db
 * Must be called before other methods.
 *
 * Builds a libpq connection string (unix socket or TCP), opens a new (optionally persistent)
 * connection, verifies the client settings when 'dbhandlesoptions' is set, and finally
 * instantiates the temp tables controller.
 *
 * @param string $dbhost The database host.
 * @param string $dbuser The database username.
 * @param string $dbpass The database username's password.
 * @param string $dbname The name of the database being connected to.
 * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
 * @param array $dboptions driver specific options
 * @return bool true
 * @throws dml_exception if the prefix is empty for a non-external database or the driver is not installed
 * @throws dml_connection_exception if error
 */
public function connect($dbhost, $dbuser, $dbpass, $dbname, $prefix, array $dboptions=null) {
    if ($prefix == '' and !$this->external) {
        // Enforce prefixes for everybody but mysql.
        throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
    }

    $driverstatus = $this->driver_installed();

    if ($driverstatus !== true) {
        throw new dml_exception('dbdriverproblem', $driverstatus);
    }

    $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);

    // Quotes and backslashes must be escaped inside the single-quoted connection string value.
    $pass = addcslashes($this->dbpass, "'\\");

    // Unix socket connections should have lower overhead
    if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
        $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
        if (strpos($this->dboptions['dbsocket'], '/') !== false) {
            // A directory was specified as the socket location.
            $connection .= " host='".$this->dboptions['dbsocket']."'";
        }
        if (!empty($this->dboptions['dbport'])) {
            // A port as specified, add it to the connection as it's used as part of the socket path.
            $connection .= " port ='".$this->dboptions['dbport']."'";
        }
    } else {
        $this->dboptions['dbsocket'] = '';
        if (empty($this->dbname)) {
            // probably old style socket connection - do not add port
            $port = "";
        } else if (empty($this->dboptions['dbport'])) {
            $port = "port ='5432'";
        } else {
            $port = "port ='".$this->dboptions['dbport']."'";
        }
        $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
    }

    if (empty($this->dboptions['dbhandlesoptions'])) {
        // ALTER USER and ALTER DATABASE are overridden by these settings.
        $options = array('--client_encoding=utf8', '--standard_conforming_strings=on');
        // Select schema if specified, otherwise the first one wins.
        if (!empty($this->dboptions['dbschema'])) {
            $options[] = "-c search_path=" . addcslashes($this->dboptions['dbschema'], "'\\");
        }

        $connection .= " options='" . implode(' ', $options) . "'";
    }

    // Capture anything the pgsql extension prints so it can be used as the exception message.
    ob_start();
    if (empty($this->dboptions['dbpersist'])) {
        $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
    } else {
        $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
    }
    $dberr = ob_get_contents();
    ob_end_clean();

    // pg_connect()/pg_pconnect() return false on failure; only ask for the status of a real
    // connection handle, otherwise pg_connection_status() itself errors out on the false value.
    $status = $this->pgsql ? pg_connection_status($this->pgsql) : false;

    if ($status === false or $status === PGSQL_CONNECTION_BAD) {
        $this->pgsql = null;
        throw new dml_connection_exception($dberr);
    }

    if (!empty($this->dboptions['dbhandlesoptions'])) {
        /* We don't trust people who just set the dbhandlesoptions, this code checks up on them.
         * These functions do not talk to the server, they use the client library knowledge to determine state.
         */
        if (!empty($this->dboptions['dbschema'])) {
            throw new dml_connection_exception('You cannot specify a schema with dbhandlesoptions, use the database to set it.');
        }
        if (pg_client_encoding($this->pgsql) != 'UTF8') {
            throw new dml_connection_exception('client_encoding = UTF8 not set, it is: ' . pg_client_encoding($this->pgsql));
        }
        if (pg_escape_string($this->pgsql, '\\') != '\\') {
            throw new dml_connection_exception('standard_conforming_strings = on, must be set at the database.');
        }
    }

    // Connection stabilised and configured, going to instantiate the temptables controller
    $this->temptables = new pgsql_native_moodle_temptables($this);

    return true;
}
/**
 * Close database connection and release all resources
 * and memory (especially circular memory references).
 * Do NOT use connect() again, create a new instance if needed.
 *
 * @return void
 */
public function dispose() {
    // The parent goes first so that common shutdown work still has a connection available.
    parent::dispose();

    if (!$this->pgsql) {
        return;
    }

    pg_close($this->pgsql);
    $this->pgsql = null;
}
/**
 * Called before each db query.
 *
 * @param string $sql
 * @param array $params array of parameters
 * @param int $type type of query
 * @param mixed $extrainfo driver specific extra information
 * @return void
 */
protected function query_start($sql, array $params=null, $type, $extrainfo=null) {
    parent::query_start($sql, $params, $type, $extrainfo);

    // The pgsql driver tends to send debug to output, we do not need that. Error reporting is
    // switched off here and the previous level is remembered so query_end() can restore it.
    $previouslevel = error_reporting(0);
    $this->last_error_reporting = $previouslevel;
}
/**
 * Called immediately after each db query.
 *
 * Restores the error reporting level saved by query_start(), delegates to the parent and,
 * while the savepoint hack is active, moves or rolls back to the savepoint.
 *
 * @param mixed $result db specific result
 * @return void
 * @throws Exception whatever the parent query_end() throws is re-thrown after the savepoint rollback
 */
protected function query_end($result) {
    // Put the original debug level back.
    error_reporting($this->last_error_reporting);

    try {
        parent::query_end($result);

        // Only queries other than AUX and SELECT move the savepoint forward.
        $movesavepoint = $this->savepointpresent
            and !($this->last_type == SQL_QUERY_AUX or $this->last_type == SQL_QUERY_SELECT);
        if ($movesavepoint) {
            $released = @pg_query($this->pgsql, "RELEASE SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
            if ($released) {
                pg_free_result($released);
            }
        }
    } catch (Exception $e) {
        if ($this->savepointpresent) {
            $rolledback = @pg_query($this->pgsql, "ROLLBACK TO SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
            if ($rolledback) {
                pg_free_result($rolledback);
            }
        }
        throw $e;
    }
}
/**
 * Returns database server info array.
 *
 * The pg_version() data is fetched once and kept in a function-level static; both returned
 * keys are filled from its 'server' entry.
 *
 * @return array Array containing 'description' and 'version' info
 */
public function get_server_info() {
    static $versioninfo;

    if (!$versioninfo) {
        $this->query_start("--pg_version()", null, SQL_QUERY_AUX);
        $versioninfo = pg_version($this->pgsql);
        $this->query_end(true);
    }

    $serverinfo = array();
    $serverinfo['description'] = $versioninfo['server'];
    $serverinfo['version'] = $versioninfo['server'];

    return $serverinfo;
}
/**
 * Tells which query parameter styles this driver accepts.
 *
 * @return int bitmask of accepted SQL_PARAMS_*
 */
protected function allowed_param_types() {
    $accepted = SQL_PARAMS_DOLLAR;

    return $accepted;
}
/**
 * Gives the last error message reported for the current connection.
 *
 * @return string error message
 */
public function get_last_error() {
    $message = pg_last_error($this->pgsql);

    return $message;
}
/**
 * Return tables in database WITHOUT current prefix.
 *
 * @param bool $usecache if true, returns list of cached tables.
 * @return array of table names in lowercase and without prefix
 */
public function get_tables($usecache=true) {
    if ($usecache and $this->tables !== null) {
        return $this->tables;
    }

    $this->tables = array();

    // Underscore is a LIKE wildcard, so it is escaped with the '|' escape character.
    $prefix = str_replace('_', '|_', $this->prefix);
    $sql = "SELECT c.relname
              FROM pg_catalog.pg_class c
              JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
             WHERE c.relname LIKE '$prefix%' ESCAPE '|'
                   AND c.relkind = 'r'
                   AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    if (!$result) {
        return $this->tables;
    }

    $stripprefix = ($this->prefix !== false && $this->prefix !== '');
    while ($row = pg_fetch_row($result)) {
        $name = reset($row);
        if ($stripprefix) {
            if (strpos($name, $this->prefix) !== 0) {
                // Not one of ours after all, ignore it.
                continue;
            }
            $name = substr($name, strlen($this->prefix));
        }
        $this->tables[$name] = $name;
    }
    pg_free_result($result);

    return $this->tables;
}
/**
 * Return table indexes - everything lowercased.
 *
 * Index definitions are read from pg_catalog.pg_indexes and parsed with a regular expression;
 * definitions that do not match the expected shape and the single-column 'id' index are skipped.
 *
 * @param string $table The table we want to get indexes from.
 * @return array of arrays, keyed by index name, each with 'unique' (bool) and 'columns' (array)
 */
public function get_indexes($table) {
    $indexes = array();
    $tablename = $this->prefix.$table;

    $sql = "SELECT i.*
              FROM pg_catalog.pg_indexes i
              JOIN pg_catalog.pg_namespace as ns ON ns.nspname = i.schemaname
             WHERE i.tablename = '$tablename'
                   AND (i.schemaname = current_schema() OR ns.oid = pg_my_temp_schema())";

    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    if ($result) {
        // The table name is loop invariant; quote it once so regex metacharacters match literally.
        $quotedtable = preg_quote($tablename, '/');

        while ($row = pg_fetch_assoc($result)) {
            // The index definition could be generated schema-qualifying the target table name
            // for safety, depending on the pgsql version (CVE-2018-1058).
            $pattern = '/CREATE (|UNIQUE )INDEX ([^\s]+) ON (|' . preg_quote($row['schemaname'], '/') . '\.)'
                     . $quotedtable . ' USING ([^\s]+) \(([^\)]+)\)/i';
            if (!preg_match($pattern, $row['indexdef'], $matches)) {
                continue;
            }
            if ($matches[5] === 'id') {
                continue;
            }
            $columns = explode(',', $matches[5]);
            foreach ($columns as $k=>$column) {
                $column = trim($column);
                if ($pos = strpos($column, ' ')) {
                    // index type is separated by space
                    $column = substr($column, 0, $pos);
                }
                $columns[$k] = $this->trim_quotes($column);
            }
            $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
                                                'columns'=>$columns);
        }
        pg_free_result($result);
    }
    return $indexes;
}
/**
 * Returns detailed information about columns in table. This information is cached internally.
 *
 * Column defaults are obtained with pg_get_expr() on pg_attrdef.adbin rather than from the
 * pg_attrdef.adsrc column, because adsrc no longer exists in PostgreSQL 12 and later.
 *
 * @param string $table name
 * @param bool $usecache
 * @return database_column_info[] array of database_column_info objects indexed with column names
 */
public function get_columns($table, $usecache=true) {
    if ($usecache) {
        if ($this->temptables->is_temptable($table)) {
            if ($data = $this->get_temp_tables_cache()->get($table)) {
                return $data;
            }
        } else {
            if ($data = $this->get_metacache()->get($table)) {
                return $data;
            }
        }
    }

    // Default of a character column: when a ::<TYPE> cast is present keep only the part
    // before it and drop the surrounding quotes, otherwise use the expression untouched.
    $textdefault = static function($adsrc) {
        $parts = explode('::', $adsrc);
        if (count($parts) > 1) {
            return trim(reset($parts), "'");
        }
        return $adsrc;
    };

    // Default of a number column: PG 9.5+ uses ::<TYPE> syntax for some defaults, drop it and
    // then strip parentheses and quotes in every case.
    $numberdefault = static function($adsrc) {
        $parts = explode('::', $adsrc);
        $value = (count($parts) > 1) ? reset($parts) : $adsrc;
        return trim($value, "()'");
    };

    $structure = array();

    $tablename = $this->prefix.$table;

    $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef,
                   CASE WHEN a.atthasdef THEN pg_catalog.pg_get_expr(d.adbin, d.adrelid) ELSE '' END AS adsrc
              FROM pg_catalog.pg_class c
              JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
              JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
              JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
         LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
             WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
                   AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())
          ORDER BY a.attnum";

    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    if (!$result) {
        return array();
    }
    while ($rawcolumn = pg_fetch_object($result)) {

        $info = new stdClass();
        $info->name = $rawcolumn->field;
        $matches = null;

        // Booleans arrive from the pgsql extension as the strings 't' / 'f'.
        $notnull = ($rawcolumn->attnotnull === 't');
        $hasdefault = ($rawcolumn->atthasdef === 't');

        if ($rawcolumn->type === 'varchar') {
            $info->type          = 'varchar';
            $info->meta_type     = 'C';
            $info->max_length    = $rawcolumn->atttypmod - 4;
            $info->scale         = null;
            $info->not_null      = $notnull;
            $info->has_default   = $hasdefault;
            $info->default_value = $hasdefault ? $textdefault($rawcolumn->adsrc) : null;
            $info->primary_key   = false;
            $info->binary        = false;
            $info->unsigned      = null;
            $info->auto_increment= false;
            $info->unique        = null;

        } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
            $info->type = 'int';
            if (strpos($rawcolumn->adsrc, 'nextval') === 0) {
                // A default drawn from a sequence is treated as the auto-increment primary key.
                $info->primary_key   = true;
                $info->meta_type     = 'R';
                $info->unique        = true;
                $info->auto_increment= true;
                $info->has_default   = false;
            } else {
                $info->primary_key   = false;
                $info->meta_type     = 'I';
                $info->unique        = null;
                $info->auto_increment= false;
                $info->has_default   = $hasdefault;
            }
            // Return number of decimals, not bytes here.
            if ($matches[1] >= 8) {
                $info->max_length = 18;
            } else if ($matches[1] >= 4) {
                $info->max_length = 9;
            } else if ($matches[1] >= 2) {
                $info->max_length = 4;
            } else if ($matches[1] >= 1) {
                $info->max_length = 2;
            } else {
                $info->max_length = 0;
            }
            $info->scale         = null;
            $info->not_null      = $notnull;
            $info->default_value = $info->has_default ? $numberdefault($rawcolumn->adsrc) : null;
            $info->binary        = false;
            $info->unsigned      = false;

        } else if ($rawcolumn->type === 'numeric') {
            $info->type          = $rawcolumn->type;
            $info->meta_type     = 'N';
            $info->primary_key   = false;
            $info->binary        = false;
            $info->unsigned      = null;
            $info->auto_increment= false;
            $info->unique        = null;
            $info->not_null      = $notnull;
            $info->has_default   = $hasdefault;
            $info->default_value = $hasdefault ? $numberdefault($rawcolumn->adsrc) : null;
            // Precision is taken from the high 16 bits of atttypmod, scale from the low 16 bits less 4.
            $info->max_length    = $rawcolumn->atttypmod >> 16;
            $info->scale         = ($rawcolumn->atttypmod & 0xFFFF) - 4;

        } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
            $info->type          = 'float';
            $info->meta_type     = 'N';
            $info->primary_key   = false;
            $info->binary        = false;
            $info->unsigned      = null;
            $info->auto_increment= false;
            $info->unique        = null;
            $info->not_null      = $notnull;
            $info->has_default   = $hasdefault;
            $info->default_value = $hasdefault ? $numberdefault($rawcolumn->adsrc) : null;
            // just guess expected number of decimal places :-(
            if ($matches[1] == 8) {
                // total 15 digits
                $info->max_length = 8;
                $info->scale      = 7;
            } else {
                // total 6 digits
                $info->max_length = 4;
                $info->scale      = 2;
            }

        } else if ($rawcolumn->type === 'text') {
            $info->type          = $rawcolumn->type;
            $info->meta_type     = 'X';
            $info->max_length    = -1;
            $info->scale         = null;
            $info->not_null      = $notnull;
            $info->has_default   = $hasdefault;
            $info->default_value = $hasdefault ? $textdefault($rawcolumn->adsrc) : null;
            $info->primary_key   = false;
            $info->binary        = false;
            $info->unsigned      = null;
            $info->auto_increment= false;
            $info->unique        = null;

        } else if ($rawcolumn->type === 'bytea') {
            $info->type          = $rawcolumn->type;
            $info->meta_type     = 'B';
            $info->max_length    = -1;
            $info->scale         = null;
            $info->not_null      = $notnull;
            $info->has_default   = false;
            $info->default_value = null;
            $info->primary_key   = false;
            $info->binary        = true;
            $info->unsigned      = null;
            $info->auto_increment= false;
            $info->unique        = null;

        }

        // Columns of any other type are still recorded, carrying only their name.
        $structure[$info->name] = new database_column_info($info);
    }

    pg_free_result($result);

    if ($usecache) {
        if ($this->temptables->is_temptable($table)) {
            $this->get_temp_tables_cache()->set($table, $structure);
        } else {
            $this->get_metacache()->set($table, $structure);
        }
    }

    return $structure;
}
/**
 * Normalise values based in RDBMS dependencies (booleans, LOBs...)
 *
 * @param database_column_info $column column metadata corresponding with the value we are going to normalise
 * @param mixed $value value we are going to normalise
 * @return mixed the normalised value
 */
protected function normalise_value($column, $value) {
    $this->detect_objects($value);

    // Booleans always become integers, whatever the column is.
    if (is_bool($value)) {
        return (int)$value;
    }

    if ($column->meta_type === 'B') {
        // standard_conforming_strings must be enabled, otherwise pg_escape_bytea() will double escape
        // \ and produce data errors. This is set on the connection.
        return is_null($value) ? $value : pg_escape_bytea($this->pgsql, $value);
    }

    // Prevent '' problems in numeric fields.
    if ($value === '' and in_array($column->meta_type, array('I', 'F', 'N'), true)) {
        return 0;
    }

    return $value;
}
/**
 * Is db in unicode mode?
 *
 * Asks the server for its server_encoding setting and accepts 'UNICODE' or 'UTF8'
 * (compared case-insensitively).
 *
 * @return bool
 */
public function setup_is_unicodedb() {
    // Get PostgreSQL server_encoding value
    $sql = "SHOW server_encoding";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    if (!$result) {
        return false;
    }

    $record = pg_fetch_object($result);
    $encoding = strtoupper($record->server_encoding);
    pg_free_result($result);

    return in_array($encoding, array('UNICODE', 'UTF8'), true);
}
/**
 * Do NOT use in code, to be used by database_manager only!
 *
 * When no transaction is open the statements are wrapped in their own SERIALIZABLE
 * transaction, which is rolled back if the change fails. Caches for the affected tables are
 * reset both on success and on failure.
 *
 * @param string|array $sql query
 * @param array|null $tablenames an array of xmldb table names affected by this request.
 * @return bool true
 * @throws ddl_change_structure_exception A DDL specific exception is thrown for any errors.
 */
public function change_database_structure($sql, $tablenames = null) {
    $this->get_manager(); // Includes DDL exceptions classes ;-)
    if (is_array($sql)) {
        $sql = implode("\n;\n", $sql);
    }
    if (!$this->is_transaction_started()) {
        // It is better to do all or nothing, this helps with recovery...
        $sql = "BEGIN ISOLATION LEVEL SERIALIZABLE;\n$sql\n; COMMIT";
    }

    try {
        $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
        $result = pg_query($this->pgsql, $sql);
        $this->query_end($result);
        pg_free_result($result);
    } catch (ddl_change_structure_exception $e) {
        if (!$this->is_transaction_started()) {
            $result = @pg_query($this->pgsql, "ROLLBACK");
            // Free the result only when the rollback produced one: passing false to
            // pg_free_result() is an error that '@' cannot silence on newer PHP versions and
            // it would replace the DDL exception that is about to be re-thrown.
            if ($result) {
                pg_free_result($result);
            }
        }
        $this->reset_caches($tablenames);
        throw $e;
    }

    $this->reset_caches($tablenames);
    return true;
}
/**
 * Execute general sql query. Should be used only when no other method suitable.
 * Do NOT use this to make changes in db structure, use database_manager methods instead!
 *
 * @param string $sql query
 * @param array $params query parameters
 * @return bool true
 * @throws coding_exception if the query text contains a semicolon
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function execute($sql, array $params=null) {
    list($sql, $params, $type) = $this->fix_sql_params($sql, $params);

    // A semicolon means either several statements or a value that was not passed as a parameter.
    $hassemicolon = (strpos($sql, ';') !== false);
    if ($hassemicolon) {
        throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
    }

    $this->query_start($sql, $params, SQL_QUERY_UPDATE);
    $outcome = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($outcome);
    pg_free_result($outcome);

    return true;
}
/**
 * Get a number of records as a moodle_recordset using a SQL statement.
 *
 * Since this method is a little less readable, use of it should be restricted to
 * code where it's possible there might be large datasets being returned. For known
 * small datasets use get_records_sql - it leads to simpler code.
 *
 * The return type is like:
 * @see function get_recordset.
 *
 * @param string $sql the SQL select query to execute.
 * @param array $params array of sql parameters
 * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
 * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
 * @return moodle_recordset instance
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {

    list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);

    if ($limitnum) {
        $sql .= " LIMIT $limitnum";
    }
    if ($limitfrom) {
        $sql .= " OFFSET $limitfrom";
    }

    list($sql, $params, $type) = $this->fix_sql_params($sql, $params);

    $this->query_start($sql, $params, SQL_QUERY_SELECT);

    // For any query that doesn't explicitly specify a limit, we must use cursors to stop it
    // loading the entire thing (unless the config setting is turned off).
    $usecursors = !$limitnum && ($this->get_fetch_buffer_size() > 0);

    $cursorname = '';
    if ($usecursors) {
        // The cursor identifier is built from a simple counter, which should be OK because
        // the identifiers only need to be unique within the current transaction.
        $this->cursorcount++;
        $cursorname = 'crs' . $this->cursorcount;

        // Wrap the query so that it is declared as a cursor instead of being run directly.
        $sql = 'DECLARE ' . $cursorname . ' NO SCROLL CURSOR WITH HOLD FOR ' . $sql;
    }

    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    if ($usecursors) {
        // The DECLARE result holds no rows; the recordset gets only the cursor name.
        pg_free_result($result);
        $result = null;
    }

    return new pgsql_native_moodle_recordset($result, $this, $cursorname);
}
/**
 * Gets size of fetch buffer used for recordset queries.
 *
 * If this returns 0 then cursors will not be used, meaning recordset queries will occupy enough
 * memory as needed for the Postgres library to hold the entire query results in memory.
 *
 * The 'fetchbuffersize' db option wins when it is present; DEFAULT_FETCH_BUFFER_SIZE is the fallback.
 *
 * @return int Fetch buffer size or 0 indicating not to use cursors
 */
protected function get_fetch_buffer_size() {
    $configured = array_key_exists('fetchbuffersize', $this->dboptions);

    return $configured ? (int)$this->dboptions['fetchbuffersize'] : self::DEFAULT_FETCH_BUFFER_SIZE;
}
/**
 * Retrieves data from cursor. For use by recordset only; do not call directly.
 *
 * Return value contains the next batch of Postgres data, and a boolean indicating if this is
 * definitely the last batch (if false, there may be more)
 *
 * @param string $cursorname Name of cursor to read from
 * @return array Array with 2 elements (next data batch and boolean indicating last batch)
 */
public function fetch_from_cursor($cursorname) {
    $count = $this->get_fetch_buffer_size();

    $sql = 'FETCH ' . $count . ' FROM ' . $cursorname;

    $this->query_start($sql, [], SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);

    // Count rows only on a valid result. A failed query (false) is handed to query_end()
    // untouched instead of tripping over pg_num_rows() first.
    $last = false;
    if ($result) {
        // Fewer rows than requested means the cursor is exhausted.
        $last = pg_num_rows($result) !== $count;
    }

    $this->query_end($result);

    return [$result, $last];
}
/**
 * Closes a cursor. For use by recordset only; do not call directly.
 *
 * @param string $cursorname Name of cursor to close
 * @return bool Always true when this method returns
 */
public function close_cursor($cursorname) {
    $sql = 'CLOSE ' . $cursorname;

    $this->query_start($sql, [], SQL_QUERY_AUX);
    $closed = pg_query($this->pgsql, $sql);
    $this->query_end($closed);

    // Nothing to free when the CLOSE statement produced no result.
    if ($closed) {
        pg_free_result($closed);
    }

    return true;
}
/**
 * Get a number of records as an array of objects using a SQL statement.
 *
 * Return value is like:
 * @see function get_records.
 *
 * @param string $sql the SQL select query to execute. The first column of this SELECT statement
 *   must be a unique value (usually the 'id' field), as it will be used as the key of the
 *   returned array.
 * @param array $params array of sql parameters
 * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
 * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
 * @return array of objects, or empty array if no records were found
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function get_records_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {

    list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);

    if ($limitnum) {
        $sql .= " LIMIT $limitnum";
    }
    if ($limitfrom) {
        $sql .= " OFFSET $limitfrom";
    }

    list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
    $this->query_start($sql, $params, SQL_QUERY_SELECT);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    // Collect the names of all bytea columns, their values have to be unescaped below.
    $blobs = array();
    $fieldcount = pg_num_fields($result);
    for ($index = 0; $index < $fieldcount; $index++) {
        if (pg_field_type($result, $index) == 'bytea') {
            $blobs[] = pg_field_name($result, $index);
        }
    }

    $rows = pg_fetch_all($result);
    pg_free_result($result);

    $return = array();
    if (!$rows) {
        return $return;
    }

    foreach ($rows as $row) {
        // The first column is the key of the returned array.
        $id = reset($row);
        if ($blobs) {
            foreach ($blobs as $blob) {
                if ($row[$blob] !== null) {
                    $row[$blob] = pg_unescape_bytea($row[$blob]);
                } else {
                    $row[$blob] = null;
                }
            }
        }
        if (isset($return[$id])) {
            // The array pointer still sits on the first element, so key() names the first column.
            $colname = key($row);
            debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
        }
        $return[$id] = (object)$row;
    }

    return $return;
}
/**
 * Selects records and return values (first field) as an array using a SQL statement.
 *
 * @param string $sql The SQL query
 * @param array $params array of sql parameters
 * @return array of values
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function get_fieldset_sql($sql, array $params=null) {
    list($sql, $params, $type) = $this->fix_sql_params($sql, $params);

    $this->query_start($sql, $params, SQL_QUERY_SELECT);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    $values = pg_fetch_all_columns($result, 0);

    // Binary values come back escaped; nulls are left alone.
    $isblob = (pg_field_type($result, 0) == 'bytea');
    if ($isblob) {
        foreach ($values as $index => $raw) {
            if ($raw !== null) {
                $values[$index] = pg_unescape_bytea($raw);
            } else {
                $values[$index] = $raw;
            }
        }
    }

    pg_free_result($result);

    return $values;
}
/**
 * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
 *
 * @param string $table name
 * @param mixed $params data record as object or array
 * @param bool $returnid return id of inserted record
 * @param bool $bulk true means repeated inserts expected
 * @param bool $customsequence true if 'id' included in $params, disables $returnid
 * @return bool|int true or new id
 * @throws coding_exception if no fields are left to insert, or 'id' is missing with custom sequences
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
    // Casting an array to array changes nothing, so the cast is safe for both input kinds.
    $params = (array)$params;

    $returning = "";

    if ($customsequence) {
        if (!isset($params['id'])) {
            throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
        }
        $returnid = false;
    } else {
        // Without a custom sequence the id is never sent; it is read back only on request.
        unset($params['id']);
        if ($returnid) {
            $returning = "RETURNING id";
        }
    }

    if (empty($params)) {
        throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
    }

    $fields = implode(',', array_keys($params));

    // One positional placeholder ($1, $2, ...) per value, in the order of the fields.
    $placeholders = array();
    $position = 1;
    foreach ($params as $value) {
        $this->detect_objects($value);
        $placeholders[] = "\$".$position;
        $position++;
    }
    $values = implode(',', $placeholders);

    $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
    $this->query_start($sql, $params, SQL_QUERY_INSERT);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    if ($returning !== "") {
        $inserted = pg_fetch_assoc($result);
        $params['id'] = reset($inserted);
    }
    pg_free_result($result);

    if ($returnid) {
        return (int)$params['id'];
    }

    return true;
}
/**
 * Insert a record into a table and return the "id" field if required.
 *
 * Some conversions and safety checks are carried out. Lobs are supported.
 * Fields that are not columns of the table, and the 'id' field, are silently dropped.
 *
 * @param string $table The database table to be inserted into
 * @param object $dataobject A data object with values for one or more fields in the record
 * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true is returned.
 * @param bool $bulk passed through to insert_record_raw()
 * @return bool|int true or new id
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
    $dataobject = (array)$dataobject;

    $columns = $this->get_columns($table);
    if (empty($columns)) {
        throw new dml_exception('ddltablenotexist', $table);
    }

    $cleaned = array();

    foreach ($dataobject as $field=>$value) {
        // Skip the id and anything the table has no column for.
        if ($field === 'id' or !isset($columns[$field])) {
            continue;
        }
        $cleaned[$field] = $this->normalise_value($columns[$field], $value);
    }

    return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
}
/**
 * Insert multiple records into database as fast as possible.
 *
 * Order of inserts is maintained, but the operation is not atomic,
 * use transactions if necessary.
 *
 * This method is intended for inserting of large number of small objects,
 * do not use for huge objects with text or binary fields.
 *
 * Records are sent in chunks of 500 rows unless the 'bulkinsertsize'
 * database option supplies another size. The field list is taken from the
 * first record; the 'id' column is never inserted.
 *
 * @since Moodle 2.7
 *
 * @param string $table The database table to be inserted into
 * @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
 * @return void does not return new record ids
 *
 * @throws coding_exception if data objects have different structure
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function insert_records($table, $dataobjects) {
    if (!(is_array($dataobjects) || $dataobjects instanceof Traversable)) {
        throw new coding_exception('insert_records() passed non-traversable object');
    }

    // PostgreSQL does not seem to have problems with huge queries.
    $chunksize = empty($this->dboptions['bulkinsertsize']) ? 500 : (int)$this->dboptions['bulkinsertsize'];

    $columns = $this->get_columns($table, true);

    $fields = null;
    $pending = array();
    foreach ($dataobjects as $dataobject) {
        if (!(is_array($dataobject) || is_object($dataobject))) {
            throw new coding_exception('insert_records() passed invalid record object');
        }
        $dataobject = (array)$dataobject;
        $currentfields = array_keys($dataobject);

        if ($fields === null) {
            // The first record defines the structure every later record must match.
            $fields = $currentfields;
            $columns = array_intersect_key($columns, $dataobject);
            unset($columns['id']);
        } else if ($fields !== $currentfields) {
            throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
        }

        $pending[] = $dataobject;

        if (count($pending) === $chunksize) {
            $this->insert_chunk($table, $pending, $columns);
            $pending = array();
        }
    }

    // Flush whatever is left over from the last, incomplete chunk.
    if (!empty($pending)) {
        $this->insert_chunk($table, $pending, $columns);
    }
}
/**
 * Insert records in chunks, strict param types...
 *
 * Builds a single multi-row INSERT with positional placeholders; each value
 * is normalised against its column before being bound.
 *
 * Note: can be used only from insert_records().
 *
 * @param string $table
 * @param array $chunk list of records, each an array keyed by field name
 * @param database_column_info[] $columns columns to insert, keyed by field name
 */
protected function insert_chunk($table, array $chunk, array $columns) {
    $params = array();
    $rows = array();
    $position = 0;

    foreach ($chunk as $dataobject) {
        $placeholders = array();
        foreach ($columns as $field => $column) {
            $params[] = $this->normalise_value($column, $dataobject[$field]);
            $position++;
            $placeholders[] = '$' . $position;
        }
        $rows[] = '(' . implode(',', $placeholders) . ')';
    }

    $fieldssql = '(' . implode(',', array_keys($columns)) . ')';
    $valuessql = implode(',', $rows);

    $sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql";
    $this->query_start($sql, $params, SQL_QUERY_INSERT);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);
    pg_free_result($result);
}
/**
 * Import a record into a table, id field is required.
 * Safety checks are NOT carried out. Lobs are supported.
 *
 * Fields that are not columns of the table are dropped; the 'id' supplied in
 * the record is kept and inserted as-is (custom sequence mode).
 *
 * @param string $table name of database table to be inserted into
 * @param object|array $dataobject A data object with values for one or more fields in the record
 * @return bool true
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function import_record($table, $dataobject) {
    $record = (array)$dataobject;
    $columns = $this->get_columns($table);

    $cleaned = array();
    foreach ($record as $field => $value) {
        // Every value is checked, including those of fields that end up being skipped.
        $this->detect_objects($value);
        if (isset($columns[$field])) {
            $cleaned[$field] = $this->normalise_value($columns[$field], $value);
        }
    }

    // returnid = false, bulk = true, customsequence = true.
    return $this->insert_record_raw($table, $cleaned, false, true, true);
}
/**
 * Update record in database, as fast as possible, no safety checks, lobs not supported.
 *
 * The row is selected by $params['id']; all other entries become SET clauses.
 *
 * @param string $table name
 * @param mixed $params data record as object or array
 * @param bool $bulk true means repeated updates expected (not used by this implementation)
 * @return bool true
 * @throws coding_exception if the id is missing or there is nothing to update.
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function update_record_raw($table, $params, $bulk=false) {
    $params = (array)$params;

    if (!isset($params['id'])) {
        throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
    }
    $id = $params['id'];
    unset($params['id']);

    if (empty($params)) {
        throw new coding_exception('moodle_database::update_record_raw() no fields found.');
    }

    $assignments = array();
    $position = 0;
    foreach ($params as $field => $value) {
        $this->detect_objects($value);
        $position++;
        $assignments[] = "$field = \$" . $position;
    }

    // The id is bound last, to the placeholder used in the WHERE condition.
    $params[] = $id;
    $idposition = $position + 1;

    $setsql = implode(',', $assignments);
    $sql = "UPDATE {$this->prefix}$table SET $setsql WHERE id=\$" . $idposition;

    $this->query_start($sql, $params, SQL_QUERY_UPDATE);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    pg_free_result($result);
    return true;
}
/**
 * Update a record in a table
 *
 * $dataobject is an object containing needed data
 * Relies on $dataobject having a variable "id" to
 * specify the record to update
 *
 * Fields that are not columns of the table are ignored; the remaining values
 * are normalised and handed to update_record_raw().
 *
 * @param string $table The database table to be checked against.
 * @param object|array $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified.
 * @param bool $bulk true means repeated updates expected
 * @return bool true
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function update_record($table, $dataobject, $bulk=false) {
    $record = (array)$dataobject;
    $columns = $this->get_columns($table);

    $cleaned = array();
    foreach ($record as $field => $value) {
        if (isset($columns[$field])) {
            $cleaned[$field] = $this->normalise_value($columns[$field], $value);
        }
    }

    $this->update_record_raw($table, $cleaned, $bulk);

    return true;
}
/**
 * Set a single field in every table record which match a particular WHERE clause.
 *
 * The new value is normalised against the column definition and bound as the
 * last positional parameter, after the parameters of the WHERE fragment.
 *
 * @param string $table The database table to be checked against.
 * @param string $newfield the field to set.
 * @param string $newvalue the value to set the field to.
 * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
 * @param array $params array of sql parameters
 * @return bool true
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {
    $where = $select ? "WHERE $select" : $select;
    $params = is_null($params) ? array() : $params;

    list($where, $params) = $this->fix_sql_params($where, $params);

    // Get column metadata
    $columns = $this->get_columns($table);
    $normalisedvalue = $this->normalise_value($columns[$newfield], $newvalue);

    // The placeholder number must be computed before the value is appended.
    $assignment = "$newfield = \$" . (count($params) + 1);
    $params[] = $normalisedvalue;

    $sql = "UPDATE {$this->prefix}$table SET $assignment $where";

    $this->query_start($sql, $params, SQL_QUERY_UPDATE);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    pg_free_result($result);

    return true;
}
/**
 * Delete one or more records from a table which match a particular WHERE clause, lobs not supported.
 *
 * An empty $select produces a DELETE without a WHERE clause.
 *
 * @param string $table The database table to be checked against.
 * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
 * @param array $params array of sql parameters
 * @return bool true
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function delete_records_select($table, $select, array $params=null) {
    $where = $select ? "WHERE $select" : $select;

    list($sql, $params) = $this->fix_sql_params("DELETE FROM {$this->prefix}$table $where", $params);

    // The query is reported to query_start() under the UPDATE query type.
    $this->query_start($sql, $params, SQL_QUERY_UPDATE);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    pg_free_result($result);

    return true;
}
/**
 * Returns 'LIKE' part of a query.
 *
 * Case-insensitive matching is done with ILIKE. The $accentsensitive flag is
 * accepted but has no effect in this driver.
 *
 * @param string $fieldname usually name of the table column
 * @param string $param usually bound query parameter (?, :named)
 * @param bool $casesensitive use case sensitive search
 * @param bool $accentsensitive use accent sensitive search (not all databases support accent insensitive)
 * @param bool $notlike true means "NOT LIKE"
 * @param string $escapechar escape char for '%' and '_'
 * @return string SQL code fragment
 */
public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') {
    // A literal wildcard in $param suggests a value was embedded instead of a placeholder.
    if (strpos($param, '%') !== false) {
        debugging('Potential SQL injection detected, sql_like() expects bound parameters (? or :named)');
    }

    // postgresql does not support accent insensitive text comparisons, sorry
    $operator = $casesensitive ? 'LIKE' : 'ILIKE';
    if ($notlike) {
        $operator = 'NOT ' . $operator;
    }

    return "$fieldname $operator $param ESCAPE '$escapechar'";
}
/**
 * Returns the SQL text for a bitwise XOR of two integer expressions.
 *
 * Both operands are parenthesised and combined with the '#' operator.
 *
 * @param string|int $int1 first operand (SQL expression or literal)
 * @param string|int $int2 second operand (SQL expression or literal)
 * @return string SQL code fragment
 */
public function sql_bitxor($int1, $int2) {
    return "(({$int1}) # ({$int2}))";
}
/**
 * Returns the SQL text that casts a character expression to an integer.
 *
 * @param string $fieldname column name or SQL expression to cast
 * @param bool $text accepted for interface compatibility; not used here
 * @return string SQL code fragment, padded with a space on each side
 */
public function sql_cast_char2int($fieldname, $text=false) {
    return " CAST({$fieldname} AS INT) ";
}
/**
 * Returns the SQL text that casts a character expression to a real number.
 *
 * @param string $fieldname column name or SQL expression to cast
 * @param bool $text accepted for interface compatibility; not used here
 * @return string SQL code fragment, padded with a space on each side
 */
public function sql_cast_char2real($fieldname, $text=false) {
    return ' ' . $fieldname . '::real ';
}
/**
 * Returns the SQL text concatenating all arguments with the || operator.
 *
 * Accepts any number of SQL expressions. With no arguments (or a single
 * empty-string argument) an empty string literal is returned.
 *
 * @return string SQL code fragment
 */
public function sql_concat() {
    $joined = implode(' || ', func_get_args());
    if ($joined === '') {
        return " '' ";
    }
    // Add always empty string element so integer-exclusive concats
    // will work without needing to cast each element explicitly
    return " '' || " . $joined . " ";
}
/**
 * Returns the SQL text concatenating the elements with a separator between
 * each consecutive pair.
 *
 * @param string $separator SQL expression placed between elements (a quoted space by default)
 * @param array $elements SQL expressions to join
 * @return string SQL code fragment; an empty string literal when there is nothing to join
 */
public function sql_concat_join($separator="' '", $elements=array()) {
    // Interleave the separator between consecutive elements.
    $pieces = array();
    foreach ($elements as $element) {
        if (!empty($pieces)) {
            $pieces[] = $separator;
        }
        $pieces[] = $element;
    }

    $joined = implode(' || ', $pieces);
    return ($joined === '') ? " '' " : " $joined ";
}
/**
 * Reports that this driver supports regular expression matching in SQL.
 *
 * @return bool always true
 */
public function sql_regex_supported() {
    return true;
}
/**
 * Returns the regular expression match operator to use in SQL.
 *
 * @param bool $positivematch true for "matches", false for "does not match"
 * @param bool $casesensitive true for a case sensitive match
 * @return string one of '~', '!~', '~*' or '!~*'
 */
public function sql_regex($positivematch = true, $casesensitive = false) {
    // '~' is the case sensitive operator, '~*' the insensitive one; '!' negates either.
    $operator = $casesensitive ? '~' : '~*';
    if (!$positivematch) {
        $operator = '!' . $operator;
    }
    return $operator;
}
/**
 * Does this driver support tool_replace?
 *
 * @since Moodle 2.6.1
 * @return bool always true for this driver
 */
public function replace_all_text_supported() {
    return true;
}
/**
 * Reports that this driver can provide session locks.
 *
 * @return bool always true
 */
public function session_lock_supported() {
    return true;
}
/**
 * Obtain session lock
 *
 * Takes an advisory lock keyed on the session row id. The wait is bounded by
 * temporarily setting statement_timeout, which is restored to its default
 * afterwards whether or not the lock was obtained, so a failed attempt does
 * not leave the shortened timeout active on the connection.
 *
 * @param int $rowid id of the row with session record
 * @param int $timeout max allowed time to wait for the lock in seconds
 * @return void failures are reported through exceptions
 * @throws dml_sessionwait_exception if the lock query failed after waiting at least $timeout seconds
 * @throws dml_exception for any other database error
 */
public function get_session_lock($rowid, $timeout) {
    // NOTE: there is a potential locking problem for database running
    // multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
    // luckily there is not a big chance that they would collide
    if (!$this->session_lock_supported()) {
        return;
    }

    parent::get_session_lock($rowid, $timeout);

    $timeoutmilli = $timeout * 1000;

    $sql = "SET statement_timeout TO $timeoutmilli";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    if ($result) {
        pg_free_result($result);
    }

    $sql = "SELECT pg_advisory_lock($rowid)";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $start = time();
    $result = pg_query($this->pgsql, $sql);
    $end = time();

    // Remember a lock failure instead of throwing straight away, so that the
    // statement timeout can be restored first.
    $lockerror = null;
    try {
        $this->query_end($result);
    } catch (dml_exception $ex) {
        // A failure after waiting the full timeout is reported as a session wait problem.
        $lockerror = ($end - $start >= $timeout) ? new dml_sessionwait_exception() : $ex;
    }

    if ($result) {
        pg_free_result($result);
    }

    $sql = "SET statement_timeout TO DEFAULT";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    try {
        $this->query_end($result);
    } catch (dml_exception $ex) {
        // When the lock itself failed, that error is the one worth reporting.
        if ($lockerror === null) {
            throw $ex;
        }
    }

    if ($result) {
        pg_free_result($result);
    }

    if ($lockerror !== null) {
        throw $lockerror;
    }
}
/**
 * Release the session lock taken for the given session row.
 *
 * Does nothing when session locks are unsupported or this connection has not
 * been used for database sessions.
 *
 * @param int $rowid id of the row with session record
 * @return void
 */
public function release_session_lock($rowid) {
    if (!$this->session_lock_supported() || !$this->used_for_db_sessions) {
        return;
    }

    parent::release_session_lock($rowid);

    $sql = "SELECT pg_advisory_unlock($rowid)";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $unlockresult = pg_query($this->pgsql, $sql);
    $this->query_end($unlockresult);

    if ($unlockresult) {
        pg_free_result($unlockresult);
    }
}
/**
 * Driver specific start of real database transaction,
 * this can not be used directly in code.
 *
 * Opens a READ COMMITTED transaction and immediately creates the
 * moodle_pg_savepoint savepoint, flagging its presence on this object first.
 *
 * @return void
 */
protected function begin_transaction() {
    // The flag is raised before the query is issued.
    $this->savepointpresent = true;

    $statement = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint";
    $this->query_start($statement, null, SQL_QUERY_AUX);
    $outcome = pg_query($this->pgsql, $statement);
    $this->query_end($outcome);

    pg_free_result($outcome);
}
/**
 * Driver specific commit of real database transaction,
 * this can not be used directly in code.
 *
 * Releases the moodle_pg_savepoint savepoint and commits, after clearing the
 * savepoint flag on this object.
 *
 * @return void
 */
protected function commit_transaction() {
    // The flag is cleared before the query is issued.
    $this->savepointpresent = false;

    $statement = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT";
    $this->query_start($statement, null, SQL_QUERY_AUX);
    $outcome = pg_query($this->pgsql, $statement);
    $this->query_end($outcome);

    pg_free_result($outcome);
}
/**
 * Driver specific abort of real database transaction,
 * this can not be used directly in code.
 *
 * Releases the moodle_pg_savepoint savepoint and rolls back, after clearing
 * the savepoint flag on this object.
 *
 * @return void
 */
protected function rollback_transaction() {
    // The flag is cleared before the query is issued.
    $this->savepointpresent = false;

    $statement = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK";
    $this->query_start($statement, null, SQL_QUERY_AUX);
    $outcome = pg_query($this->pgsql, $statement);
    $this->query_end($outcome);

    pg_free_result($outcome);
}
/**
 * Helper function trimming (whitespace + quotes) any string
 * needed because PG uses to enclose with double quotes some
 * fields in indexes definition and others
 *
 * Surrounding whitespace is removed first, then any leading or trailing
 * single and double quote characters.
 *
 * @param string $str string to apply whitespace + quotes trim
 * @return string trimmed string
 */
private function trim_quotes($str) {
    $unpadded = trim($str);
    return trim($unpadded, "'\"");
}
/**
 * Postgresql supports full-text search indexes.
 *
 * @return bool always true for this driver
 */
public function is_fulltext_search_supported() {
    return true;
}