MDL-42754 Messages: Show noreply user notifications
[moodle.git] / lib / dml / pgsql_native_moodle_database.php
blob3754d9b1df473770db4f2407d1b0167fabd5774b
1 <?php
2 // This file is part of Moodle - http://moodle.org/
3 //
4 // Moodle is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // Moodle is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
14 // You should have received a copy of the GNU General Public License
15 // along with Moodle. If not, see <http://www.gnu.org/licenses/>.
17 /**
18 * Native pgsql class representing moodle database interface.
20 * @package core_dml
21 * @copyright 2008 Petr Skoda (http://skodak.org)
22 * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
25 defined('MOODLE_INTERNAL') || die();
27 require_once(__DIR__.'/moodle_database.php');
28 require_once(__DIR__.'/pgsql_native_moodle_recordset.php');
29 require_once(__DIR__.'/pgsql_native_moodle_temptables.php');
31 /**
32 * Native pgsql class representing moodle database interface.
34 * @package core_dml
35 * @copyright 2008 Petr Skoda (http://skodak.org)
36 * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
38 class pgsql_native_moodle_database extends moodle_database {
/** Connection handle assigned from pg_connect()/pg_pconnect() in connect(); reset to null on failure and in dispose(). */
40 protected $pgsql = null;
/** Oid of the 'bytea' type, read from pg_type in connect(); used to spot blob columns in query results. */
41 protected $bytea_oid = null;
43 protected $last_error_reporting; // To handle pgsql driver default verbosity: level saved by query_start(), restored by query_end()
45 /** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */
46 protected $savepointpresent = false;
/**
 * Checks whether the PHP 'pgsql' extension needed by this driver is loaded.
 * Note: can be used before connect()
 * @return mixed true if ok, otherwise a localised error string
 */
public function driver_installed() {
    return extension_loaded('pgsql')
        ? true
        : get_string('pgsqlextensionisnotpresentinphp', 'install');
}
/**
 * Returns the database family type, which describes the SQL dialect.
 * Note: can be used before connect()
 * @return string db family name (mysql, postgres, mssql, oracle, etc.)
 */
public function get_dbfamily() {
    $family = 'postgres';
    return $family;
}
/**
 * Returns the specific database driver type.
 * Note: can be used before connect()
 * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
 */
protected function get_dbtype() {
    $dbtype = 'pgsql';
    return $dbtype;
}
/**
 * Returns the general database library name.
 * Note: can be used before connect()
 * @return string db type pdo, native
 */
protected function get_dblibrary() {
    $library = 'native';
    return $library;
}
/**
 * Returns the localised name of this database type.
 * Note: can be used before connect()
 * @return string
 */
public function get_name() {
    $name = get_string('nativepgsql', 'install');
    return $name;
}
/**
 * Returns the localised configuration help text for this database type.
 * Note: can be used before connect()
 * @return string
 */
public function get_configuration_help() {
    $help = get_string('nativepgsqlhelp', 'install');
    return $help;
}
/**
 * Connect to db
 * Must be called before other methods.
 *
 * Builds a libpq connection string (unix socket when 'dbsocket' is set and the host is
 * local, TCP otherwise), opens the connection, forces utf8 client encoding, applies the
 * optional schema and reads the oid of the bytea type.
 *
 * @param string $dbhost The database host.
 * @param string $dbuser The database username.
 * @param string $dbpass The database username's password.
 * @param string $dbname The name of the database being connected to.
 * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
 * @param array $dboptions driver specific options (dbsocket, dbport, dbpersist, dbschema are read here)
 * @return bool true
 * @throws dml_connection_exception if error
 */
public function connect($dbhost, $dbuser, $dbpass, $dbname, $prefix, array $dboptions=null) {
    if ($prefix == '' and !$this->external) {
        // Enforce prefixes for everybody but mysql.
        throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
    }

    $driverstatus = $this->driver_installed();

    if ($driverstatus !== true) {
        throw new dml_exception('dbdriverproblem', $driverstatus);
    }

    $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);

    // Every value in the connection string is single-quoted, so quotes and backslashes
    // must be backslash-escaped in all of them, not only in the password.
    $user = addcslashes((string)$this->dbuser, "'\\");
    $pass = addcslashes((string)$this->dbpass, "'\\");
    $name = addcslashes((string)$this->dbname, "'\\");

    // Unix socket connections should have lower overhead.
    if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
        $connection = "user='$user' password='$pass' dbname='$name'";
        if (strpos($this->dboptions['dbsocket'], '/') !== false) {
            // An explicit socket directory was given.
            $socket = addcslashes($this->dboptions['dbsocket'], "'\\");
            $connection = $connection." host='".$socket."'";
        }
    } else {
        $this->dboptions['dbsocket'] = '';
        if (empty($this->dbname)) {
            // Probably old style socket connection - do not add port.
            $port = "";
        } else if (empty($this->dboptions['dbport'])) {
            $port = "port ='5432'";
        } else {
            $port = "port ='".addcslashes((string)$this->dboptions['dbport'], "'\\")."'";
        }
        $host = addcslashes((string)$this->dbhost, "'\\");
        $connection = "host='$host' $port user='$user' password='$pass' dbname='$name'";
    }

    // Capture anything the driver prints while connecting so it can be reported in the exception.
    ob_start();
    if (empty($this->dboptions['dbpersist'])) {
        $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
    } else {
        $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
    }
    $dberr = ob_get_contents();
    ob_end_clean();

    // pg_connect()/pg_pconnect() return false on failure; do not hand that to pg_connection_status().
    if (!$this->pgsql) {
        $this->pgsql = null;
        throw new dml_connection_exception($dberr);
    }

    $status = pg_connection_status($this->pgsql);

    if ($status === false or $status === PGSQL_CONNECTION_BAD) {
        $this->pgsql = null;
        throw new dml_connection_exception($dberr);
    }

    $this->query_start("--pg_set_client_encoding()", null, SQL_QUERY_AUX);
    pg_set_client_encoding($this->pgsql, 'utf8');
    $this->query_end(true);

    $sql = '';
    // Only for 9.0 and upwards, set bytea encoding to old format.
    if ($this->is_min_version('9.0')) {
        $sql = "SET bytea_output = 'escape'; ";
    }

    // Select schema if specified, otherwise the first one wins.
    if (!empty($this->dboptions['dbschema'])) {
        // The schema name is embedded in a string literal, so it has to be escaped.
        $schema = pg_escape_string($this->pgsql, $this->dboptions['dbschema']);
        $sql .= "SET search_path = '".$schema."'; ";
    }

    // Find out the bytea oid.
    $sql .= "SELECT oid FROM pg_type WHERE typname = 'bytea'";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    $this->bytea_oid = pg_fetch_result($result, 0, 0);
    pg_free_result($result);
    if ($this->bytea_oid === false) {
        $this->pgsql = null;
        throw new dml_connection_exception('Can not read bytea type.');
    }

    // Connection stabilised and configured, going to instantiate the temptables controller.
    $this->temptables = new pgsql_native_moodle_temptables($this);

    return true;
}
/**
 * Close database connection and release all resources
 * and memory (especially circular memory references).
 * Do NOT use connect() again, create a new instance if needed.
 */
