Merge branch 'MDL-47162' of git://github.com/merrill-oakland/moodle
[moodle.git] / lib / dml / pgsql_native_moodle_database.php
blobaaac5c87ef814a8bb92fc5a43ec70f42dea65743
1 <?php
2 // This file is part of Moodle - http://moodle.org/
3 //
4 // Moodle is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // Moodle is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with Moodle. If not, see <http://www.gnu.org/licenses/>.
17 /**
18 * Native pgsql class representing moodle database interface.
20 * @package core_dml
21 * @copyright 2008 Petr Skoda (http://skodak.org)
22 * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
25 defined('MOODLE_INTERNAL') || die();
27 require_once(__DIR__.'/moodle_database.php');
28 require_once(__DIR__.'/pgsql_native_moodle_recordset.php');
29 require_once(__DIR__.'/pgsql_native_moodle_temptables.php');
31 /**
32 * Native pgsql class representing moodle database interface.
34 * @package core_dml
35 * @copyright 2008 Petr Skoda (http://skodak.org)
36 * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
38 class pgsql_native_moodle_database extends moodle_database {
40 /** @var resource $pgsql database resource */
41 protected $pgsql = null;
43 protected $last_error_reporting; // To handle pgsql driver default verbosity
45 /** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */
46 protected $savepointpresent = false;
48 /**
49 * Detects if all needed PHP stuff installed.
50 * Note: can be used before connect()
51 * @return mixed true if ok, string if something
53 public function driver_installed() {
54 if (!extension_loaded('pgsql')) {
55 return get_string('pgsqlextensionisnotpresentinphp', 'install');
57 return true;
60 /**
61 * Returns database family type - describes SQL dialect
62 * Note: can be used before connect()
63 * @return string db family name (mysql, postgres, mssql, oracle, etc.)
65 public function get_dbfamily() {
66 return 'postgres';
69 /**
70 * Returns more specific database driver type
71 * Note: can be used before connect()
72 * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
74 protected function get_dbtype() {
75 return 'pgsql';
78 /**
79 * Returns general database library name
80 * Note: can be used before connect()
81 * @return string db type pdo, native
83 protected function get_dblibrary() {
84 return 'native';
87 /**
88 * Returns localised database type name
89 * Note: can be used before connect()
90 * @return string
92 public function get_name() {
93 return get_string('nativepgsql', 'install');
96 /**
97 * Returns localised database configuration help.
98 * Note: can be used before connect()
99 * @return string
101 public function get_configuration_help() {
102 return get_string('nativepgsqlhelp', 'install');
106 * Connect to db
107 * Must be called before other methods.
108 * @param string $dbhost The database host.
109 * @param string $dbuser The database username.
110 * @param string $dbpass The database username's password.
111 * @param string $dbname The name of the database being connected to.
112 * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
113 * @param array $dboptions driver specific options
114 * @return bool true
115 * @throws dml_connection_exception if error
117 public function connect($dbhost, $dbuser, $dbpass, $dbname, $prefix, array $dboptions=null) {
118 if ($prefix == '' and !$this->external) {
119 //Enforce prefixes for everybody but mysql
120 throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
123 $driverstatus = $this->driver_installed();
125 if ($driverstatus !== true) {
126 throw new dml_exception('dbdriverproblem', $driverstatus);
129 $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
131 $pass = addcslashes($this->dbpass, "'\\");
133 // Unix socket connections should have lower overhead
134 if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
135 $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
136 if (strpos($this->dboptions['dbsocket'], '/') !== false) {
137 // A directory was specified as the socket location.
138 $connection .= " host='".$this->dboptions['dbsocket']."'";
140 if (!empty($this->dboptions['dbport'])) {
141 // A port as specified, add it to the connection as it's used as part of the socket path.
142 $connection .= " port ='".$this->dboptions['dbport']."'";
144 } else {
145 $this->dboptions['dbsocket'] = '';
146 if (empty($this->dbname)) {
147 // probably old style socket connection - do not add port
148 $port = "";
149 } else if (empty($this->dboptions['dbport'])) {
150 $port = "port ='5432'";
151 } else {
152 $port = "port ='".$this->dboptions['dbport']."'";
154 $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
157 if (empty($this->dboptions['dbhandlesoptions'])) {
158 // ALTER USER and ALTER DATABASE are overridden by these settings.
159 $options = array('--client_encoding=utf8', '--standard_conforming_strings=on');
160 // Select schema if specified, otherwise the first one wins.
161 if (!empty($this->dboptions['dbschema'])) {
162 $options[] = "-c search_path=" . addcslashes($this->dboptions['dbschema'], "'\\");
165 $connection .= " options='" . implode(' ', $options) . "'";
168 ob_start();
169 if (empty($this->dboptions['dbpersist'])) {
170 $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
171 } else {
172 $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
174 $dberr = ob_get_contents();
175 ob_end_clean();
177 $status = pg_connection_status($this->pgsql);
179 if ($status === false or $status === PGSQL_CONNECTION_BAD) {
180 $this->pgsql = null;
181 throw new dml_connection_exception($dberr);
184 if (!empty($this->dboptions['dbhandlesoptions'])) {
185 /* We don't trust people who just set the dbhandlesoptions, this code checks up on them.
186 * These functions do not talk to the server, they use the client library knowledge to determine state.
188 if (!empty($this->dboptions['dbschema'])) {
189 throw new dml_connection_exception('You cannot specify a schema with dbhandlesoptions, use the database to set it.');
191 if (pg_client_encoding($this->pgsql) != 'UTF8') {
192 throw new dml_connection_exception('client_encoding = UTF8 not set, it is: ' . pg_client_encoding($this->pgsql));
194 if (pg_escape_string($this->pgsql, '\\') != '\\') {
195 throw new dml_connection_exception('standard_conforming_strings = on, must be set at the database.');
199 // Connection stabilised and configured, going to instantiate the temptables controller
200 $this->temptables = new pgsql_native_moodle_temptables($this);
202 return true;
206 * Close database connection and release all resources
207 * and memory (especially circular memory references).
208 * Do NOT use connect() again, create a new instance if needed.
210 public function dispose() {
211 parent::dispose(); // Call parent dispose to write/close session and other common stuff before closing connection
212 if ($this->pgsql) {
213 pg_close($this->pgsql);
214 $this->pgsql = null;
220 * Called before each db query.
221 * @param string $sql
222 * @param array array of parameters
223 * @param int $type type of query
224 * @param mixed $extrainfo driver specific extra information
225 * @return void
227 protected function query_start($sql, array $params=null, $type, $extrainfo=null) {
228 parent::query_start($sql, $params, $type, $extrainfo);
229 // pgsql driver tents to send debug to output, we do not need that ;-)
230 $this->last_error_reporting = error_reporting(0);
234 * Called immediately after each db query.
235 * @param mixed db specific result
236 * @return void
238 protected function query_end($result) {
239 // reset original debug level
240 error_reporting($this->last_error_reporting);
241 try {
242 parent::query_end($result);
243 if ($this->savepointpresent and $this->last_type != SQL_QUERY_AUX and $this->last_type != SQL_QUERY_SELECT) {
244 $res = @pg_query($this->pgsql, "RELEASE SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
245 if ($res) {
246 pg_free_result($res);
249 } catch (Exception $e) {
250 if ($this->savepointpresent) {
251 $res = @pg_query($this->pgsql, "ROLLBACK TO SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
252 if ($res) {
253 pg_free_result($res);
256 throw $e;
261 * Returns database server info array
262 * @return array Array containing 'description' and 'version' info
264 public function get_server_info() {
265 static $info;
266 if (!$info) {
267 $this->query_start("--pg_version()", null, SQL_QUERY_AUX);
268 $info = pg_version($this->pgsql);
269 $this->query_end(true);
271 return array('description'=>$info['server'], 'version'=>$info['server']);
275 * Returns supported query parameter types
276 * @return int bitmask of accepted SQL_PARAMS_*
278 protected function allowed_param_types() {
279 return SQL_PARAMS_DOLLAR;
283 * Returns last error reported by database engine.
284 * @return string error message
286 public function get_last_error() {
287 return pg_last_error($this->pgsql);
291 * Return tables in database WITHOUT current prefix.
292 * @param bool $usecache if true, returns list of cached tables.
293 * @return array of table names in lowercase and without prefix
295 public function get_tables($usecache=true) {
296 if ($usecache and $this->tables !== null) {
297 return $this->tables;
299 $this->tables = array();
300 $prefix = str_replace('_', '|_', $this->prefix);
301 $sql = "SELECT c.relname
302 FROM pg_catalog.pg_class c
303 JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
304 WHERE c.relname LIKE '$prefix%' ESCAPE '|'
305 AND c.relkind = 'r'
306 AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())";
307 $this->query_start($sql, null, SQL_QUERY_AUX);
308 $result = pg_query($this->pgsql, $sql);
309 $this->query_end($result);
311 if ($result) {
312 while ($row = pg_fetch_row($result)) {
313 $tablename = reset($row);
314 if ($this->prefix !== false && $this->prefix !== '') {
315 if (strpos($tablename, $this->prefix) !== 0) {
316 continue;
318 $tablename = substr($tablename, strlen($this->prefix));
320 $this->tables[$tablename] = $tablename;
322 pg_free_result($result);
324 return $this->tables;
328 * Return table indexes - everything lowercased.
329 * @param string $table The table we want to get indexes from.
330 * @return array of arrays
332 public function get_indexes($table) {
333 $indexes = array();
334 $tablename = $this->prefix.$table;
336 $sql = "SELECT i.*
337 FROM pg_catalog.pg_indexes i
338 JOIN pg_catalog.pg_namespace as ns ON ns.nspname = i.schemaname
339 WHERE i.tablename = '$tablename'
340 AND (i.schemaname = current_schema() OR ns.oid = pg_my_temp_schema())";
342 $this->query_start($sql, null, SQL_QUERY_AUX);
343 $result = pg_query($this->pgsql, $sql);
344 $this->query_end($result);
346 if ($result) {
347 while ($row = pg_fetch_assoc($result)) {
348 if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON '.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
349 continue;
351 if ($matches[4] === 'id') {
352 continue;
354 $columns = explode(',', $matches[4]);
355 foreach ($columns as $k=>$column) {
356 $column = trim($column);
357 if ($pos = strpos($column, ' ')) {
358 // index type is separated by space
359 $column = substr($column, 0, $pos);
361 $columns[$k] = $this->trim_quotes($column);
363 $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
364 'columns'=>$columns);
366 pg_free_result($result);
368 return $indexes;
372 * Returns detailed information about columns in table. This information is cached internally.
373 * @param string $table name
374 * @param bool $usecache
375 * @return database_column_info[] array of database_column_info objects indexed with column names
377 public function get_columns($table, $usecache=true) {
378 if ($usecache) {
379 if ($this->temptables->is_temptable($table)) {
380 if ($data = $this->get_temp_tables_cache()->get($table)) {
381 return $data;
383 } else {
384 if ($data = $this->get_metacache()->get($table)) {
385 return $data;
390 $structure = array();
392 $tablename = $this->prefix.$table;
394 $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef, d.adsrc
395 FROM pg_catalog.pg_class c
396 JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
397 JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
398 JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
399 LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
400 WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
401 AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())
402 ORDER BY a.attnum";
404 $this->query_start($sql, null, SQL_QUERY_AUX);
405 $result = pg_query($this->pgsql, $sql);
406 $this->query_end($result);
408 if (!$result) {
409 return array();
411 while ($rawcolumn = pg_fetch_object($result)) {
413 $info = new stdClass();
414 $info->name = $rawcolumn->field;
415 $matches = null;
417 if ($rawcolumn->type === 'varchar') {
418 $info->type = 'varchar';
419 $info->meta_type = 'C';
420 $info->max_length = $rawcolumn->atttypmod - 4;
421 $info->scale = null;
422 $info->not_null = ($rawcolumn->attnotnull === 't');
423 $info->has_default = ($rawcolumn->atthasdef === 't');
424 if ($info->has_default) {
425 $parts = explode('::', $rawcolumn->adsrc);
426 if (count($parts) > 1) {
427 $info->default_value = reset($parts);
428 $info->default_value = trim($info->default_value, "'");
429 } else {
430 $info->default_value = $rawcolumn->adsrc;
432 } else {
433 $info->default_value = null;
435 $info->primary_key = false;
436 $info->binary = false;
437 $info->unsigned = null;
438 $info->auto_increment= false;
439 $info->unique = null;
441 } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
442 $info->type = 'int';
443 if (strpos($rawcolumn->adsrc, 'nextval') === 0) {
444 $info->primary_key = true;
445 $info->meta_type = 'R';
446 $info->unique = true;
447 $info->auto_increment= true;
448 $info->has_default = false;
449 } else {
450 $info->primary_key = false;
451 $info->meta_type = 'I';
452 $info->unique = null;
453 $info->auto_increment= false;
454 $info->has_default = ($rawcolumn->atthasdef === 't');
456 // Return number of decimals, not bytes here.
457 if ($matches[1] >= 8) {
458 $info->max_length = 18;
459 } else if ($matches[1] >= 4) {
460 $info->max_length = 9;
461 } else if ($matches[1] >= 2) {
462 $info->max_length = 4;
463 } else if ($matches[1] >= 1) {
464 $info->max_length = 2;
465 } else {
466 $info->max_length = 0;
468 $info->scale = null;
469 $info->not_null = ($rawcolumn->attnotnull === 't');
470 if ($info->has_default) {
471 // PG 9.5+ uses ::<TYPE> syntax for some defaults.
472 $parts = explode('::', $rawcolumn->adsrc);
473 if (count($parts) > 1) {
474 $info->default_value = reset($parts);
475 } else {
476 $info->default_value = $rawcolumn->adsrc;
478 $info->default_value = trim($info->default_value, "()'");
479 } else {
480 $info->default_value = null;
482 $info->binary = false;
483 $info->unsigned = false;
485 } else if ($rawcolumn->type === 'numeric') {
486 $info->type = $rawcolumn->type;
487 $info->meta_type = 'N';
488 $info->primary_key = false;
489 $info->binary = false;
490 $info->unsigned = null;
491 $info->auto_increment= false;
492 $info->unique = null;
493 $info->not_null = ($rawcolumn->attnotnull === 't');
494 $info->has_default = ($rawcolumn->atthasdef === 't');
495 if ($info->has_default) {
496 // PG 9.5+ uses ::<TYPE> syntax for some defaults.
497 $parts = explode('::', $rawcolumn->adsrc);
498 if (count($parts) > 1) {
499 $info->default_value = reset($parts);
500 } else {
501 $info->default_value = $rawcolumn->adsrc;
503 $info->default_value = trim($info->default_value, "()'");
504 } else {
505 $info->default_value = null;
507 $info->max_length = $rawcolumn->atttypmod >> 16;
508 $info->scale = ($rawcolumn->atttypmod & 0xFFFF) - 4;
510 } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
511 $info->type = 'float';
512 $info->meta_type = 'N';
513 $info->primary_key = false;
514 $info->binary = false;
515 $info->unsigned = null;
516 $info->auto_increment= false;
517 $info->unique = null;
518 $info->not_null = ($rawcolumn->attnotnull === 't');
519 $info->has_default = ($rawcolumn->atthasdef === 't');
520 if ($info->has_default) {
521 // PG 9.5+ uses ::<TYPE> syntax for some defaults.
522 $parts = explode('::', $rawcolumn->adsrc);
523 if (count($parts) > 1) {
524 $info->default_value = reset($parts);
525 } else {
526 $info->default_value = $rawcolumn->adsrc;
528 $info->default_value = trim($info->default_value, "()'");
529 } else {
530 $info->default_value = null;
532 // just guess expected number of deciaml places :-(
533 if ($matches[1] == 8) {
534 // total 15 digits
535 $info->max_length = 8;
536 $info->scale = 7;
537 } else {
538 // total 6 digits
539 $info->max_length = 4;
540 $info->scale = 2;
543 } else if ($rawcolumn->type === 'text') {
544 $info->type = $rawcolumn->type;
545 $info->meta_type = 'X';
546 $info->max_length = -1;
547 $info->scale = null;
548 $info->not_null = ($rawcolumn->attnotnull === 't');
549 $info->has_default = ($rawcolumn->atthasdef === 't');
550 if ($info->has_default) {
551 $parts = explode('::', $rawcolumn->adsrc);
552 if (count($parts) > 1) {
553 $info->default_value = reset($parts);
554 $info->default_value = trim($info->default_value, "'");
555 } else {
556 $info->default_value = $rawcolumn->adsrc;
558 } else {
559 $info->default_value = null;
561 $info->primary_key = false;
562 $info->binary = false;
563 $info->unsigned = null;
564 $info->auto_increment= false;
565 $info->unique = null;
567 } else if ($rawcolumn->type === 'bytea') {
568 $info->type = $rawcolumn->type;
569 $info->meta_type = 'B';
570 $info->max_length = -1;
571 $info->scale = null;
572 $info->not_null = ($rawcolumn->attnotnull === 't');
573 $info->has_default = false;
574 $info->default_value = null;
575 $info->primary_key = false;
576 $info->binary = true;
577 $info->unsigned = null;
578 $info->auto_increment= false;
579 $info->unique = null;
583 $structure[$info->name] = new database_column_info($info);
586 pg_free_result($result);
588 if ($usecache) {
589 if ($this->temptables->is_temptable($table)) {
590 $this->get_temp_tables_cache()->set($table, $structure);
591 } else {
592 $this->get_metacache()->set($table, $structure);
596 return $structure;
600 * Normalise values based in RDBMS dependencies (booleans, LOBs...)
602 * @param database_column_info $column column metadata corresponding with the value we are going to normalise
603 * @param mixed $value value we are going to normalise
604 * @return mixed the normalised value
606 protected function normalise_value($column, $value) {
607 $this->detect_objects($value);
609 if (is_bool($value)) { // Always, convert boolean to int
610 $value = (int)$value;
612 } else if ($column->meta_type === 'B') {
613 if (!is_null($value)) {
614 // standard_conforming_strings must be enabled, otherwise pg_escape_bytea() will double escape
615 // \ and produce data errors. This is set on the connection.
616 $value = pg_escape_bytea($this->pgsql, $value);
619 } else if ($value === '') {
620 if ($column->meta_type === 'I' or $column->meta_type === 'F' or $column->meta_type === 'N') {
621 $value = 0; // prevent '' problems in numeric fields
624 return $value;
628 * Is db in unicode mode?
629 * @return bool
631 public function setup_is_unicodedb() {
632 // Get PostgreSQL server_encoding value
633 $sql = "SHOW server_encoding";
634 $this->query_start($sql, null, SQL_QUERY_AUX);
635 $result = pg_query($this->pgsql, $sql);
636 $this->query_end($result);
638 if (!$result) {
639 return false;
641 $rawcolumn = pg_fetch_object($result);
642 $encoding = $rawcolumn->server_encoding;
643 pg_free_result($result);
645 return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8');
649 * Do NOT use in code, to be used by database_manager only!
650 * @param string|array $sql query
651 * @param array|null $tablenames an array of xmldb table names affected by this request.
652 * @return bool true
653 * @throws ddl_change_structure_exception A DDL specific exception is thrown for any errors.
655 public function change_database_structure($sql, $tablenames = null) {
656 $this->get_manager(); // Includes DDL exceptions classes ;-)
657 if (is_array($sql)) {
658 $sql = implode("\n;\n", $sql);
660 if (!$this->is_transaction_started()) {
661 // It is better to do all or nothing, this helps with recovery...
662 $sql = "BEGIN ISOLATION LEVEL SERIALIZABLE;\n$sql\n; COMMIT";
665 try {
666 $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
667 $result = pg_query($this->pgsql, $sql);
668 $this->query_end($result);
669 pg_free_result($result);
670 } catch (ddl_change_structure_exception $e) {
671 if (!$this->is_transaction_started()) {
672 $result = @pg_query($this->pgsql, "ROLLBACK");
673 @pg_free_result($result);
675 $this->reset_caches($tablenames);
676 throw $e;
679 $this->reset_caches($tablenames);
680 return true;
684 * Execute general sql query. Should be used only when no other method suitable.
685 * Do NOT use this to make changes in db structure, use database_manager methods instead!
686 * @param string $sql query
687 * @param array $params query parameters
688 * @return bool true
689 * @throws dml_exception A DML specific exception is thrown for any errors.
691 public function execute($sql, array $params=null) {
692 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
694 if (strpos($sql, ';') !== false) {
695 throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
698 $this->query_start($sql, $params, SQL_QUERY_UPDATE);
699 $result = pg_query_params($this->pgsql, $sql, $params);
700 $this->query_end($result);
702 pg_free_result($result);
703 return true;
707 * Get a number of records as a moodle_recordset using a SQL statement.
709 * Since this method is a little less readable, use of it should be restricted to
710 * code where it's possible there might be large datasets being returned. For known
711 * small datasets use get_records_sql - it leads to simpler code.
713 * The return type is like:
714 * @see function get_recordset.
716 * @param string $sql the SQL select query to execute.
717 * @param array $params array of sql parameters
718 * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
719 * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
720 * @return moodle_recordset instance
721 * @throws dml_exception A DML specific exception is thrown for any errors.
723 public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
725 list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
727 if ($limitnum) {
728 $sql .= " LIMIT $limitnum";
730 if ($limitfrom) {
731 $sql .= " OFFSET $limitfrom";
734 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
736 $this->query_start($sql, $params, SQL_QUERY_SELECT);
737 $result = pg_query_params($this->pgsql, $sql, $params);
738 $this->query_end($result);
740 return $this->create_recordset($result);
743 protected function create_recordset($result) {
744 return new pgsql_native_moodle_recordset($result);
748 * Get a number of records as an array of objects using a SQL statement.
750 * Return value is like:
751 * @see function get_records.
753 * @param string $sql the SQL select query to execute. The first column of this SELECT statement
754 * must be a unique value (usually the 'id' field), as it will be used as the key of the
755 * returned array.
756 * @param array $params array of sql parameters
757 * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
758 * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
759 * @return array of objects, or empty array if no records were found
760 * @throws dml_exception A DML specific exception is thrown for any errors.
762 public function get_records_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
764 list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
766 if ($limitnum) {
767 $sql .= " LIMIT $limitnum";
769 if ($limitfrom) {
770 $sql .= " OFFSET $limitfrom";
773 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
774 $this->query_start($sql, $params, SQL_QUERY_SELECT);
775 $result = pg_query_params($this->pgsql, $sql, $params);
776 $this->query_end($result);
778 // find out if there are any blobs
779 $numfields = pg_num_fields($result);
780 $blobs = array();
781 for ($i = 0; $i < $numfields; $i++) {
782 $type = pg_field_type($result, $i);
783 if ($type == 'bytea') {
784 $blobs[] = pg_field_name($result, $i);
788 $rows = pg_fetch_all($result);
789 pg_free_result($result);
791 $return = array();
792 if ($rows) {
793 foreach ($rows as $row) {
794 $id = reset($row);
795 if ($blobs) {
796 foreach ($blobs as $blob) {
797 $row[$blob] = ($row[$blob] !== null ? pg_unescape_bytea($row[$blob]) : null);
800 if (isset($return[$id])) {
801 $colname = key($row);
802 debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
804 $return[$id] = (object)$row;
808 return $return;
812 * Selects records and return values (first field) as an array using a SQL statement.
814 * @param string $sql The SQL query
815 * @param array $params array of sql parameters
816 * @return array of values
817 * @throws dml_exception A DML specific exception is thrown for any errors.
819 public function get_fieldset_sql($sql, array $params=null) {
820 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
822 $this->query_start($sql, $params, SQL_QUERY_SELECT);
823 $result = pg_query_params($this->pgsql, $sql, $params);
824 $this->query_end($result);
826 $return = pg_fetch_all_columns($result, 0);
828 if (pg_field_type($result, 0) == 'bytea') {
829 foreach ($return as $key => $value) {
830 $return[$key] = ($value === null ? $value : pg_unescape_bytea($value));
834 pg_free_result($result);
836 return $return;
840 * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
841 * @param string $table name
842 * @param mixed $params data record as object or array
843 * @param bool $returnit return it of inserted record
844 * @param bool $bulk true means repeated inserts expected
845 * @param bool $customsequence true if 'id' included in $params, disables $returnid
846 * @return bool|int true or new id
847 * @throws dml_exception A DML specific exception is thrown for any errors.
849 public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
850 if (!is_array($params)) {
851 $params = (array)$params;
854 $returning = "";
856 if ($customsequence) {
857 if (!isset($params['id'])) {
858 throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
860 $returnid = false;
861 } else {
862 if ($returnid) {
863 $returning = "RETURNING id";
864 unset($params['id']);
865 } else {
866 unset($params['id']);
870 if (empty($params)) {
871 throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
874 $fields = implode(',', array_keys($params));
875 $values = array();
876 $i = 1;
877 foreach ($params as $value) {
878 $this->detect_objects($value);
879 $values[] = "\$".$i++;
881 $values = implode(',', $values);
883 $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
884 $this->query_start($sql, $params, SQL_QUERY_INSERT);
885 $result = pg_query_params($this->pgsql, $sql, $params);
886 $this->query_end($result);
888 if ($returning !== "") {
889 $row = pg_fetch_assoc($result);
890 $params['id'] = reset($row);
892 pg_free_result($result);
894 if (!$returnid) {
895 return true;
898 return (int)$params['id'];
902 * Insert a record into a table and return the "id" field if required.
904 * Some conversions and safety checks are carried out. Lobs are supported.
905 * If the return ID isn't required, then this just reports success as true/false.
906 * $data is an object containing needed data
907 * @param string $table The database table to be inserted into
908 * @param object $data A data object with values for one or more fields in the record
909 * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
910 * @return bool|int true or new id
911 * @throws dml_exception A DML specific exception is thrown for any errors.
913 public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
914 $dataobject = (array)$dataobject;
916 $columns = $this->get_columns($table);
917 if (empty($columns)) {
918 throw new dml_exception('ddltablenotexist', $table);
921 $cleaned = array();
923 foreach ($dataobject as $field=>$value) {
924 if ($field === 'id') {
925 continue;
927 if (!isset($columns[$field])) {
928 continue;
930 $column = $columns[$field];
931 $cleaned[$field] = $this->normalise_value($column, $value);
934 return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
939 * Insert multiple records into database as fast as possible.
941 * Order of inserts is maintained, but the operation is not atomic,
942 * use transactions if necessary.
944 * This method is intended for inserting of large number of small objects,
945 * do not use for huge objects with text or binary fields.
947 * @since Moodle 2.7
949 * @param string $table The database table to be inserted into
950 * @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
951 * @return void does not return new record ids
953 * @throws coding_exception if data objects have different structure
954 * @throws dml_exception A DML specific exception is thrown for any errors.
956 public function insert_records($table, $dataobjects) {
957 if (!is_array($dataobjects) and !($dataobjects instanceof Traversable)) {
958 throw new coding_exception('insert_records() passed non-traversable object');
961 // PostgreSQL does not seem to have problems with huge queries.
962 $chunksize = 500;
963 if (!empty($this->dboptions['bulkinsertsize'])) {
964 $chunksize = (int)$this->dboptions['bulkinsertsize'];
967 $columns = $this->get_columns($table, true);
969 $fields = null;
970 $count = 0;
971 $chunk = array();
972 foreach ($dataobjects as $dataobject) {
973 if (!is_array($dataobject) and !is_object($dataobject)) {
974 throw new coding_exception('insert_records() passed invalid record object');
976 $dataobject = (array)$dataobject;
977 if ($fields === null) {
978 $fields = array_keys($dataobject);
979 $columns = array_intersect_key($columns, $dataobject);
980 unset($columns['id']);
981 } else if ($fields !== array_keys($dataobject)) {
982 throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
985 $count++;
986 $chunk[] = $dataobject;
988 if ($count === $chunksize) {
989 $this->insert_chunk($table, $chunk, $columns);
990 $chunk = array();
991 $count = 0;
995 if ($count) {
996 $this->insert_chunk($table, $chunk, $columns);
1001 * Insert records in chunks, strict param types...
1003 * Note: can be used only from insert_records().
1005 * @param string $table
1006 * @param array $chunk
1007 * @param database_column_info[] $columns
1009 protected function insert_chunk($table, array $chunk, array $columns) {
1010 $i = 1;
1011 $params = array();
1012 $values = array();
1013 foreach ($chunk as $dataobject) {
1014 $vals = array();
1015 foreach ($columns as $field => $column) {
1016 $params[] = $this->normalise_value($column, $dataobject[$field]);
1017 $vals[] = "\$".$i++;
1019 $values[] = '('.implode(',', $vals).')';
1022 $fieldssql = '('.implode(',', array_keys($columns)).')';
1023 $valuessql = implode(',', $values);
1025 $sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql";
1026 $this->query_start($sql, $params, SQL_QUERY_INSERT);
1027 $result = pg_query_params($this->pgsql, $sql, $params);
1028 $this->query_end($result);
1029 pg_free_result($result);
1033 * Import a record into a table, id field is required.
1034 * Safety checks are NOT carried out. Lobs are supported.
1036 * @param string $table name of database table to be inserted into
1037 * @param object $dataobject A data object with values for one or more fields in the record
1038 * @return bool true
1039 * @throws dml_exception A DML specific exception is thrown for any errors.
1041 public function import_record($table, $dataobject) {
1042 $dataobject = (array)$dataobject;
1044 $columns = $this->get_columns($table);
1045 $cleaned = array();
1047 foreach ($dataobject as $field=>$value) {
1048 $this->detect_objects($value);
1049 if (!isset($columns[$field])) {
1050 continue;
1052 $column = $columns[$field];
1053 $cleaned[$field] = $this->normalise_value($column, $value);
1056 return $this->insert_record_raw($table, $cleaned, false, true, true);
1060 * Update record in database, as fast as possible, no safety checks, lobs not supported.
1061 * @param string $table name
1062 * @param mixed $params data record as object or array
1063 * @param bool true means repeated updates expected
1064 * @return bool true
1065 * @throws dml_exception A DML specific exception is thrown for any errors.
1067 public function update_record_raw($table, $params, $bulk=false) {
1068 $params = (array)$params;
1070 if (!isset($params['id'])) {
1071 throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
1073 $id = $params['id'];
1074 unset($params['id']);
1076 if (empty($params)) {
1077 throw new coding_exception('moodle_database::update_record_raw() no fields found.');
1080 $i = 1;
1082 $sets = array();
1083 foreach ($params as $field=>$value) {
1084 $this->detect_objects($value);
1085 $sets[] = "$field = \$".$i++;
1088 $params[] = $id; // last ? in WHERE condition
1090 $sets = implode(',', $sets);
1091 $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;
1093 $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1094 $result = pg_query_params($this->pgsql, $sql, $params);
1095 $this->query_end($result);
1097 pg_free_result($result);
1098 return true;
1102 * Update a record in a table
1104 * $dataobject is an object containing needed data
1105 * Relies on $dataobject having a variable "id" to
1106 * specify the record to update
1108 * @param string $table The database table to be checked against.
1109 * @param object $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified.
1110 * @param bool true means repeated updates expected
1111 * @return bool true
1112 * @throws dml_exception A DML specific exception is thrown for any errors.
1114 public function update_record($table, $dataobject, $bulk=false) {
1115 $dataobject = (array)$dataobject;
1117 $columns = $this->get_columns($table);
1118 $cleaned = array();
1120 foreach ($dataobject as $field=>$value) {
1121 if (!isset($columns[$field])) {
1122 continue;
1124 $column = $columns[$field];
1125 $cleaned[$field] = $this->normalise_value($column, $value);
1128 $this->update_record_raw($table, $cleaned, $bulk);
1130 return true;
1134 * Set a single field in every table record which match a particular WHERE clause.
1136 * @param string $table The database table to be checked against.
1137 * @param string $newfield the field to set.
1138 * @param string $newvalue the value to set the field to.
1139 * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
1140 * @param array $params array of sql parameters
1141 * @return bool true
1142 * @throws dml_exception A DML specific exception is thrown for any errors.
1144 public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {
1146 if ($select) {
1147 $select = "WHERE $select";
1149 if (is_null($params)) {
1150 $params = array();
1152 list($select, $params, $type) = $this->fix_sql_params($select, $params);
1153 $i = count($params)+1;
1155 // Get column metadata
1156 $columns = $this->get_columns($table);
1157 $column = $columns[$newfield];
1159 $normalisedvalue = $this->normalise_value($column, $newvalue);
1161 $newfield = "$newfield = \$" . $i;
1162 $params[] = $normalisedvalue;
1163 $sql = "UPDATE {$this->prefix}$table SET $newfield $select";
1165 $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1166 $result = pg_query_params($this->pgsql, $sql, $params);
1167 $this->query_end($result);
1169 pg_free_result($result);
1171 return true;
1175 * Delete one or more records from a table which match a particular WHERE clause, lobs not supported.
1177 * @param string $table The database table to be checked against.
1178 * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
1179 * @param array $params array of sql parameters
1180 * @return bool true
1181 * @throws dml_exception A DML specific exception is thrown for any errors.
1183 public function delete_records_select($table, $select, array $params=null) {
1184 if ($select) {
1185 $select = "WHERE $select";
1187 $sql = "DELETE FROM {$this->prefix}$table $select";
1189 list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1191 $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1192 $result = pg_query_params($this->pgsql, $sql, $params);
1193 $this->query_end($result);
1195 pg_free_result($result);
1197 return true;
1201 * Returns 'LIKE' part of a query.
1203 * @param string $fieldname usually name of the table column
1204 * @param string $param usually bound query parameter (?, :named)
1205 * @param bool $casesensitive use case sensitive search
1206 * @param bool $accensensitive use accent sensitive search (not all databases support accent insensitive)
1207 * @param bool $notlike true means "NOT LIKE"
1208 * @param string $escapechar escape char for '%' and '_'
1209 * @return string SQL code fragment
1211 public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') {
1212 if (strpos($param, '%') !== false) {
1213 debugging('Potential SQL injection detected, sql_like() expects bound parameters (? or :named)');
1216 // postgresql does not support accent insensitive text comparisons, sorry
1217 if ($casesensitive) {
1218 $LIKE = $notlike ? 'NOT LIKE' : 'LIKE';
1219 } else {
1220 $LIKE = $notlike ? 'NOT ILIKE' : 'ILIKE';
1222 return "$fieldname $LIKE $param ESCAPE '$escapechar'";
1225 public function sql_bitxor($int1, $int2) {
1226 return '((' . $int1 . ') # (' . $int2 . '))';
1229 public function sql_cast_char2int($fieldname, $text=false) {
1230 return ' CAST(' . $fieldname . ' AS INT) ';
1233 public function sql_cast_char2real($fieldname, $text=false) {
1234 return " $fieldname::real ";
1237 public function sql_concat() {
1238 $arr = func_get_args();
1239 $s = implode(' || ', $arr);
1240 if ($s === '') {
1241 return " '' ";
1243 // Add always empty string element so integer-exclusive concats
1244 // will work without needing to cast each element explicitly
1245 return " '' || $s ";
1248 public function sql_concat_join($separator="' '", $elements=array()) {
1249 for ($n=count($elements)-1; $n > 0 ; $n--) {
1250 array_splice($elements, $n, 0, $separator);
1252 $s = implode(' || ', $elements);
1253 if ($s === '') {
1254 return " '' ";
1256 return " $s ";
1259 public function sql_regex_supported() {
1260 return true;
1263 public function sql_regex($positivematch=true) {
1264 return $positivematch ? '~*' : '!~*';
1268 * Does this driver support tool_replace?
1270 * @since Moodle 2.6.1
1271 * @return bool
1273 public function replace_all_text_supported() {
1274 return true;
1277 public function session_lock_supported() {
1278 return true;
1282 * Obtain session lock
1283 * @param int $rowid id of the row with session record
1284 * @param int $timeout max allowed time to wait for the lock in seconds
1285 * @return bool success
1287 public function get_session_lock($rowid, $timeout) {
1288 // NOTE: there is a potential locking problem for database running
1289 // multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
1290 // luckily there is not a big chance that they would collide
1291 if (!$this->session_lock_supported()) {
1292 return;
1295 parent::get_session_lock($rowid, $timeout);
1297 $timeoutmilli = $timeout * 1000;
1299 $sql = "SET statement_timeout TO $timeoutmilli";
1300 $this->query_start($sql, null, SQL_QUERY_AUX);
1301 $result = pg_query($this->pgsql, $sql);
1302 $this->query_end($result);
1304 if ($result) {
1305 pg_free_result($result);
1308 $sql = "SELECT pg_advisory_lock($rowid)";
1309 $this->query_start($sql, null, SQL_QUERY_AUX);
1310 $start = time();
1311 $result = pg_query($this->pgsql, $sql);
1312 $end = time();
1313 try {
1314 $this->query_end($result);
1315 } catch (dml_exception $ex) {
1316 if ($end - $start >= $timeout) {
1317 throw new dml_sessionwait_exception();
1318 } else {
1319 throw $ex;
1323 if ($result) {
1324 pg_free_result($result);
1327 $sql = "SET statement_timeout TO DEFAULT";
1328 $this->query_start($sql, null, SQL_QUERY_AUX);
1329 $result = pg_query($this->pgsql, $sql);
1330 $this->query_end($result);
1332 if ($result) {
1333 pg_free_result($result);
1337 public function release_session_lock($rowid) {
1338 if (!$this->session_lock_supported()) {
1339 return;
1341 if (!$this->used_for_db_sessions) {
1342 return;
1345 parent::release_session_lock($rowid);
1347 $sql = "SELECT pg_advisory_unlock($rowid)";
1348 $this->query_start($sql, null, SQL_QUERY_AUX);
1349 $result = pg_query($this->pgsql, $sql);
1350 $this->query_end($result);
1352 if ($result) {
1353 pg_free_result($result);
1358 * Driver specific start of real database transaction,
1359 * this can not be used directly in code.
1360 * @return void
1362 protected function begin_transaction() {
1363 $this->savepointpresent = true;
1364 $sql = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint";
1365 $this->query_start($sql, NULL, SQL_QUERY_AUX);
1366 $result = pg_query($this->pgsql, $sql);
1367 $this->query_end($result);
1369 pg_free_result($result);
1373 * Driver specific commit of real database transaction,
1374 * this can not be used directly in code.
1375 * @return void
1377 protected function commit_transaction() {
1378 $this->savepointpresent = false;
1379 $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT";
1380 $this->query_start($sql, NULL, SQL_QUERY_AUX);
1381 $result = pg_query($this->pgsql, $sql);
1382 $this->query_end($result);
1384 pg_free_result($result);
1388 * Driver specific abort of real database transaction,
1389 * this can not be used directly in code.
1390 * @return void
1392 protected function rollback_transaction() {
1393 $this->savepointpresent = false;
1394 $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK";
1395 $this->query_start($sql, NULL, SQL_QUERY_AUX);
1396 $result = pg_query($this->pgsql, $sql);
1397 $this->query_end($result);
1399 pg_free_result($result);
1403 * Helper function trimming (whitespace + quotes) any string
1404 * needed because PG uses to enclose with double quotes some
1405 * fields in indexes definition and others
1407 * @param string $str string to apply whitespace + quotes trim
1408 * @return string trimmed string
1410 private function trim_quotes($str) {
1411 return trim(trim($str), "'\"");