2 // This file is part of Moodle - http://moodle.org/
4 // Moodle is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
9 // Moodle is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with Moodle. If not, see <http://www.gnu.org/licenses/>.
18 * Native pgsql class representing moodle database interface.
21 * @copyright 2008 Petr Skoda (http://skodak.org)
22 * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
25 defined('MOODLE_INTERNAL') ||
die();
27 require_once(__DIR__
.'/moodle_database.php');
28 require_once(__DIR__
.'/pgsql_native_moodle_recordset.php');
29 require_once(__DIR__
.'/pgsql_native_moodle_temptables.php');
32 * Native pgsql class representing moodle database interface.
35 * @copyright 2008 Petr Skoda (http://skodak.org)
36 * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
38 class pgsql_native_moodle_database
extends moodle_database
{
40 /** @var resource $pgsql database resource */
41 protected $pgsql = null;
43 protected $last_error_reporting; // To handle pgsql driver default verbosity
45 /** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */
46 protected $savepointpresent = false;
48 /** @var int Number of cursors used (for constructing a unique ID) */
49 protected $cursorcount = 0;
51 /** @var int Default number of rows to fetch at a time when using recordsets with cursors */
52 const DEFAULT_FETCH_BUFFER_SIZE
= 100000;
55 * Detects if all needed PHP stuff installed.
56 * Note: can be used before connect()
57 * @return mixed true if ok, otherwise a localised string describing what is missing
59 public function driver_installed() {
60 if (!extension_loaded('pgsql')) {
61 return get_string('pgsqlextensionisnotpresentinphp', 'install');
67 * Returns database family type - describes SQL dialect
68 * Note: can be used before connect()
69 * @return string db family name (mysql, postgres, mssql, oracle, etc.)
71 public function get_dbfamily() {
76 * Returns more specific database driver type
77 * Note: can be used before connect()
78 * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
80 protected function get_dbtype() {
85 * Returns general database library name
86 * Note: can be used before connect()
87 * @return string db type pdo, native
89 protected function get_dblibrary() {
/**
 * Returns the localised, human-readable name of this database type.
 * Note: can be used before connect()
 * @return string name looked up from the 'install' language component
 */
public function get_name() {
    $localisedname = get_string('nativepgsql', 'install');
    return $localisedname;
}
/**
 * Returns the localised help text for configuring this database driver.
 * Note: can be used before connect()
 * @return string help text looked up from the 'install' language component
 */
public function get_configuration_help() {
    $helptext = get_string('nativepgsqlhelp', 'install');
    return $helptext;
}
113 * Must be called before other methods.
114 * @param string $dbhost The database host.
115 * @param string $dbuser The database username.
116 * @param string $dbpass The database username's password.
117 * @param string $dbname The name of the database being connected to.
118 * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
119 * @param array $dboptions driver specific options
121 * @throws dml_connection_exception if error
123 public function connect($dbhost, $dbuser, $dbpass, $dbname, $prefix, array $dboptions=null) {
124 if ($prefix == '' and !$this->external
) {
125 //Enforce prefixes for everybody but mysql
126 throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
129 $driverstatus = $this->driver_installed();
131 if ($driverstatus !== true) {
132 throw new dml_exception('dbdriverproblem', $driverstatus);
135 $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
137 $pass = addcslashes($this->dbpass
, "'\\");
139 // Unix socket connections should have lower overhead
140 if (!empty($this->dboptions
['dbsocket']) and ($this->dbhost
=== 'localhost' or $this->dbhost
=== '127.0.0.1')) {
141 $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
142 if (strpos($this->dboptions
['dbsocket'], '/') !== false) {
143 // A directory was specified as the socket location.
144 $connection .= " host='".$this->dboptions
['dbsocket']."'";
146 if (!empty($this->dboptions
['dbport'])) {
147 // A port is specified, add it to the connection as it's used as part of the socket path.
148 $connection .= " port ='".$this->dboptions
['dbport']."'";
151 $this->dboptions
['dbsocket'] = '';
152 if (empty($this->dbname
)) {
153 // probably old style socket connection - do not add port
155 } else if (empty($this->dboptions
['dbport'])) {
156 $port = "port ='5432'";
158 $port = "port ='".$this->dboptions
['dbport']."'";
160 $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
163 if (empty($this->dboptions
['dbhandlesoptions'])) {
164 // ALTER USER and ALTER DATABASE are overridden by these settings.
165 $options = array('--client_encoding=utf8', '--standard_conforming_strings=on');
166 // Select schema if specified, otherwise the first one wins.
167 if (!empty($this->dboptions
['dbschema'])) {
168 $options[] = "-c search_path=" . addcslashes($this->dboptions
['dbschema'], "'\\");
171 $connection .= " options='" . implode(' ', $options) . "'";
175 if (empty($this->dboptions
['dbpersist'])) {
176 $this->pgsql
= pg_connect($connection, PGSQL_CONNECT_FORCE_NEW
);
178 $this->pgsql
= pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW
);
180 $dberr = ob_get_contents();
183 $status = pg_connection_status($this->pgsql
);
185 if ($status === false or $status === PGSQL_CONNECTION_BAD
) {
187 throw new dml_connection_exception($dberr);
190 if (!empty($this->dboptions
['dbhandlesoptions'])) {
191 /* We don't trust people who just set the dbhandlesoptions, this code checks up on them.
192 * These functions do not talk to the server, they use the client library knowledge to determine state.
194 if (!empty($this->dboptions
['dbschema'])) {
195 throw new dml_connection_exception('You cannot specify a schema with dbhandlesoptions, use the database to set it.');
197 if (pg_client_encoding($this->pgsql
) != 'UTF8') {
198 throw new dml_connection_exception('client_encoding = UTF8 not set, it is: ' . pg_client_encoding($this->pgsql
));
200 if (pg_escape_string($this->pgsql
, '\\') != '\\') {
201 throw new dml_connection_exception('standard_conforming_strings = on, must be set at the database.');
205 // Connection stabilised and configured, going to instantiate the temptables controller
206 $this->temptables
= new pgsql_native_moodle_temptables($this);
212 * Close database connection and release all resources
213 * and memory (especially circular memory references).
214 * Do NOT use connect() again, create a new instance if needed.
216 public function dispose() {
217 parent
::dispose(); // Call parent dispose to write/close session and other common stuff before closing connection
219 pg_close($this->pgsql
);
/**
 * Called before each db query.
 * @param string $sql
 * @param array $params array of parameters
 * @param int $type type of query
 * @param mixed $extrainfo driver specific extra information
 * @return void
 */
protected function query_start($sql, array $params=null, $type, $extrainfo=null) {
    parent::query_start($sql, $params, $type, $extrainfo);
    // The pgsql driver tends to send debug to output, we do not need that. Silence it
    // and keep the previous level in last_error_reporting, which query_end() reads back.
    $previouslevel = error_reporting(0);
    $this->last_error_reporting = $previouslevel;
}
240 * Called immediately after each db query.
241 * @param mixed $result db specific result
244 protected function query_end($result) {
245 // reset original debug level
246 error_reporting($this->last_error_reporting
);
248 parent
::query_end($result);
249 if ($this->savepointpresent
and $this->last_type
!= SQL_QUERY_AUX
and $this->last_type
!= SQL_QUERY_SELECT
) {
250 $res = @pg_query
($this->pgsql
, "RELEASE SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
252 pg_free_result($res);
255 } catch (Exception
$e) {
256 if ($this->savepointpresent
) {
257 $res = @pg_query
($this->pgsql
, "ROLLBACK TO SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
259 pg_free_result($res);
267 * Returns database server info array
268 * @return array Array containing 'description' and 'version' info
270 public function get_server_info() {
273 $this->query_start("--pg_version()", null, SQL_QUERY_AUX
);
274 $info = pg_version($this->pgsql
);
275 $this->query_end(true);
277 return array('description'=>$info['server'], 'version'=>$info['server']);
/**
 * Returns supported query parameter types
 * @return int bitmask of accepted SQL_PARAMS_*
 */
protected function allowed_param_types() {
    // This driver reports SQL_PARAMS_DOLLAR as its only accepted placeholder style.
    $accepted = SQL_PARAMS_DOLLAR;
    return $accepted;
}
/**
 * Returns last error reported by database engine.
 * @return string error message as given by pg_last_error() for this connection
 */
public function get_last_error() {
    $message = pg_last_error($this->pgsql);
    return $message;
}
297 * Return tables in database WITHOUT current prefix.
298 * @param bool $usecache if true, returns list of cached tables.
299 * @return array of table names in lowercase and without prefix
301 public function get_tables($usecache=true) {
302 if ($usecache and $this->tables
!== null) {
303 return $this->tables
;
305 $this->tables
= array();
306 $prefix = str_replace('_', '|_', $this->prefix
);
307 $sql = "SELECT c.relname
308 FROM pg_catalog.pg_class c
309 JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
310 WHERE c.relname LIKE '$prefix%' ESCAPE '|'
312 AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())";
313 $this->query_start($sql, null, SQL_QUERY_AUX
);
314 $result = pg_query($this->pgsql
, $sql);
315 $this->query_end($result);
318 while ($row = pg_fetch_row($result)) {
319 $tablename = reset($row);
320 if ($this->prefix
!== false && $this->prefix
!== '') {
321 if (strpos($tablename, $this->prefix
) !== 0) {
324 $tablename = substr($tablename, strlen($this->prefix
));
326 $this->tables
[$tablename] = $tablename;
328 pg_free_result($result);
330 return $this->tables
;
334 * Return table indexes - everything lowercased.
335 * @param string $table The table we want to get indexes from.
336 * @return array of arrays
338 public function get_indexes($table) {
340 $tablename = $this->prefix
.$table;
343 FROM pg_catalog.pg_indexes i
344 JOIN pg_catalog.pg_namespace as ns ON ns.nspname = i.schemaname
345 WHERE i.tablename = '$tablename'
346 AND (i.schemaname = current_schema() OR ns.oid = pg_my_temp_schema())";
348 $this->query_start($sql, null, SQL_QUERY_AUX
);
349 $result = pg_query($this->pgsql
, $sql);
350 $this->query_end($result);
353 while ($row = pg_fetch_assoc($result)) {
354 // The index definition could be generated schema-qualifying the target table name
355 // for safety, depending on the pgsql version (CVE-2018-1058).
356 if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON (|'.$row['schemaname'].'\.)'.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
359 if ($matches[5] === 'id') {
362 $columns = explode(',', $matches[5]);
363 foreach ($columns as $k=>$column) {
364 $column = trim($column);
365 if ($pos = strpos($column, ' ')) {
366 // index type is separated by space
367 $column = substr($column, 0, $pos);
369 $columns[$k] = $this->trim_quotes($column);
371 $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
372 'columns'=>$columns);
374 pg_free_result($result);
380 * Returns detailed information about columns in table. This information is cached internally.
381 * @param string $table name
382 * @param bool $usecache
383 * @return database_column_info[] array of database_column_info objects indexed with column names
385 public function get_columns($table, $usecache=true) {
387 if ($this->temptables
->is_temptable($table)) {
388 if ($data = $this->get_temp_tables_cache()->get($table)) {
392 if ($data = $this->get_metacache()->get($table)) {
398 $structure = array();
400 $tablename = $this->prefix
.$table;
402 $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef, d.adsrc
403 FROM pg_catalog.pg_class c
404 JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
405 JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
406 JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
407 LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
408 WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
409 AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())
412 $this->query_start($sql, null, SQL_QUERY_AUX
);
413 $result = pg_query($this->pgsql
, $sql);
414 $this->query_end($result);
419 while ($rawcolumn = pg_fetch_object($result)) {
421 $info = new stdClass();
422 $info->name
= $rawcolumn->field
;
425 if ($rawcolumn->type
=== 'varchar') {
426 $info->type
= 'varchar';
427 $info->meta_type
= 'C';
428 $info->max_length
= $rawcolumn->atttypmod
- 4;
430 $info->not_null
= ($rawcolumn->attnotnull
=== 't');
431 $info->has_default
= ($rawcolumn->atthasdef
=== 't');
432 if ($info->has_default
) {
433 $parts = explode('::', $rawcolumn->adsrc
);
434 if (count($parts) > 1) {
435 $info->default_value
= reset($parts);
436 $info->default_value
= trim($info->default_value
, "'");
438 $info->default_value
= $rawcolumn->adsrc
;
441 $info->default_value
= null;
443 $info->primary_key
= false;
444 $info->binary
= false;
445 $info->unsigned
= null;
446 $info->auto_increment
= false;
447 $info->unique
= null;
449 } else if (preg_match('/int(\d)/i', $rawcolumn->type
, $matches)) {
451 if (strpos($rawcolumn->adsrc
, 'nextval') === 0) {
452 $info->primary_key
= true;
453 $info->meta_type
= 'R';
454 $info->unique
= true;
455 $info->auto_increment
= true;
456 $info->has_default
= false;
458 $info->primary_key
= false;
459 $info->meta_type
= 'I';
460 $info->unique
= null;
461 $info->auto_increment
= false;
462 $info->has_default
= ($rawcolumn->atthasdef
=== 't');
464 // Return number of decimals, not bytes here.
465 if ($matches[1] >= 8) {
466 $info->max_length
= 18;
467 } else if ($matches[1] >= 4) {
468 $info->max_length
= 9;
469 } else if ($matches[1] >= 2) {
470 $info->max_length
= 4;
471 } else if ($matches[1] >= 1) {
472 $info->max_length
= 2;
474 $info->max_length
= 0;
477 $info->not_null
= ($rawcolumn->attnotnull
=== 't');
478 if ($info->has_default
) {
479 // PG 9.5+ uses ::<TYPE> syntax for some defaults.
480 $parts = explode('::', $rawcolumn->adsrc
);
481 if (count($parts) > 1) {
482 $info->default_value
= reset($parts);
484 $info->default_value
= $rawcolumn->adsrc
;
486 $info->default_value
= trim($info->default_value
, "()'");
488 $info->default_value
= null;
490 $info->binary
= false;
491 $info->unsigned
= false;
493 } else if ($rawcolumn->type
=== 'numeric') {
494 $info->type
= $rawcolumn->type
;
495 $info->meta_type
= 'N';
496 $info->primary_key
= false;
497 $info->binary
= false;
498 $info->unsigned
= null;
499 $info->auto_increment
= false;
500 $info->unique
= null;
501 $info->not_null
= ($rawcolumn->attnotnull
=== 't');
502 $info->has_default
= ($rawcolumn->atthasdef
=== 't');
503 if ($info->has_default
) {
504 // PG 9.5+ uses ::<TYPE> syntax for some defaults.
505 $parts = explode('::', $rawcolumn->adsrc
);
506 if (count($parts) > 1) {
507 $info->default_value
= reset($parts);
509 $info->default_value
= $rawcolumn->adsrc
;
511 $info->default_value
= trim($info->default_value
, "()'");
513 $info->default_value
= null;
515 $info->max_length
= $rawcolumn->atttypmod
>> 16;
516 $info->scale
= ($rawcolumn->atttypmod
& 0xFFFF) - 4;
518 } else if (preg_match('/float(\d)/i', $rawcolumn->type
, $matches)) {
519 $info->type
= 'float';
520 $info->meta_type
= 'N';
521 $info->primary_key
= false;
522 $info->binary
= false;
523 $info->unsigned
= null;
524 $info->auto_increment
= false;
525 $info->unique
= null;
526 $info->not_null
= ($rawcolumn->attnotnull
=== 't');
527 $info->has_default
= ($rawcolumn->atthasdef
=== 't');
528 if ($info->has_default
) {
529 // PG 9.5+ uses ::<TYPE> syntax for some defaults.
530 $parts = explode('::', $rawcolumn->adsrc
);
531 if (count($parts) > 1) {
532 $info->default_value
= reset($parts);
534 $info->default_value
= $rawcolumn->adsrc
;
536 $info->default_value
= trim($info->default_value
, "()'");
538 $info->default_value
= null;
540 // just guess expected number of decimal places :-(
541 if ($matches[1] == 8) {
543 $info->max_length
= 8;
547 $info->max_length
= 4;
551 } else if ($rawcolumn->type
=== 'text') {
552 $info->type
= $rawcolumn->type
;
553 $info->meta_type
= 'X';
554 $info->max_length
= -1;
556 $info->not_null
= ($rawcolumn->attnotnull
=== 't');
557 $info->has_default
= ($rawcolumn->atthasdef
=== 't');
558 if ($info->has_default
) {
559 $parts = explode('::', $rawcolumn->adsrc
);
560 if (count($parts) > 1) {
561 $info->default_value
= reset($parts);
562 $info->default_value
= trim($info->default_value
, "'");
564 $info->default_value
= $rawcolumn->adsrc
;
567 $info->default_value
= null;
569 $info->primary_key
= false;
570 $info->binary
= false;
571 $info->unsigned
= null;
572 $info->auto_increment
= false;
573 $info->unique
= null;
575 } else if ($rawcolumn->type
=== 'bytea') {
576 $info->type
= $rawcolumn->type
;
577 $info->meta_type
= 'B';
578 $info->max_length
= -1;
580 $info->not_null
= ($rawcolumn->attnotnull
=== 't');
581 $info->has_default
= false;
582 $info->default_value
= null;
583 $info->primary_key
= false;
584 $info->binary
= true;
585 $info->unsigned
= null;
586 $info->auto_increment
= false;
587 $info->unique
= null;
591 $structure[$info->name
] = new database_column_info($info);
594 pg_free_result($result);
597 if ($this->temptables
->is_temptable($table)) {
598 $this->get_temp_tables_cache()->set($table, $structure);
600 $this->get_metacache()->set($table, $structure);
608 * Normalise values based in RDBMS dependencies (booleans, LOBs...)
610 * @param database_column_info $column column metadata corresponding with the value we are going to normalise
611 * @param mixed $value value we are going to normalise
612 * @return mixed the normalised value
614 protected function normalise_value($column, $value) {
615 $this->detect_objects($value);
617 if (is_bool($value)) { // Always, convert boolean to int
618 $value = (int)$value;
620 } else if ($column->meta_type
=== 'B') {
621 if (!is_null($value)) {
622 // standard_conforming_strings must be enabled, otherwise pg_escape_bytea() will double escape
623 // \ and produce data errors. This is set on the connection.
624 $value = pg_escape_bytea($this->pgsql
, $value);
627 } else if ($value === '') {
628 if ($column->meta_type
=== 'I' or $column->meta_type
=== 'F' or $column->meta_type
=== 'N') {
629 $value = 0; // prevent '' problems in numeric fields
636 * Is db in unicode mode?
639 public function setup_is_unicodedb() {
640 // Get PostgreSQL server_encoding value
641 $sql = "SHOW server_encoding";
642 $this->query_start($sql, null, SQL_QUERY_AUX
);
643 $result = pg_query($this->pgsql
, $sql);
644 $this->query_end($result);
649 $rawcolumn = pg_fetch_object($result);
650 $encoding = $rawcolumn->server_encoding
;
651 pg_free_result($result);
653 return (strtoupper($encoding) == 'UNICODE' ||
strtoupper($encoding) == 'UTF8');
657 * Do NOT use in code, to be used by database_manager only!
658 * @param string|array $sql query
659 * @param array|null $tablenames an array of xmldb table names affected by this request.
661 * @throws ddl_change_structure_exception A DDL specific exception is thrown for any errors.
663 public function change_database_structure($sql, $tablenames = null) {
664 $this->get_manager(); // Includes DDL exceptions classes ;-)
665 if (is_array($sql)) {
666 $sql = implode("\n;\n", $sql);
668 if (!$this->is_transaction_started()) {
669 // It is better to do all or nothing, this helps with recovery...
670 $sql = "BEGIN ISOLATION LEVEL SERIALIZABLE;\n$sql\n; COMMIT";
674 $this->query_start($sql, null, SQL_QUERY_STRUCTURE
);
675 $result = pg_query($this->pgsql
, $sql);
676 $this->query_end($result);
677 pg_free_result($result);
678 } catch (ddl_change_structure_exception
$e) {
679 if (!$this->is_transaction_started()) {
680 $result = @pg_query
($this->pgsql
, "ROLLBACK");
681 @pg_free_result
($result);
683 $this->reset_caches($tablenames);
687 $this->reset_caches($tablenames);
692 * Execute general sql query. Should be used only when no other method suitable.
693 * Do NOT use this to make changes in db structure, use database_manager methods instead!
694 * @param string $sql query
695 * @param array $params query parameters
697 * @throws dml_exception A DML specific exception is thrown for any errors.
699 public function execute($sql, array $params=null) {
700 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
702 if (strpos($sql, ';') !== false) {
703 throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
706 $this->query_start($sql, $params, SQL_QUERY_UPDATE
);
707 $result = pg_query_params($this->pgsql
, $sql, $params);
708 $this->query_end($result);
710 pg_free_result($result);
715 * Get a number of records as a moodle_recordset using a SQL statement.
717 * Since this method is a little less readable, use of it should be restricted to
718 * code where it's possible there might be large datasets being returned. For known
719 * small datasets use get_records_sql - it leads to simpler code.
721 * The return type is like:
722 * @see function get_recordset.
724 * @param string $sql the SQL select query to execute.
725 * @param array $params array of sql parameters
726 * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
727 * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
728 * @return moodle_recordset instance
729 * @throws dml_exception A DML specific exception is thrown for any errors.
731 public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
733 list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
736 $sql .= " LIMIT $limitnum";
739 $sql .= " OFFSET $limitfrom";
742 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
744 $this->query_start($sql, $params, SQL_QUERY_SELECT
);
746 // For any query that doesn't explicitly specify a limit, we must use cursors to stop it
747 // loading the entire thing (unless the config setting is turned off).
748 $usecursors = !$limitnum && ($this->get_fetch_buffer_size() > 0);
750 // Work out the cursor unique identifier. This is based on a simple count used which
751 // should be OK because the identifiers only need to be unique within the current
753 $this->cursorcount++
;
754 $cursorname = 'crs' . $this->cursorcount
;
756 // Do the query to a cursor.
757 $sql = 'DECLARE ' . $cursorname . ' NO SCROLL CURSOR WITH HOLD FOR ' . $sql;
758 $result = pg_query_params($this->pgsql
, $sql, $params);
760 $result = pg_query_params($this->pgsql
, $sql, $params);
764 $this->query_end($result);
766 pg_free_result($result);
770 return new pgsql_native_moodle_recordset($result, $this, $cursorname);
/**
 * Gets size of fetch buffer used for recordset queries.
 *
 * If this returns 0 then cursors will not be used, meaning recordset queries will occupy enough
 * memory as needed for the Postgres library to hold the entire query results in memory.
 *
 * @return int Fetch buffer size or 0 indicating not to use cursors
 */
protected function get_fetch_buffer_size() {
    // No explicit 'fetchbuffersize' driver option: fall back to the class default.
    if (!array_key_exists('fetchbuffersize', $this->dboptions)) {
        return self::DEFAULT_FETCH_BUFFER_SIZE;
    }

    // The configured value is cast to int before being returned.
    $configured = $this->dboptions['fetchbuffersize'];
    return (int)$configured;
}
/**
 * Retrieves data from cursor. For use by recordset only; do not call directly.
 *
 * Return value contains the next batch of Postgres data, and a boolean indicating if this is
 * definitely the last batch (if false, there may be more)
 *
 * @param string $cursorname Name of cursor to read from
 * @return array Array with 2 elements (next data batch and boolean indicating last batch)
 */
public function fetch_from_cursor($cursorname) {
    $count = $this->get_fetch_buffer_size();

    $sql = 'FETCH ' . $count . ' FROM ' . $cursorname;

    $this->query_start($sql, [], SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    // Hand the result to query_end() before inspecting it, as the other query methods
    // of this class do, so that pg_num_rows() is not reached with a failed (false) result.
    $this->query_end($result);

    // Fewer rows than requested means the cursor is exhausted.
    $last = pg_num_rows($result) !== $count;

    return [$result, $last];
}
813 * Closes a cursor. For use by recordset only; do not call directly.
815 * @param string $cursorname Name of cursor to close
816 * @return bool True if we actually closed one, false if the transaction was cancelled
818 public function close_cursor($cursorname) {
819 // If the transaction got cancelled, then ignore this request.
820 $sql = 'CLOSE ' . $cursorname;
821 $this->query_start($sql, [], SQL_QUERY_AUX
);
822 $result = pg_query($this->pgsql
, $sql);
823 $this->query_end($result);
825 pg_free_result($result);
831 * Get a number of records as an array of objects using a SQL statement.
833 * Return value is like:
834 * @see function get_records.
836 * @param string $sql the SQL select query to execute. The first column of this SELECT statement
837 * must be a unique value (usually the 'id' field), as it will be used as the key of the
839 * @param array $params array of sql parameters
840 * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
841 * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
842 * @return array of objects, or empty array if no records were found
843 * @throws dml_exception A DML specific exception is thrown for any errors.
845 public function get_records_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
847 list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
850 $sql .= " LIMIT $limitnum";
853 $sql .= " OFFSET $limitfrom";
856 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
857 $this->query_start($sql, $params, SQL_QUERY_SELECT
);
858 $result = pg_query_params($this->pgsql
, $sql, $params);
859 $this->query_end($result);
861 // find out if there are any blobs
862 $numfields = pg_num_fields($result);
864 for ($i = 0; $i < $numfields; $i++
) {
865 $type = pg_field_type($result, $i);
866 if ($type == 'bytea') {
867 $blobs[] = pg_field_name($result, $i);
871 $rows = pg_fetch_all($result);
872 pg_free_result($result);
876 foreach ($rows as $row) {
879 foreach ($blobs as $blob) {
880 $row[$blob] = ($row[$blob] !== null ?
pg_unescape_bytea($row[$blob]) : null);
883 if (isset($return[$id])) {
884 $colname = key($row);
885 debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER
);
887 $return[$id] = (object)$row;
895 * Selects records and return values (first field) as an array using a SQL statement.
897 * @param string $sql The SQL query
898 * @param array $params array of sql parameters
899 * @return array of values
900 * @throws dml_exception A DML specific exception is thrown for any errors.
902 public function get_fieldset_sql($sql, array $params=null) {
903 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
905 $this->query_start($sql, $params, SQL_QUERY_SELECT
);
906 $result = pg_query_params($this->pgsql
, $sql, $params);
907 $this->query_end($result);
909 $return = pg_fetch_all_columns($result, 0);
911 if (pg_field_type($result, 0) == 'bytea') {
912 foreach ($return as $key => $value) {
913 $return[$key] = ($value === null ?
$value : pg_unescape_bytea($value));
917 pg_free_result($result);
923 * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
924 * @param string $table name
925 * @param mixed $params data record as object or array
926 * @param bool $returnid return id of inserted record
927 * @param bool $bulk true means repeated inserts expected
928 * @param bool $customsequence true if 'id' included in $params, disables $returnid
929 * @return bool|int true or new id
930 * @throws dml_exception A DML specific exception is thrown for any errors.
932 public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
933 if (!is_array($params)) {
934 $params = (array)$params;
939 if ($customsequence) {
940 if (!isset($params['id'])) {
941 throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
946 $returning = "RETURNING id";
947 unset($params['id']);
949 unset($params['id']);
953 if (empty($params)) {
954 throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
957 $fields = implode(',', array_keys($params));
960 foreach ($params as $value) {
961 $this->detect_objects($value);
962 $values[] = "\$".$i++
;
964 $values = implode(',', $values);
966 $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
967 $this->query_start($sql, $params, SQL_QUERY_INSERT
);
968 $result = pg_query_params($this->pgsql
, $sql, $params);
969 $this->query_end($result);
971 if ($returning !== "") {
972 $row = pg_fetch_assoc($result);
973 $params['id'] = reset($row);
975 pg_free_result($result);
981 return (int)$params['id'];
985 * Insert a record into a table and return the "id" field if required.
987 * Some conversions and safety checks are carried out. Lobs are supported.
988 * If the return ID isn't required, then this just reports success as true/false.
989 * $dataobject is an object containing needed data
990 * @param string $table The database table to be inserted into
991 * @param object $dataobject A data object with values for one or more fields in the record
992 * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
993 * @return bool|int true or new id
994 * @throws dml_exception A DML specific exception is thrown for any errors.
996 public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
997 $dataobject = (array)$dataobject;
999 $columns = $this->get_columns($table);
1000 if (empty($columns)) {
1001 throw new dml_exception('ddltablenotexist', $table);
1006 foreach ($dataobject as $field=>$value) {
1007 if ($field === 'id') {
1010 if (!isset($columns[$field])) {
1013 $column = $columns[$field];
1014 $cleaned[$field] = $this->normalise_value($column, $value);
1017 return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
/**
 * Insert multiple records into database as fast as possible.
 *
 * Order of inserts is maintained, but the operation is not atomic,
 * use transactions if necessary.
 *
 * This method is intended for inserting of large number of small objects,
 * do not use for huge objects with text or binary fields.
 *
 * Records are buffered and flushed through insert_chunk() once the buffer
 * reaches the chunk size; the 'bulkinsertsize' dboption overrides the default.
 *
 * @param string $table The database table to be inserted into
 * @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
 * @return void does not return new record ids
 *
 * @throws coding_exception if data objects have different structure
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function insert_records($table, $dataobjects) {
    if (!is_array($dataobjects) and !($dataobjects instanceof Traversable)) {
        throw new coding_exception('insert_records() passed non-traversable object');
    }

    // PostgreSQL does not seem to have problems with huge queries.
    $chunksize = 500;
    if (!empty($this->dboptions['bulkinsertsize'])) {
        $chunksize = (int)$this->dboptions['bulkinsertsize'];
    }

    $columns = $this->get_columns($table, true);

    $fields = null;
    $count = 0;
    $chunk = array();
    foreach ($dataobjects as $dataobject) {
        if (!is_array($dataobject) and !is_object($dataobject)) {
            throw new coding_exception('insert_records() passed invalid record object');
        }
        $dataobject = (array)$dataobject;
        if ($fields === null) {
            // The first record fixes the field list; only columns it provides are
            // inserted, and 'id' is always left out.
            $fields = array_keys($dataobject);
            $columns = array_intersect_key($columns, $dataobject);
            unset($columns['id']);
        } else if ($fields !== array_keys($dataobject)) {
            // Strict comparison: same keys in the same order are required.
            throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
        }

        $count++;
        $chunk[] = $dataobject;

        if ($count === $chunksize) {
            $this->insert_chunk($table, $chunk, $columns);
            $chunk = array();
            $count = 0;
        }
    }

    // Flush whatever is left in a final, partially filled chunk.
    if ($count) {
        $this->insert_chunk($table, $chunk, $columns);
    }
}
/**
 * Insert records in chunks, strict param types...
 *
 * Builds a single multi-row INSERT with numbered placeholders ($1, $2, ...)
 * and runs it through pg_query_params().
 *
 * Note: can be used only from insert_records().
 *
 * @param string $table
 * @param array $chunk records already cast to arrays; each is indexed by every key of $columns
 * @param database_column_info[] $columns columns to insert, keyed by field name
 */
protected function insert_chunk($table, array $chunk, array $columns) {
    $i = 1;
    $params = array();
    $values = array();
    foreach ($chunk as $dataobject) {
        $vals = array();
        foreach ($columns as $field => $column) {
            $params[] = $this->normalise_value($column, $dataobject[$field]);
            // Placeholder numbering runs across the whole statement, not per row.
            $vals[] = "\$".$i++;
        }
        $values[] = '('.implode(',', $vals).')';
    }

    $fieldssql = '('.implode(',', array_keys($columns)).')';
    $valuessql = implode(',', $values);

    $sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql";
    $this->query_start($sql, $params, SQL_QUERY_INSERT);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);
    pg_free_result($result);
}
/**
 * Import a record into a table, id field is required.
 * Safety checks are NOT carried out. Lobs are supported.
 *
 * Unlike insert_record(), the 'id' key is not filtered out here, so it is
 * forwarded to insert_record_raw() when the table has an id column.
 *
 * @param string $table name of database table to be inserted into
 * @param object $dataobject A data object with values for one or more fields in the record
 * @return mixed whatever insert_record_raw() returns
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function import_record($table, $dataobject) {
    $dataobject = (array)$dataobject;

    $columns = $this->get_columns($table);
    $cleaned = array();

    foreach ($dataobject as $field=>$value) {
        $this->detect_objects($value);
        if (!isset($columns[$field])) {
            // Keys with no matching column are skipped.
            continue;
        }
        $column = $columns[$field];
        $cleaned[$field] = $this->normalise_value($column, $value);
    }

    return $this->insert_record_raw($table, $cleaned, false, true, true);
}
/**
 * Update record in database, as fast as possible, no safety checks, lobs not supported.
 *
 * @param string $table name
 * @param mixed $params data record as object or array, must contain 'id'
 * @param bool $bulk true means repeated updates expected (not consulted by this implementation)
 * @return bool true
 * @throws coding_exception if 'id' is missing or there is nothing to update
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function update_record_raw($table, $params, $bulk=false) {
    $params = (array)$params;

    if (!isset($params['id'])) {
        throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
    }
    // Pull the id out so the remaining entries are exactly the columns to SET.
    $id = $params['id'];
    unset($params['id']);

    if (empty($params)) {
        throw new coding_exception('moodle_database::update_record_raw() no fields found.');
    }

    $i = 1;

    $sets = array();
    foreach ($params as $field=>$value) {
        $this->detect_objects($value);
        $sets[] = "$field = \$".$i++;
    }

    $params[] = $id; // last ? in WHERE condition

    $sets = implode(',', $sets);
    $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;

    $this->query_start($sql, $params, SQL_QUERY_UPDATE);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    pg_free_result($result);
    return true;
}
/**
 * Update a record in a table
 *
 * $dataobject is an object containing needed data
 * Relies on $dataobject having a variable "id" to
 * specify the record to update
 *
 * @param string $table The database table to be checked against.
 * @param object $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified.
 * @param bool $bulk true means repeated updates expected
 * @return bool true
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function update_record($table, $dataobject, $bulk=false) {
    $dataobject = (array)$dataobject;

    $columns = $this->get_columns($table);
    $cleaned = array();

    foreach ($dataobject as $field=>$value) {
        if (!isset($columns[$field])) {
            // Keys with no matching column are skipped.
            continue;
        }
        $column = $columns[$field];
        $cleaned[$field] = $this->normalise_value($column, $value);
    }

    // The id check and the actual UPDATE happen in update_record_raw().
    $this->update_record_raw($table, $cleaned, $bulk);

    return true;
}
/**
 * Set a single field in every table record which match a particular WHERE clause.
 *
 * @param string $table The database table to be checked against.
 * @param string $newfield the field to set.
 * @param string $newvalue the value to set the field to.
 * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
 * @param array $params array of sql parameters
 * @return bool true
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {

    // An empty $select means no WHERE clause is added.
    if ($select) {
        $select = "WHERE $select";
    }
    if (is_null($params)) {
        $params = array();
    }
    list($select, $params, $type) = $this->fix_sql_params($select, $params);
    // The new value takes the placeholder number after those used by the WHERE clause.
    $i = count($params)+1;

    // Get column metadata
    $columns = $this->get_columns($table);
    $column = $columns[$newfield];

    $normalisedvalue = $this->normalise_value($column, $newvalue);

    $newfield = "$newfield = \$" . $i;
    $params[] = $normalisedvalue;
    $sql = "UPDATE {$this->prefix}$table SET $newfield $select";

    $this->query_start($sql, $params, SQL_QUERY_UPDATE);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    pg_free_result($result);

    return true;
}
/**
 * Delete one or more records from a table which match a particular WHERE clause, lobs not supported.
 *
 * @param string $table The database table to be checked against.
 * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
 * @param array $params array of sql parameters
 * @return bool true
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function delete_records_select($table, $select, array $params=null) {
    // An empty $select produces a DELETE with no WHERE clause.
    if ($select) {
        $select = "WHERE $select";
    }
    $sql = "DELETE FROM {$this->prefix}$table $select";

    list($sql, $params, $type) = $this->fix_sql_params($sql, $params);

    // Reported to query_start() as SQL_QUERY_UPDATE.
    $this->query_start($sql, $params, SQL_QUERY_UPDATE);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    pg_free_result($result);

    return true;
}
/**
 * Builds the LIKE / ILIKE comparison fragment of a query.
 *
 * @param string $fieldname usually name of the table column
 * @param string $param usually bound query parameter (?, :named)
 * @param bool $casesensitive true selects LIKE, false selects ILIKE
 * @param bool $accentsensitive accepted for interface compatibility; not consulted here
 * @param bool $notlike true means "NOT LIKE"
 * @param string $escapechar escape char for '%' and '_'
 * @return string SQL code fragment
 */
public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') {
    // A literal '%' in $param suggests a raw pattern was passed instead of a placeholder.
    if (strpos($param, '%') !== false) {
        debugging('Potential SQL injection detected, sql_like() expects bound parameters (? or :named)');
    }

    // postgresql does not support accent insensitive text comparisons, sorry
    if ($notlike) {
        $operator = $casesensitive ? 'NOT LIKE' : 'NOT ILIKE';
    } else {
        $operator = $casesensitive ? 'LIKE' : 'ILIKE';
    }

    return "$fieldname $operator $param ESCAPE '$escapechar'";
}
/**
 * Returns the SQL fragment for a bitwise XOR of two expressions,
 * using the '#' operator with each operand parenthesised.
 *
 * @param int|string $int1 first operand (value or SQL expression)
 * @param int|string $int2 second operand (value or SQL expression)
 * @return string SQL code fragment
 */
public function sql_bitxor($int1, $int2) {
    return "(({$int1}) # ({$int2}))";
}
/**
 * Returns the SQL fragment casting an expression to INT.
 *
 * @param string $fieldname column name or SQL expression to cast
 * @param bool $text not consulted by this implementation
 * @return string SQL code fragment, padded with one space on each side
 */
public function sql_cast_char2int($fieldname, $text=false) {
    return " CAST({$fieldname} AS INT) ";
}
/**
 * Returns the SQL fragment casting an expression to real via the '::real' suffix.
 *
 * @param string $fieldname column name or SQL expression to cast
 * @param bool $text not consulted by this implementation
 * @return string SQL code fragment, padded with one space on each side
 */
public function sql_cast_char2real($fieldname, $text=false) {
    return ' ' . $fieldname . '::real ';
}
/**
 * Returns the SQL fragment concatenating all given expressions with '||'.
 *
 * Accepts a variable number of arguments, each an SQL expression.
 *
 * @return string SQL code fragment; an empty-string literal when nothing was passed
 */
public function sql_concat() {
    $arr = func_get_args();
    $s = implode(' || ', $arr);
    if ($s === '') {
        return " '' ";
    }
    // Add always empty string element so integer-exclusive concats
    // will work without needing to cast each element explicitly
    return " '' || $s ";
}
/**
 * Returns the SQL fragment concatenating the given expressions with a
 * separator expression placed between each neighbouring pair.
 *
 * @param string $separator SQL expression for the separator (default is a quoted space)
 * @param array $elements SQL expressions to join
 * @return string SQL code fragment; an empty-string literal when $elements is empty
 */
public function sql_concat_join($separator="' '", $elements=array()) {
    // Walk backwards so earlier insert positions are not shifted by later insertions.
    for ($n=count($elements)-1; $n > 0 ; $n--) {
        array_splice($elements, $n, 0, $separator);
    }
    $s = implode(' || ', $elements);
    if ($s === '') {
        return " '' ";
    }
    return " $s ";
}
/**
 * Reports whether this driver offers regular-expression matching;
 * the operators themselves come from sql_regex().
 *
 * @return bool true
 */
public function sql_regex_supported() {
    return true;
}
/**
 * Returns the regular-expression match operator for the requested mode.
 *
 * @param bool $positivematch true for "matches", false for "does not match"
 * @param bool $casesensitive true for case sensitive matching
 * @return string one of '~', '!~', '~*', '!~*'
 */
public function sql_regex($positivematch = true, $casesensitive = false) {
    if ($positivematch) {
        return $casesensitive ? '~' : '~*';
    }
    return $casesensitive ? '!~' : '!~*';
}
/**
 * Does this driver support tool_replace?
 *
 * @since Moodle 2.6.1
 * @return bool true
 */
public function replace_all_text_supported() {
    return true;
}
/**
 * Reports whether this driver can take session locks; consulted by
 * get_session_lock() and release_session_lock() before they do any work.
 *
 * @return bool true
 */
public function session_lock_supported() {
    return true;
}
/**
 * Obtain session lock
 *
 * Sets statement_timeout to $timeout seconds, requests pg_advisory_lock($rowid),
 * then puts statement_timeout back to its default.
 *
 * @param int $rowid id of the row with session record
 * @param int $timeout max allowed time to wait for the lock in seconds
 * @return void
 * @throws dml_sessionwait_exception if the lock query failed after at least $timeout seconds
 * @throws dml_exception for any other query failure
 */
public function get_session_lock($rowid, $timeout) {
    // NOTE: there is a potential locking problem for database running
    //       multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
    //       luckily there is not a big chance that they would collide
    if (!$this->session_lock_supported()) {
        return;
    }

    parent::get_session_lock($rowid, $timeout);

    // statement_timeout is given in milliseconds here.
    $timeoutmilli = $timeout * 1000;

    $sql = "SET statement_timeout TO $timeoutmilli";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    if ($result) {
        pg_free_result($result);
    }

    $sql = "SELECT pg_advisory_lock($rowid)";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    // Time the call so a failure can be classified as a lock-wait timeout.
    $start = time();
    $result = pg_query($this->pgsql, $sql);
    $end = time();
    try {
        $this->query_end($result);
    } catch (dml_exception $ex) {
        if ($end - $start >= $timeout) {
            // The query ran for the whole allowed wait: report it as a session wait timeout.
            throw new dml_sessionwait_exception();
        } else {
            throw $ex;
        }
    }

    if ($result) {
        pg_free_result($result);
    }

    $sql = "SET statement_timeout TO DEFAULT";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    if ($result) {
        pg_free_result($result);
    }
}
/**
 * Release the session lock taken for the given session row by calling
 * pg_advisory_unlock($rowid).
 *
 * Does nothing when session locks are unsupported or when
 * $this->used_for_db_sessions is not set.
 *
 * @param int $rowid id of the row with session record
 * @return void
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function release_session_lock($rowid) {
    if (!$this->session_lock_supported()) {
        return;
    }
    if (!$this->used_for_db_sessions) {
        return;
    }

    parent::release_session_lock($rowid);

    $sql = "SELECT pg_advisory_unlock($rowid)";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    if ($result) {
        pg_free_result($result);
    }
}
/**
 * Driver specific start of real database transaction,
 * this can not be used directly in code.
 *
 * Opens a READ COMMITTED transaction and creates the moodle_pg_savepoint
 * savepoint in the same pg_query() call.
 *
 * @return void
 */
protected function begin_transaction() {
    // The flag is raised before the statement is sent.
    $this->savepointpresent = true;

    $statement = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint";
    $this->query_start($statement, null, SQL_QUERY_AUX);
    $outcome = pg_query($this->pgsql, $statement);
    $this->query_end($outcome);

    pg_free_result($outcome);
}
/**
 * Driver specific commit of real database transaction,
 * this can not be used directly in code.
 *
 * Releases the moodle_pg_savepoint savepoint and commits in the same
 * pg_query() call.
 *
 * @return void
 */
protected function commit_transaction() {
    // The flag is cleared before the statement is sent.
    $this->savepointpresent = false;

    $statement = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT";
    $this->query_start($statement, null, SQL_QUERY_AUX);
    $outcome = pg_query($this->pgsql, $statement);
    $this->query_end($outcome);

    pg_free_result($outcome);
}
/**
 * Driver specific abort of real database transaction,
 * this can not be used directly in code.
 *
 * Releases the moodle_pg_savepoint savepoint and rolls back in the same
 * pg_query() call.
 *
 * @return void
 */
protected function rollback_transaction() {
    // The flag is cleared before the statement is sent.
    $this->savepointpresent = false;

    $statement = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK";
    $this->query_start($statement, null, SQL_QUERY_AUX);
    $outcome = pg_query($this->pgsql, $statement);
    $this->query_end($outcome);

    pg_free_result($outcome);
}
/**
 * Helper function trimming (whitespace + quotes) any string
 * needed because PG uses to enclose with double quotes some
 * fields in indexes definition and others
 *
 * Surrounding whitespace is stripped first, then any leading or trailing
 * single or double quote characters.
 *
 * @param string $str string to apply whitespace + quotes trim
 * @return string trimmed string
 */
private function trim_quotes($str) {
    $stripped = trim($str);
    return trim($stripped, "'\"");
}
1502 * Postgresql supports full-text search indexes.
1506 public function is_fulltext_search_supported() {