public function dispose() {
    // The parent goes first so that common cleanup happens while the connection is still open.
    parent::dispose();
    if (!$this->pgsql) {
        return;
    }
    pg_close($this->pgsql);
    $this->pgsql = null;
}
/**
 * Called before each db query.
 * @param string $sql
 * @param array $params array of parameters
 * @param int $type type of query
 * @param mixed $extrainfo driver specific extra information
 * @return void
 */
protected function query_start($sql, array $params=null, $type, $extrainfo=null) {
    parent::query_start($sql, $params, $type, $extrainfo);
    // The pgsql driver tends to send debug output; silence it and remember the
    // previous level so that query_end() can restore it.
    $previouslevel = error_reporting(0);
    $this->last_error_reporting = $previouslevel;
}
/**
 * Called immediately after each db query.
 *
 * While the savepoint hack is active, a successful non-AUX, non-SELECT query
 * re-creates the savepoint, and a failed query rolls back to it before the
 * exception is rethrown.
 *
 * @param mixed $result db specific result
 * @return void
 */
protected function query_end($result) {
    // Restore the error reporting level saved by query_start().
    error_reporting($this->last_error_reporting);
    try {
        parent::query_end($result);
        if ($this->savepointpresent) {
            $skipsavepoint = ($this->last_type == SQL_QUERY_AUX || $this->last_type == SQL_QUERY_SELECT);
            if (!$skipsavepoint) {
                $released = @pg_query($this->pgsql, "RELEASE SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
                if ($released) {
                    pg_free_result($released);
                }
            }
        }
    } catch (Exception $failure) {
        if ($this->savepointpresent) {
            $rolledback = @pg_query($this->pgsql, "ROLLBACK TO SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
            if ($rolledback) {
                pg_free_result($rolledback);
            }
        }
        throw $failure;
    }
}
/**
 * Returns database server info array.
 * The pg_version() result is kept in a function-static variable after the first call.
 * @return array Array containing 'description' and 'version' info (both hold the server version)
 */
public function get_server_info() {
    static $versioninfo;
    if (!$versioninfo) {
        $this->query_start("--pg_version()", null, SQL_QUERY_AUX);
        $versioninfo = pg_version($this->pgsql);
        $this->query_end(true);
    }
    $server = $versioninfo['server'];
    return array('description' => $server, 'version' => $server);
}
/**
 * Tells whether the RDBMS server is at least the given version.
 *
 * @param string $version version to check against
 * @return bool true if the server version is greater than or equal to $version, false otherwise
 */
private function is_min_version($version) {
    $info = $this->get_server_info();
    return version_compare($info['version'], $version, '>=');
}
/**
 * Returns the query parameter types supported by this driver.
 * @return int bitmask of accepted SQL_PARAMS_*
 */
protected function allowed_param_types() {
    $accepted = SQL_PARAMS_DOLLAR;
    return $accepted;
}
/**
 * Returns the last error reported by the database engine for this connection.
 * @return string error message
 */
public function get_last_error() {
    $message = pg_last_error($this->pgsql);
    return $message;
}
/**
 * Return tables in database WITHOUT current prefix.
 * The list is stored in $this->tables and reused when $usecache is true.
 * @param bool $usecache if true, returns list of cached tables.
 * @return array of table names in lowercase and without prefix
 */
public function get_tables($usecache=true) {
    if ($usecache && $this->tables !== null) {
        return $this->tables;
    }

    $this->tables = array();
    // '_' is a LIKE wildcard, so it is escaped with the '|' escape character declared in the query.
    $likeprefix = str_replace('_', '|_', $this->prefix);
    $sql = "SELECT c.relname
              FROM pg_catalog.pg_class c
              JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
             WHERE c.relname LIKE '$likeprefix%' ESCAPE '|'
                   AND c.relkind = 'r'
                   AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    if (!$result) {
        return $this->tables;
    }

    while ($row = pg_fetch_row($result)) {
        $name = reset($row);
        if ($this->prefix !== '') {
            if (strpos($name, $this->prefix) !== 0) {
                // Does not really start with our prefix, skip it.
                continue;
            }
            $name = substr($name, strlen($this->prefix));
        }
        $this->tables[$name] = $name;
    }
    pg_free_result($result);

    return $this->tables;
}
/**
 * Return table indexes - everything lowercased.
 *
 * Index definitions are parsed from pg_indexes.indexdef. The index on the single
 * column 'id' is skipped.
 *
 * @param string $table The table we want to get indexes from.
 * @return array of arrays, keyed by index name, each with 'unique' (bool) and 'columns' (array)
 */
public function get_indexes($table) {
    $indexes = array();
    $tablename = $this->prefix.$table;

    // The name is embedded in a string literal, so escape it for SQL.
    $sqlname = pg_escape_string($this->pgsql, $tablename);
    $sql = "SELECT i.*
              FROM pg_catalog.pg_indexes i
              JOIN pg_catalog.pg_namespace as ns ON ns.nspname = i.schemaname
             WHERE i.tablename = '$sqlname'
                   AND (i.schemaname = current_schema() OR ns.oid = pg_my_temp_schema())";

    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    if ($result) {
        // Quote the table name so that any regex metacharacters in it match literally.
        $pattern = '/CREATE (|UNIQUE )INDEX ([^\s]+) ON '.preg_quote($tablename, '/').' USING ([^\s]+) \(([^\)]+)\)/i';
        while ($row = pg_fetch_assoc($result)) {
            if (!preg_match($pattern, $row['indexdef'], $matches)) {
                continue;
            }
            if ($matches[4] === 'id') {
                continue;
            }
            $columns = explode(',', $matches[4]);
            foreach ($columns as $k=>$column) {
                $column = trim($column);
                if ($pos = strpos($column, ' ')) {
                    // Index type is separated by space.
                    $column = substr($column, 0, $pos);
                }
                $columns[$k] = $this->trim_quotes($column);
            }
            $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
                                                'columns'=>$columns);
        }
        pg_free_result($result);
    }
    return $indexes;
}
/**
 * Returns detailed information about columns in table. This information is cached internally.
 *
 * Recognised column types are varchar, intN, numeric, floatN, text and bytea. For any
 * other type only the column name is filled in before database_column_info is built.
 *
 * @param string $table name
 * @param bool $usecache
 * @return array array of database_column_info objects indexed with column names
 */
public function get_columns($table, $usecache=true) {
    if ($usecache) {
        $properties = array(
            'dbfamily' => $this->get_dbfamily(),
            'settings' => $this->get_settings_hash(),
        );
        $cache = cache::make('core', 'databasemeta', $properties);
        $cached = $cache->get($table);
        if ($cached) {
            return $cached;
        }
    }

    $tablename = $this->prefix.$table;

    $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef, d.adsrc
              FROM pg_catalog.pg_class c
              JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
              JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
              JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
         LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
             WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
                   AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())
          ORDER BY a.attnum";

    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    if (!$result) {
        return array();
    }

    $structure = array();
    while ($raw = pg_fetch_object($result)) {
        $col = new stdClass();
        $col->name = $raw->field;

        // The catalog booleans arrive as the strings 't' / 'f'.
        $notnull    = ($raw->attnotnull === 't');
        $hasdefault = ($raw->atthasdef === 't');
        $digits     = null;

        if ($raw->type === 'varchar' || $raw->type === 'text') {
            // Character types: both share the same handling of the default expression.
            $istext = ($raw->type === 'text');
            $col->type       = $raw->type;
            $col->meta_type  = $istext ? 'X' : 'C';
            $col->max_length = $istext ? -1 : $raw->atttypmod - 4;
            $col->scale      = null;
            $col->not_null   = $notnull;
            $col->has_default = $hasdefault;
            $col->default_value = null;
            if ($hasdefault) {
                // Strip a trailing '::type' cast and the surrounding quotes when present.
                $parts = explode('::', $raw->adsrc);
                if (count($parts) > 1) {
                    $col->default_value = trim(reset($parts), "'");
                } else {
                    $col->default_value = $raw->adsrc;
                }
            }
            $col->primary_key   = false;
            $col->binary        = false;
            $col->unsigned      = null;
            $col->auto_increment = false;
            $col->unique        = null;

        } else if (preg_match('/int(\d)/i', $raw->type, $digits)) {
            // A default that starts with nextval marks the sequence-backed primary key.
            $sequenced = (strpos($raw->adsrc, 'nextval') === 0);
            $col->type           = 'int';
            $col->primary_key    = $sequenced;
            $col->meta_type      = $sequenced ? 'R' : 'I';
            $col->unique         = $sequenced ? true : null;
            $col->auto_increment = $sequenced;
            $col->has_default    = $sequenced ? false : $hasdefault;

            // Return number of decimals, not bytes here.
            $bytes = $digits[1];
            if ($bytes >= 8) {
                $col->max_length = 18;
            } else if ($bytes >= 4) {
                $col->max_length = 9;
            } else if ($bytes >= 2) {
                $col->max_length = 4;
            } else if ($bytes >= 1) {
                $col->max_length = 2;
            } else {
                $col->max_length = 0;
            }
            $col->scale    = null;
            $col->not_null = $notnull;
            $col->default_value = $col->has_default ? trim($raw->adsrc, '()') : null;
            $col->binary   = false;
            $col->unsigned = false;

        } else if ($raw->type === 'numeric') {
            $col->type           = $raw->type;
            $col->meta_type      = 'N';
            $col->primary_key    = false;
            $col->binary         = false;
            $col->unsigned       = null;
            $col->auto_increment = false;
            $col->unique         = null;
            $col->not_null       = $notnull;
            $col->has_default    = $hasdefault;
            $col->default_value  = $hasdefault ? trim($raw->adsrc, '()') : null;
            // Length is taken from the upper 16 bits of atttypmod, scale from the lower 16 bits minus 4.
            $col->max_length     = $raw->atttypmod >> 16;
            $col->scale          = ($raw->atttypmod & 0xFFFF) - 4;

        } else if (preg_match('/float(\d)/i', $raw->type, $digits)) {
            $col->type           = 'float';
            $col->meta_type      = 'N';
            $col->primary_key    = false;
            $col->binary         = false;
            $col->unsigned       = null;
            $col->auto_increment = false;
            $col->unique         = null;
            $col->not_null       = $notnull;
            $col->has_default    = $hasdefault;
            $col->default_value  = $hasdefault ? trim($raw->adsrc, '()') : null;
            // Just guess the expected number of decimal places.
            if ($digits[1] == 8) {
                // Total 15 digits.
                $col->max_length = 8;
                $col->scale      = 7;
            } else {
                // Total 6 digits.
                $col->max_length = 4;
                $col->scale      = 2;
            }

        } else if ($raw->type === 'bytea') {
            $col->type           = $raw->type;
            $col->meta_type      = 'B';
            $col->max_length     = -1;
            $col->scale          = null;
            $col->not_null       = $notnull;
            $col->has_default    = false;
            $col->default_value  = null;
            $col->primary_key    = false;
            $col->binary         = true;
            $col->unsigned       = null;
            $col->auto_increment = false;
            $col->unique         = null;
        }

        $structure[$col->name] = new database_column_info($col);
    }

    pg_free_result($result);

    if ($usecache) {
        $cache->set($table, $structure);
    }

    return $structure;
}
/**
 * Normalise values based in RDBMS dependencies (booleans, LOBs...)
 *
 * Booleans become ints, non-null values for binary ('B') columns are wrapped in
 * array('blob' => value), and '' becomes 0 for 'I', 'F' and 'N' columns.
 *
 * @param database_column_info $column column metadata corresponding with the value we are going to normalise
 * @param mixed $value value we are going to normalise
 * @return mixed the normalised value
 */
protected function normalise_value($column, $value) {
    $this->detect_objects($value);

    if (is_bool($value)) {
        // Always convert boolean to int.
        return (int)$value;
    }

    if ($column->meta_type === 'B') {
        // The 'blob' wrapper lets the binding/executing code know about the value's nature later.
        return is_null($value) ? $value : array('blob' => $value);
    }

    if ($value === '' && in_array($column->meta_type, array('I', 'F', 'N'), true)) {
        // Prevent '' problems in numeric fields.
        return 0;
    }

    return $value;
}
/**
 * Is db in unicode mode?
 * Reads the PostgreSQL server_encoding setting and checks it for UNICODE / UTF8.
 * @return bool
 */
public function setup_is_unicodedb() {
    $sql = "SHOW server_encoding";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    if (!$result) {
        return false;
    }

    $row = pg_fetch_object($result);
    $encoding = $row->server_encoding;
    pg_free_result($result);

    $upper = strtoupper($encoding);
    return ($upper == 'UNICODE' || $upper == 'UTF8');
}
/**
 * Do NOT use in code, to be used by database_manager only!
 * Resets the caches, then runs the statement as a structure query.
 * @param string $sql query
 * @return bool true
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function change_database_structure($sql) {
    $this->reset_caches();

    $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
    $outcome = pg_query($this->pgsql, $sql);
    $this->query_end($outcome);
    pg_free_result($outcome);

    return true;
}
/**
 * Execute general sql query. Should be used only when no other method suitable.
 * Do NOT use this to make changes in db structure, use database_manager methods instead!
 * @param string $sql query
 * @param array $params query parameters
 * @return bool true
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function execute($sql, array $params=null) {
    list($sql, $params) = $this->fix_sql_params($sql, $params);

    // A semicolon means several statements, or a value embedded in the SQL instead of bound.
    $hassemicolon = (strpos($sql, ';') !== false);
    if ($hassemicolon) {
        throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
    }

    $this->query_start($sql, $params, SQL_QUERY_UPDATE);
    $outcome = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($outcome);
    pg_free_result($outcome);

    return true;
}
/**
 * Get a number of records as a moodle_recordset using a SQL statement.
 *
 * Since this method is a little less readable, use of it should be restricted to
 * code where it's possible there might be large datasets being returned. For known
 * small datasets use get_records_sql - it leads to simpler code.
 *
 * The return type is like:
 * @see function get_recordset.
 *
 * @param string $sql the SQL select query to execute.
 * @param array $params array of sql parameters
 * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
 * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
 * @return moodle_recordset instance
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
    // Negative limits are treated as zero.
    $limitfrom = max(0, (int)$limitfrom);
    $limitnum  = max(0, (int)$limitnum);

    if ($limitfrom || $limitnum) {
        // No row limit, or a sum that would exceed PHP_INT_MAX (workaround for weird max int problem): use ALL.
        if ($limitnum < 1 || PHP_INT_MAX - $limitnum < $limitfrom) {
            $limitnum = "ALL";
        }
        $sql .= " LIMIT $limitnum OFFSET $limitfrom";
    }

    list($sql, $params) = $this->fix_sql_params($sql, $params);

    $this->query_start($sql, $params, SQL_QUERY_SELECT);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    return $this->create_recordset($result);
}
/**
 * Wraps a pgsql query result in a recordset, passing along the bytea oid.
 * @param mixed $result result of a pg query
 * @return pgsql_native_moodle_recordset
 */
protected function create_recordset($result) {
    $recordset = new pgsql_native_moodle_recordset($result, $this->bytea_oid);
    return $recordset;
}
/**
 * Get a number of records as an array of objects using a SQL statement.
 *
 * Return value is like:
 * @see function get_records.
 *
 * @param string $sql the SQL select query to execute. The first column of this SELECT statement
 *   must be a unique value (usually the 'id' field), as it will be used as the key of the
 *   returned array.
 * @param array $params array of sql parameters
 * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
 * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
 * @return array of objects, or empty array if no records were found
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function get_records_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
    // Negative limits are treated as zero.
    $limitfrom = max(0, (int)$limitfrom);
    $limitnum  = max(0, (int)$limitnum);

    if ($limitfrom || $limitnum) {
        // No row limit, or a sum that would exceed PHP_INT_MAX (workaround for weird max int problem): use ALL.
        if ($limitnum < 1 || PHP_INT_MAX - $limitnum < $limitfrom) {
            $limitnum = "ALL";
        }
        $sql .= " LIMIT $limitnum OFFSET $limitfrom";
    }

    list($sql, $params) = $this->fix_sql_params($sql, $params);
    $this->query_start($sql, $params, SQL_QUERY_SELECT);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    // Find out which result fields are blobs (bytea).
    $fieldcount = pg_num_fields($result);
    $blobfields = array();
    for ($index = 0; $index < $fieldcount; $index++) {
        if (pg_field_type_oid($result, $index) == $this->bytea_oid) {
            $blobfields[] = pg_field_name($result, $index);
        }
    }

    $rows = pg_fetch_all($result);
    pg_free_result($result);

    $return = array();
    if (!$rows) {
        return $return;
    }

    foreach ($rows as $row) {
        // The first column is the key of the returned array.
        $id = reset($row);
        foreach ($blobfields as $blobfield) {
            // Note: in PostgreSQL 9.0 the returned blobs are hexencoded by default - see http://www.postgresql.org/docs/9.0/static/runtime-config-client.html#GUC-BYTEA-OUTPUT
            if ($row[$blobfield] !== null) {
                $row[$blobfield] = pg_unescape_bytea($row[$blobfield]);
            } else {
                $row[$blobfield] = null;
            }
        }
        if (isset($return[$id])) {
            $colname = key($row);
            debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
        }
        $return[$id] = (object)$row;
    }

    return $return;
}
/**
 * Selects records and return values (first field) as an array using a SQL statement.
 *
 * @param string $sql The SQL query
 * @param array $params array of sql parameters
 * @return array of values
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function get_fieldset_sql($sql, array $params=null) {
    list($sql, $params) = $this->fix_sql_params($sql, $params);

    $this->query_start($sql, $params, SQL_QUERY_SELECT);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    // Only the first column (index 0) of every row is wanted.
    $firstcolumn = pg_fetch_all_columns($result, 0);
    pg_free_result($result);

    return $firstcolumn;
}
/**
 * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
 * @param string $table name
 * @param mixed $params data record as object or array
 * @param bool $returnid return id of inserted record
 * @param bool $bulk true means repeated inserts expected
 * @param bool $customsequence true if 'id' included in $params, disables $returnid
 * @return bool|int true or new id
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
    if (!is_array($params)) {
        $params = (array)$params;
    }

    $returning = "";

    if ($customsequence) {
        if (!isset($params['id'])) {
            throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
        }
        $returnid = false;
    } else {
        // The id is never inserted here; it is read back with RETURNING when wanted.
        unset($params['id']);
        if ($returnid) {
            $returning = "RETURNING id";
        }
    }

    if (empty($params)) {
        throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
    }

    $fields = implode(',', array_keys($params));

    // One positional placeholder ($1, $2, ...) per value.
    $placeholders = array();
    $position = 0;
    foreach ($params as $value) {
        $this->detect_objects($value);
        $position++;
        $placeholders[] = "\$".$position;
    }
    $values = implode(',', $placeholders);

    $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
    $this->query_start($sql, $params, SQL_QUERY_INSERT);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    if ($returning !== "") {
        $row = pg_fetch_assoc($result);
        $params['id'] = reset($row);
    }
    pg_free_result($result);

    if ($returnid) {
        return (int)$params['id'];
    }

    return true;
}
/**
 * Insert a record into a table and return the "id" field if required.
 *
 * Some conversions and safety checks are carried out. Lobs are supported.
 * If the return ID isn't required, then this just reports success as true/false.
 * $dataobject is an object containing needed data
 * @param string $table The database table to be inserted into
 * @param object $dataobject A data object with values for one or more fields in the record
 * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
 * @param bool $bulk passed through to insert_record_raw()
 * @return bool|int true or new id
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
    $dataobject = (array)$dataobject;

    $columns = $this->get_columns($table);
    $cleaned = array();
    $blobs   = array();

    foreach ($dataobject as $field=>$value) {
        // The id and any field that is not a column of the table are ignored.
        if ($field === 'id' || !isset($columns[$field])) {
            continue;
        }
        $normalised = $this->normalise_value($columns[$field], $value);
        $isblob = is_array($normalised) && array_key_exists('blob', $normalised);
        if ($isblob) {
            // Insert a placeholder first; the real binary value is written by the UPDATE below.
            $cleaned[$field] = '@#BLOB#@';
            $blobs[$field] = $normalised['blob'];
        } else {
            $cleaned[$field] = $normalised;
        }
    }

    if (empty($blobs)) {
        return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
    }

    // The id is always needed here so that the blob columns can be updated.
    $id = $this->insert_record_raw($table, $cleaned, true, $bulk);

    foreach ($blobs as $key=>$value) {
        $value = pg_escape_bytea($this->pgsql, $value);
        $sql = "UPDATE {$this->prefix}$table SET $key = '$value'::bytea WHERE id = $id";
        $this->query_start($sql, NULL, SQL_QUERY_UPDATE);
        $result = pg_query($this->pgsql, $sql);
        $this->query_end($result);
        if ($result !== false) {
            pg_free_result($result);
        }
    }

    if ($returnid) {
        return $id;
    }
    return true;
}
/**
 * Import a record into a table, id field is required.
 * Safety checks are NOT carried out. Lobs are supported.
 *
 * @param string $table name of database table to be inserted into
 * @param object $dataobject A data object with values for one or more fields in the record
 * @return bool true
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function import_record($table, $dataobject) {
    $dataobject = (array)$dataobject;

    $columns = $this->get_columns($table);
    $cleaned = array();
    $blobs   = array();

    foreach ($dataobject as $field=>$value) {
        $this->detect_objects($value);
        if (!isset($columns[$field])) {
            continue;
        }
        if ($columns[$field]->meta_type === 'B') {
            if (!is_null($value)) {
                // Insert a placeholder first; the real binary value is written by the UPDATE below.
                $cleaned[$field] = '@#BLOB#@';
                $blobs[$field] = $value;
                continue;
            }
        }

        $cleaned[$field] = $value;
    }

    // Custom sequence mode: insert_record_raw() throws if 'id' is missing.
    $this->insert_record_raw($table, $cleaned, false, true, true);

    // The id is interpolated into the SQL below, so force it to an integer
    // (the same way update_record() does).
    $id = (int)$dataobject['id'];

    foreach ($blobs as $key=>$value) {
        $value = pg_escape_bytea($this->pgsql, $value);
        $sql = "UPDATE {$this->prefix}$table SET $key = '$value'::bytea WHERE id = $id";
        $this->query_start($sql, NULL, SQL_QUERY_UPDATE);
        $result = pg_query($this->pgsql, $sql);
        $this->query_end($result);
        if ($result !== false) {
            pg_free_result($result);
        }
    }

    return true;
}
/**
 * Update record in database, as fast as possible, no safety checks, lobs not supported.
 * @param string $table name
 * @param mixed $params data record as object or array
 * @param bool $bulk true means repeated updates expected
 * @return bool true
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function update_record_raw($table, $params, $bulk=false) {
    $params = (array)$params;

    if (!isset($params['id'])) {
        throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
    }
    $id = $params['id'];
    unset($params['id']);

    if (empty($params)) {
        throw new coding_exception('moodle_database::update_record_raw() no fields found.');
    }

    // One "field = $n" assignment per remaining field, numbered from 1.
    $assignments = array();
    $position = 1;
    foreach ($params as $field=>$value) {
        $this->detect_objects($value);
        $assignments[] = "$field = \$".$position;
        $position++;
    }

    // The id is bound last, to the placeholder used in the WHERE condition.
    $params[] = $id;
    $sql = "UPDATE {$this->prefix}$table SET ".implode(',', $assignments)." WHERE id=\$".$position;

    $this->query_start($sql, $params, SQL_QUERY_UPDATE);
    $outcome = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($outcome);
    pg_free_result($outcome);

    return true;
}
/**
 * Update a record in a table.
 *
 * Fields of $dataobject that are not columns of the table are ignored.
 * Values normalised to a blob are first written as a placeholder through
 * update_record_raw() and then stored with a separate BYTEA update each.
 *
 * @param string $table The database table to be checked against.
 * @param object $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified.
 * @param bool $bulk true means repeated updates expected
 * @return bool true
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function update_record($table, $dataobject, $bulk=false) {
    $dataobject = (array)$dataobject;
    $columns = $this->get_columns($table);

    $cleaned = array();
    $blobs = array();
    foreach ($dataobject as $field => $value) {
        if (!isset($columns[$field])) {
            continue;
        }
        $normalised = $this->normalise_value($columns[$field], $value);
        $isblob = is_array($normalised) && array_key_exists('blob', $normalised);
        if (!$isblob) {
            $cleaned[$field] = $normalised;
            continue;
        }
        // Binary data is written separately below, store a marker for now.
        $blobs[$field] = $normalised['blob'];
        $cleaned[$field] = '@#BLOB#@';
    }

    $this->update_record_raw($table, $cleaned, $bulk);

    if (!empty($blobs)) {
        $id = (int)$dataobject['id'];

        foreach ($blobs as $blobfield => $binary) {
            $escaped = pg_escape_bytea($this->pgsql, $binary);
            $sql = "UPDATE {$this->prefix}$table SET $blobfield = '$escaped'::bytea WHERE id = $id";
            $this->query_start($sql, NULL, SQL_QUERY_UPDATE);
            $res = pg_query($this->pgsql, $sql);
            $this->query_end($res);

            pg_free_result($res);
        }
    }

    return true;
}
/**
 * Set a single field in every table record which match a particular WHERE clause.
 *
 * @param string $table The database table to be checked against.
 * @param string $newfield the field to set.
 * @param string $newvalue the value to set the field to.
 * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
 * @param array $params array of sql parameters
 * @return bool true
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {

    if ($select) {
        $select = "WHERE $select";
    }
    if (is_null($params)) {
        $params = array();
    }
    list($select, $params, $type) = $this->fix_sql_params($select, $params);
    // The new value, when bound, takes the next position after the WHERE parameters.
    $i = count($params)+1;

    // Get column metadata
    // NOTE(review): $newfield is not checked against the column list here - confirm callers guarantee it exists.
    $columns = $this->get_columns($table);
    $column = $columns[$newfield];

    $normalised_value = $this->normalise_value($column, $newvalue);
    if (is_array($normalised_value) && array_key_exists('blob', $normalised_value)) {
        // Update BYTEA and return
        $normalised_value = pg_escape_bytea($this->pgsql, $normalised_value['blob']);
        $sql = "UPDATE {$this->prefix}$table SET $newfield = '$normalised_value'::bytea $select";
        // The WHERE parameters are bound by pg_query_params() below, so hand
        // the same parameters to query_start() instead of NULL.
        $this->query_start($sql, $params, SQL_QUERY_UPDATE);
        $result = pg_query_params($this->pgsql, $sql, $params);
        $this->query_end($result);
        pg_free_result($result);
        return true;
    }

    if (is_null($normalised_value)) {
        // NULL is written literally, no parameter is bound for it.
        $newfield = "$newfield = NULL";
    } else {
        $newfield = "$newfield = \$".$i;
        $params[] = $normalised_value;
    }
    $sql = "UPDATE {$this->prefix}$table SET $newfield $select";

    $this->query_start($sql, $params, SQL_QUERY_UPDATE);
    $result = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($result);

    pg_free_result($result);

    return true;
}
/**
 * Delete one or more records from a table which match a particular WHERE clause.
 *
 * @param string $table The database table to be checked against.
 * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
 * @param array $params array of sql parameters
 * @return bool true
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
public function delete_records_select($table, $select, array $params=null) {
    // Only prepend the keyword when there is a condition to apply.
    $where = $select ? "WHERE $select" : $select;

    list($sql, $params) = $this->fix_sql_params("DELETE FROM {$this->prefix}$table $where", $params);

    $this->query_start($sql, $params, SQL_QUERY_UPDATE);
    $res = pg_query_params($this->pgsql, $sql, $params);
    $this->query_end($res);

    pg_free_result($res);

    return true;
}
/**
 * Returns 'LIKE' part of a query.
 *
 * @param string $fieldname usually name of the table column
 * @param string $param usually bound query parameter (?, :named)
 * @param bool $casesensitive use case sensitive search
 * @param bool $accentsensitive use accent sensitive search (ignored by this driver)
 * @param bool $notlike true means "NOT LIKE"
 * @param string $escapechar escape char for '%' and '_'
 * @return string SQL code fragment
 */
public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') {
    if (strpos($param, '%') !== false) {
        debugging('Potential SQL injection detected, sql_like() expects bound parameters (? or :named)');
    }

    // A single backslash has to be doubled because it ends up inside an
    // E'...' literal, which interprets C-style escapes; the E prefix itself
    // prevents compatibility warnings.
    $escape = ($escapechar === '\\') ? '\\\\' : $escapechar;

    // postgresql does not support accent insensitive text comparisons, sorry
    $operator = $casesensitive ? 'LIKE' : 'ILIKE';
    if ($notlike) {
        $operator = 'NOT ' . $operator;
    }

    return "$fieldname $operator $param ESCAPE E'$escape'";
}
/**
 * Returns the SQL fragment for a bitwise XOR of two expressions, using the '#' operator.
 *
 * @param int $int1 first operand (number or SQL expression)
 * @param int $int2 second operand (number or SQL expression)
 * @return string SQL code fragment
 */
public function sql_bitxor($int1, $int2) {
    return "(($int1) # ($int2))";
}
/**
 * Returns the SQL fragment casting a character expression to INT.
 *
 * @param string $fieldname field or expression to cast
 * @param bool $text not used by this implementation
 * @return string SQL code fragment
 */
public function sql_cast_char2int($fieldname, $text=false) {
    return " CAST($fieldname AS INT) ";
}
/**
 * Returns the SQL fragment casting a character expression to real, using the '::real' syntax.
 *
 * @param string $fieldname field or expression to cast
 * @param bool $text not used by this implementation
 * @return string SQL code fragment
 */
public function sql_cast_char2real($fieldname, $text=false) {
    return ' ' . $fieldname . '::real ';
}
/**
 * Returns the SQL fragment concatenating all given arguments with '||'.
 *
 * Accepts any number of arguments; with nothing to join an empty string
 * literal is returned.
 *
 * @return string SQL code fragment
 */
public function sql_concat() {
    $joined = implode(' || ', func_get_args());

    // Add always empty string element so integer-exclusive concats
    // will work without needing to cast each element explicitly
    return ($joined === '') ? " '' " : " '' || $joined ";
}
/**
 * Returns the SQL fragment concatenating the elements with the separator placed between each pair.
 *
 * @param string $separator SQL expression used as separator, a quoted space by default
 * @param array $elements SQL expressions to join
 * @return string SQL code fragment
 */
public function sql_concat_join($separator="' '", $elements=array()) {
    // Walk backwards so that earlier insert positions stay valid.
    $position = count($elements);
    while (--$position > 0) {
        array_splice($elements, $position, 0, $separator);
    }

    $joined = implode(' || ', $elements);
    return ($joined === '') ? " '' " : " $joined ";
}
/**
 * Reports whether regular expression matching is available in this driver.
 *
 * @return bool always true
 */
public function sql_regex_supported() {
    return true;
}
/**
 * Returns the regular expression operator to use in SQL.
 *
 * @param bool $positivematch true for the matching operator, false for the negated one
 * @return string '~*' or '!~*'
 */
public function sql_regex($positivematch=true) {
    if ($positivematch) {
        return '~*';
    }
    return '!~*';
}
/**
 * Reports whether this driver can lock sessions.
 *
 * @return bool always true
 */
public function session_lock_supported() {
    return true;
}
/**
 * Obtain session lock
 *
 * The wait is bounded by setting statement_timeout for the duration of the
 * pg_advisory_lock() call. The timeout is put back to its default both after
 * success and after a failed or timed out lock attempt, so that it does not
 * stay in force for later statements on the same connection.
 *
 * @param int $rowid id of the row with session record
 * @param int $timeout max allowed time to wait for the lock in seconds
 * @return void nothing is returned, failures are reported through exceptions
 * @throws dml_sessionwait_exception when the lock query failed after waiting at least $timeout seconds
 * @throws dml_exception for any other failure
 */
public function get_session_lock($rowid, $timeout) {
    // NOTE: there is a potential locking problem for database running
    //       multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
    //       luckily there is not a big chance that they would collide
    if (!$this->session_lock_supported()) {
        return;
    }

    parent::get_session_lock($rowid, $timeout);

    $timeoutmilli = $timeout * 1000;

    $sql = "SET statement_timeout TO $timeoutmilli";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    if ($result) {
        pg_free_result($result);
    }

    $sql = "SELECT pg_advisory_lock($rowid)";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $start = time();
    $result = pg_query($this->pgsql, $sql);
    $end = time();
    try {
        $this->query_end($result);
    } catch (dml_exception $ex) {
        // Do not leave the lock timeout active on the connection. This is
        // best effort only: the failure of the lock itself is what the
        // caller has to see, not a problem with the clean up.
        try {
            $this->reset_statement_timeout();
        } catch (dml_exception $ignored) {
            // Intentionally ignored, see above.
        }
        if ($end - $start >= $timeout) {
            throw new dml_sessionwait_exception();
        } else {
            throw $ex;
        }
    }

    if ($result) {
        pg_free_result($result);
    }

    $this->reset_statement_timeout();
}

/**
 * Puts statement_timeout back to its default value.
 *
 * @return void
 * @throws dml_exception A DML specific exception is thrown for any errors.
 */
private function reset_statement_timeout() {
    $sql = "SET statement_timeout TO DEFAULT";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $result = pg_query($this->pgsql, $sql);
    $this->query_end($result);

    if ($result) {
        pg_free_result($result);
    }
}
/**
 * Release the advisory lock taken for the given session row.
 *
 * Does nothing when session locking is not supported or this connection
 * is not flagged as used for database sessions.
 *
 * @param int $rowid id of the row with session record
 * @return void
 */
public function release_session_lock($rowid) {
    if (!$this->session_lock_supported() || !$this->used_for_db_sessions) {
        return;
    }

    parent::release_session_lock($rowid);

    $sql = "SELECT pg_advisory_unlock($rowid)";
    $this->query_start($sql, null, SQL_QUERY_AUX);
    $res = pg_query($this->pgsql, $sql);
    $this->query_end($res);

    if ($res) {
        pg_free_result($res);
    }
}
/**
 * Driver specific start of real database transaction,
 * this can not be used directly in code.
 *
 * Opens a READ COMMITTED transaction and immediately creates the
 * moodle_pg_savepoint savepoint; the savepoint flag is raised before
 * the statement is sent.
 *
 * @return void
 */
protected function begin_transaction() {
    $this->savepointpresent = true;

    $sql = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint";
    $this->query_start($sql, NULL, SQL_QUERY_AUX);
    $res = pg_query($this->pgsql, $sql);
    $this->query_end($res);

    pg_free_result($res);
}
/**
 * Driver specific commit of real database transaction,
 * this can not be used directly in code.
 *
 * Releases the moodle_pg_savepoint savepoint and commits; the savepoint
 * flag is cleared before the statement is sent.
 *
 * @return void
 */
protected function commit_transaction() {
    $this->savepointpresent = false;

    $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT";
    $this->query_start($sql, NULL, SQL_QUERY_AUX);
    $res = pg_query($this->pgsql, $sql);
    $this->query_end($res);

    pg_free_result($res);
}
/**
 * Driver specific abort of real database transaction,
 * this can not be used directly in code.
 *
 * Releases the moodle_pg_savepoint savepoint and rolls back; the savepoint
 * flag is cleared before the statement is sent.
 *
 * @return void
 */
protected function rollback_transaction() {
    $this->savepointpresent = false;

    $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK";
    $this->query_start($sql, NULL, SQL_QUERY_AUX);
    $res = pg_query($this->pgsql, $sql);
    $this->query_end($res);

    pg_free_result($res);
}
/**
 * Helper stripping surrounding whitespace and then surrounding quotes
 * (single or double) from a string, needed because PG encloses some
 * fields in index definitions and elsewhere in double quotes.
 *
 * @param string $str string to apply whitespace + quotes trim
 * @return string trimmed string
 */
private function trim_quotes($str) {
    $unpadded = trim($str);
    return trim($unpadded, "'\"");
}