From 5dadb4917b74848f29bd9e7378d0e63e1e1cd95b Mon Sep 17 00:00:00 2001 From: Jay Edgar Date: Wed, 7 Sep 2022 09:48:56 -0700 Subject: [PATCH] Add synchronous connection pool option Summary: Add the sync connection pool code. This takes advantage of all the previous refactoring work. The synchronous pool attempts to use as much of the original code as possible. Reviewed By: jupyung Differential Revision: D38913824 fbshipit-source-id: 1cf6e850deb2c10ab59a12d64c9ca3b7aadfd8a1 --- .../squangle/mysql_client/AsyncConnectionPool.cpp | 5 +- .../squangle/mysql_client/AsyncConnectionPool.h | 6 ++ .../src/squangle/mysql_client/AsyncMysqlClient.h | 5 + .../src/squangle/mysql_client/ConnectionPool.h | 48 +++++++++- .../src/squangle/mysql_client/MysqlClientBase.h | 2 +- .../squangle/src/squangle/mysql_client/Operation.h | 5 + .../src/squangle/mysql_client/PoolStorage.h | 25 +++++ .../squangle/mysql_client/SyncConnectionPool.cpp | 88 +++++++++++++++++ .../src/squangle/mysql_client/SyncConnectionPool.h | 106 +++++++++++++++++++++ .../src/squangle/mysql_client/SyncMysqlClient.cpp | 37 +++++++ .../src/squangle/mysql_client/SyncMysqlClient.h | 24 ++++- 11 files changed, 339 insertions(+), 12 deletions(-) create mode 100644 third-party/squangle/src/squangle/mysql_client/SyncConnectionPool.cpp create mode 100644 third-party/squangle/src/squangle/mysql_client/SyncConnectionPool.h diff --git a/third-party/squangle/src/squangle/mysql_client/AsyncConnectionPool.cpp b/third-party/squangle/src/squangle/mysql_client/AsyncConnectionPool.cpp index dbeade75054..c898802f3d9 100644 --- a/third-party/squangle/src/squangle/mysql_client/AsyncConnectionPool.cpp +++ b/third-party/squangle/src/squangle/mysql_client/AsyncConnectionPool.cpp @@ -157,8 +157,7 @@ std::unique_ptr AsyncConnectionPool::connect( } template <> -ConnectPoolOperation* -ConnectPoolOperation::specializedRun() { +AsyncConnectPoolOperation* AsyncConnectPoolOperation::specializedRun() { std::weak_ptr weakSelf = getSharedPointer(); if (!client()->runInThread([weakSelf]() { // There is a race confition that allows a cancelled or completed @@ -202,7 +201,7 @@ std::ostream& operator<<(std::ostream& os, PoolKey key) { } template <> -std::string ConnectPoolOperation::createTimeoutErrorMessage( +std::string AsyncConnectPoolOperation::createTimeoutErrorMessage( const PoolKeyStats& pool_key_stats, size_t per_key_limit) { auto delta = std::chrono::duration_cast( diff --git a/third-party/squangle/src/squangle/mysql_client/AsyncConnectionPool.h b/third-party/squangle/src/squangle/mysql_client/AsyncConnectionPool.h index 4049c73d7dc..852bd1e7f4f 100644 --- a/third-party/squangle/src/squangle/mysql_client/AsyncConnectionPool.h +++ b/third-party/squangle/src/squangle/mysql_client/AsyncConnectionPool.h @@ -168,6 +168,12 @@ class AsyncConnectionPool : public ConnectionPool { return shutdown_data_.rlock()->shutting_down; } + // Nothing needed here - the async connection pool should not wait. + void openNewConnectionPrep(AsyncConnectPoolOperation& /*pool_op*/) override {} + void openNewConnectionFinish( + AsyncConnectPoolOperation& /*pool_op*/, + const PoolKey& /*pool_key*/) override {} + AsyncConnectionPool(const AsyncConnectionPool&) = delete; AsyncConnectionPool& operator=(const AsyncConnectionPool&) = delete; }; diff --git a/third-party/squangle/src/squangle/mysql_client/AsyncMysqlClient.h b/third-party/squangle/src/squangle/mysql_client/AsyncMysqlClient.h index 710edc2339d..1e1ebbbdbe5 100644 --- a/third-party/squangle/src/squangle/mysql_client/AsyncMysqlClient.h +++ b/third-party/squangle/src/squangle/mysql_client/AsyncMysqlClient.h @@ -79,6 +79,7 @@ namespace facebook { namespace common { namespace mysql_client { +class AsyncConnection; class AsyncMysqlClient; class SyncMysqlClient; class Operation; @@ -91,6 +92,10 @@ class MysqlConnectionHolder; // and use the client it returns, which is shared process-wide. class AsyncMysqlClient : public MysqlClientBase { public: + // Having this type (`uses_one_thread`) tells the pool storage later that we + // don't need synchronization - see PoolStorage.h + using uses_one_thread = void; + AsyncMysqlClient(); ~AsyncMysqlClient() override; diff --git a/third-party/squangle/src/squangle/mysql_client/ConnectionPool.h b/third-party/squangle/src/squangle/mysql_client/ConnectionPool.h index fc80d132f01..58010e1537d 100644 --- a/third-party/squangle/src/squangle/mysql_client/ConnectionPool.h +++ b/third-party/squangle/src/squangle/mysql_client/ConnectionPool.h @@ -10,6 +10,7 @@ #include #include +#include #include #include @@ -593,6 +594,9 @@ class ConnectionPool rawPoolOp->getSharedPointer()); // Sanity check DCHECK(pool_op != nullptr); + + openNewConnectionPrep(*pool_op); + conn_storage_.queueOperation(poolKey, pool_op); // Propagate the ConnectionContext from the incoming operation. These // contexts contain application specific logging that will be lost if not @@ -600,6 +604,8 @@ class ConnectionPool // miss. Propagating the context pointer ensures that both operations are // logged with the expected additional logging tryRequestNewConnection(poolKey, pool_op->connection_context_); + + openNewConnectionFinish(*pool_op, poolKey); } void resetConnection( @@ -752,6 +758,12 @@ class ConnectionPool connectionSpotFreed(pool_key); } + virtual void openNewConnectionPrep(ConnectPoolOperation& pool_op) = 0; + + virtual void openNewConnectionFinish( + ConnectPoolOperation& pool_op, + const PoolKey& pool_key) = 0; + virtual std::unique_ptr makeNewConnection( const ConnectionKey& conn_key, std::unique_ptr> mysqlConn) = 0; @@ -847,6 +859,8 @@ class ConnectPoolOperation : public ConnectOperation { private: friend class ConnectionPool; friend class PoolStorageData; + friend class AsyncConnectionPool; + friend class SyncConnectionPool; void specializedRunImpl() { // Initialize all we need from our tevent handler @@ -867,9 +881,11 @@ class ConnectPoolOperation : public ConnectOperation { return; } - conn()->socketHandler()->scheduleTimeout( - std::chrono::duration_cast(end - now) - .count()); + if constexpr (uses_one_thread_v) { + conn()->socketHandler()->scheduleTimeout( + std::chrono::duration_cast(end - now) + .count()); + } // Remove before to not count against itself removeClientReference(); @@ -888,7 +904,7 @@ class ConnectPoolOperation : public ConnectOperation { // Called when the connection is matched by the pool client void connectionCallback( std::unique_ptr> mysql_conn) { - DCHECK(client()->getEventBase()->isInEventBaseThread()); + // TODO: validate we are in the correct thread (for async) if (!mysql_conn) { LOG(DFATAL) << "Unexpected error"; @@ -921,6 +937,8 @@ class ConnectPoolOperation : public ConnectOperation { VLOG(2) << "Error: Failed to acquire connection"; attemptFailed(OperationResult::Failed); } + + signalWaiter(); } // Called when the connection that the pool is trying to acquire failed @@ -938,12 +956,34 @@ class ConnectPoolOperation : public ConnectOperation { LOG(DFATAL) << "Should not be called"; } + void prepWait() { + baton_ = std::make_unique>(); + } + + bool syncWait() { + DCHECK(baton_); + auto end = timeout_ + start_time_; + return baton_->try_wait_until(end); + } + + void cleanupWait() { + baton_.reset(); + } + + void signalWaiter() { + if (baton_) { + baton_->post(); + } + } + std::string createTimeoutErrorMessage( const PoolKeyStats& pool_key_stats, size_t per_key_limit); std::weak_ptr> pool_; + std::unique_ptr> baton_; + // PreOperation keeps any other operation that needs to be canceled when // ConnectPoolOperation is cancelled. // PreOperation is not reused and its lifetime is with ConnectPoolOperation. diff --git a/third-party/squangle/src/squangle/mysql_client/MysqlClientBase.h b/third-party/squangle/src/squangle/mysql_client/MysqlClientBase.h index 31b932c373d..fe009189a68 100644 --- a/third-party/squangle/src/squangle/mysql_client/MysqlClientBase.h +++ b/third-party/squangle/src/squangle/mysql_client/MysqlClientBase.h @@ -128,7 +128,7 @@ class MysqlClientBase { friend class ChangeUserOperation; friend class MysqlConnectionHolder; friend class AsyncConnection; - friend class AsyncConnectionPool; + friend class SyncConnection; virtual db::SquangleLoggingData makeSquangleLoggingData( const ConnectionKey* connKey, const db::ConnectionContextBase* connContext) = 0; diff --git a/third-party/squangle/src/squangle/mysql_client/Operation.h b/third-party/squangle/src/squangle/mysql_client/Operation.h index b127630a949..bb97b82f470 100644 --- a/third-party/squangle/src/squangle/mysql_client/Operation.h +++ b/third-party/squangle/src/squangle/mysql_client/Operation.h @@ -818,6 +818,7 @@ class Operation : public std::enable_shared_from_this { friend class Connection; friend class SyncConnection; + friend class SyncConnectionPool; friend class ConnectionSocketHandler; }; @@ -1006,6 +1007,10 @@ class ConnectOperation : public Operation { return db::OperationType::Connect; } + bool isActive() const { + return active_in_client_; + } + protected: virtual void attemptFailed(OperationResult result); virtual void attemptSucceeded(OperationResult result); diff --git a/third-party/squangle/src/squangle/mysql_client/PoolStorage.h b/third-party/squangle/src/squangle/mysql_client/PoolStorage.h index e0ef2835608..03b9db0b000 100644 --- a/third-party/squangle/src/squangle/mysql_client/PoolStorage.h +++ b/third-party/squangle/src/squangle/mysql_client/PoolStorage.h @@ -66,6 +66,19 @@ class PoolStorageData { list.push_back(std::move(weak_op)); } + // Searches for an operation in the queue and removes it + bool dequeueOperation( + const PoolKey& pool_key, + const ConnectPoolOperation& pool_op) { + auto& list = waitList_[pool_key]; + auto it = std::find_if(list.begin(), list.end(), [&](const auto& weak_op) { + auto locked_op = weak_op.lock(); + return locked_op && locked_op.get() == &pool_op; + }); + + return it != list.end(); + } + // Calls failureCallback with the error description and removed all // the operations for conn_key from the queue. void failOperations( @@ -278,6 +291,18 @@ class PoolStorage { } } + bool dequeueOperation( + const PoolKey& pool_key, + const ConnectPoolOperation& pool_op) { + if constexpr (uses_one_thread_v) { + return data_.dequeueOperation(pool_key, pool_op); + } else { + return data_.wlock()->dequeueOperation(pool_key, pool_op); + } + } + + // Calls failureCallback with the error description and removed all + // the operations for conn_key from the queue. void failOperations( const PoolKey& pool_key, OperationResult op_result, diff --git a/third-party/squangle/src/squangle/mysql_client/SyncConnectionPool.cpp b/third-party/squangle/src/squangle/mysql_client/SyncConnectionPool.cpp new file mode 100644 index 00000000000..68d1a405b59 --- /dev/null +++ b/third-party/squangle/src/squangle/mysql_client/SyncConnectionPool.cpp @@ -0,0 +1,88 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "squangle/mysql_client/SyncConnectionPool.h" + +using namespace std::chrono_literals; + +namespace facebook::common::mysql_client { + +std::shared_ptr SyncConnectionPool::makePool( + std::shared_ptr mysql_client, + PoolOptions pool_options) { + auto connectionPool = std::make_shared( + std::move(mysql_client), std::move(pool_options)); + return connectionPool; +} + +std::unique_ptr SyncConnectionPool::connect( + const std::string& host, + int port, + const std::string& database_name, + const std::string& user, + const std::string& password, + const ConnectionOptions& conn_opts) { + auto op = beginConnection(host, port, database_name, user, password); + op->setConnectionOptions(conn_opts); + // This will throw (intended behaviour) in case the operation didn't succeed + return blockingConnectHelper(std::move(op)); +} + +void SyncConnectionPool::openNewConnectionPrep( + SyncConnectPoolOperation& pool_op) { + pool_op.prepWait(); +} + +void SyncConnectionPool::openNewConnectionFinish( + SyncConnectPoolOperation& pool_op, + const PoolKey& pool_key) { + if (!pool_op.syncWait()) { + if (!conn_storage_.dequeueOperation(pool_key, pool_op)) { + // The operation was not found in the queue, so someone must be fulfilling + // the operation. Wait until that is finished. + while (pool_op.isActive()) { + /* sleep_override */ std::this_thread::sleep_for(1ms); + } + + return; + } + + pool_op.timeoutTriggered(); + } + + pool_op.cleanupWait(); +} + +template <> +std::string SyncConnectPoolOperation::createTimeoutErrorMessage( + const PoolKeyStats& pool_key_stats, + size_t per_key_limit) { + auto delta = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start_time_); + + return fmt::format( + "[{}]({})Connection to {}:{} timed out in pool " + "(open {}, opening {}, key limit {}) {}", + static_cast(SquangleErrno::SQ_ERRNO_POOL_CONN_TIMEOUT), + kErrorPrefix, + host().c_str(), + port(), + pool_key_stats.open_connections, + pool_key_stats.pending_connections, + per_key_limit, + timeoutMessage(delta)); +} + +template <> +SyncConnectPoolOperation* SyncConnectPoolOperation::specializedRun() { + // No special thread manipulation needed for sync client + ConnectPoolOperation::specializedRunImpl(); + return this; +} + +} // namespace facebook::common::mysql_client diff --git a/third-party/squangle/src/squangle/mysql_client/SyncConnectionPool.h b/third-party/squangle/src/squangle/mysql_client/SyncConnectionPool.h new file mode 100644 index 00000000000..5cd01ff51f3 --- /dev/null +++ b/third-party/squangle/src/squangle/mysql_client/SyncConnectionPool.h @@ -0,0 +1,106 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include + +#include "squangle/mysql_client/ConnectionPool.h" +#include "squangle/mysql_client/SyncMysqlClient.h" + +namespace facebook::common::mysql_client { + +using SyncConnectPoolOperation = ConnectPoolOperation; + +class SyncConnectionPool : public ConnectionPool { + public: + // Don't use std::chrono::duration::MAX to avoid overflows + static std::shared_ptr makePool( + std::shared_ptr mysql_client, + PoolOptions pool_options = PoolOptions()); + + std::unique_ptr connect( + const std::string& host, + int port, + const std::string& database_name, + const std::string& user, + const std::string& password, + const ConnectionOptions& conn_opts = ConnectionOptions()); + + // Don't use the constructor directly, only public to use make_shared + SyncConnectionPool( + std::shared_ptr mysql_client, + PoolOptions pool_options) + : ConnectionPool( + std::move(mysql_client), + std::move(pool_options)) { + scheduler_.addFunction( + [&]() { + conn_storage_.cleanupOperations(); + conn_storage_.cleanupConnections(); + }, + PoolOptions::kCleanUpTimeout, + "pool_periodic_cleanup"); + scheduler_.start(); + } + + ~SyncConnectionPool() { + VLOG(2) << "Connection pool dying"; + + shutdown(); + + VLOG(2) << "Connection pool shutdown completed"; + } + + void shutdown() override { + bool expected = false; + if (shutting_down_.compare_exchange_strong(expected, true)) { + scheduler_.shutdown(); + conn_storage_.clearAll(); + } + } + + private: + bool isShuttingDown() const override { + return shutting_down_; + } + + void validateCorrectThread() const override { + // The sync connection pool runs everything in the clients' threads so don't + // do anything here. + } + + bool runInCorrectThread(std::function&& func, bool /*wait*/) + override { + // The sync connection pool runs everything in the clients' threads. + func(); + return true; + } + + std::unique_ptr makeNewConnection( + const ConnectionKey& conn_key, + std::unique_ptr> mysqlConn) override { + return std::make_unique( + mysql_client_.get(), conn_key, std::move(mysqlConn)); + } + + void openNewConnectionPrep(SyncConnectPoolOperation& pool_op) override; + + void openNewConnectionFinish( + SyncConnectPoolOperation& pool_op, + const PoolKey& pool_key) override; + + std::atomic shutting_down_{false}; + + folly::FunctionScheduler scheduler_; + + SyncConnectionPool(const SyncConnectionPool&) = delete; + SyncConnectionPool& operator=(const SyncConnectionPool&) = delete; +}; + +} // namespace facebook::common::mysql_client diff --git a/third-party/squangle/src/squangle/mysql_client/SyncMysqlClient.cpp b/third-party/squangle/src/squangle/mysql_client/SyncMysqlClient.cpp index 2989dfc2a23..7371efdb1a0 100644 --- a/third-party/squangle/src/squangle/mysql_client/SyncMysqlClient.cpp +++ b/third-party/squangle/src/squangle/mysql_client/SyncMysqlClient.cpp @@ -6,18 +6,55 @@ * LICENSE file in the root directory of this source tree. */ +#include + #include "squangle/mysql_client/SyncMysqlClient.h" namespace facebook { namespace common { namespace mysql_client { +namespace { +folly::Singleton client([]() { return new SyncMysqlClient; }); +} // namespace + +std::shared_ptr SyncMysqlClient::defaultClient() { + return folly::Singleton::try_get(); +} + std::unique_ptr SyncMysqlClient::createConnection( ConnectionKey conn_key, MYSQL* mysql_conn) { return std::make_unique(this, conn_key, mysql_conn); } +SyncConnection::~SyncConnection() { + if (mysql_connection_ && conn_dying_callback_ && needToCloneConnection_ && + isReusable() && !inTransaction() && + getConnectionOptions().isEnableResetConnBeforeClose()) { + // We clone this Connection object to send COM_RESET_CONNECTION command + // via the connection before returning it to the connection pool. + // The callback function points to recycleMysqlConnection(), which is + // responsible for recyclining the connection. + // This object's callback is set to null and the cloned object's + // callback instead points to the original callback function, which will + // be called after COM_RESET_CONNECTION. + + auto connHolder = stealMysqlConnectionHolder(true); + auto conn = std::make_unique( + client(), *getKey(), std::move(connHolder)); + conn->needToCloneConnection_ = false; + conn->setConnectionOptions(getConnectionOptions()); + conn->setConnectionDyingCallback(std::move(conn_dying_callback_)); + conn_dying_callback_ = nullptr; + auto resetOp = Connection::resetConn(std::move(conn)); + // addOperation() is necessary here for proper cancelling of reset + // operation in case of sudden SyncMysqlClient shutdown + resetOp->connection()->client()->addOperation(resetOp); + resetOp->run()->wait(); + } +} + } // namespace mysql_client } // namespace common } // namespace facebook diff --git a/third-party/squangle/src/squangle/mysql_client/SyncMysqlClient.h b/third-party/squangle/src/squangle/mysql_client/SyncMysqlClient.h index 0c6074195b3..032f873ad69 100644 --- a/third-party/squangle/src/squangle/mysql_client/SyncMysqlClient.h +++ b/third-party/squangle/src/squangle/mysql_client/SyncMysqlClient.h @@ -14,6 +14,8 @@ namespace facebook { namespace common { namespace mysql_client { +class SyncConnection; + class SyncMysqlClient : public MysqlClientBase { public: SyncMysqlClient() : SyncMysqlClient(nullptr) {} @@ -50,9 +52,18 @@ class SyncMysqlClient : public MysqlClientBase { return true; } + uint64_t getPoolsConnectionLimit() { + // This is used by HHVM in the async client. We don't need it here in the + // sync client. + return std::numeric_limits::max(); + } + + static std::shared_ptr defaultClient(); + protected: // Private methods, primarily used by Operations and its subclasses. - friend class AsyncConnectionPool; + template + friend class ConnectionPool; bool runInThread(folly::Cob&& fn, bool wait = false) override { fn(); @@ -141,10 +152,15 @@ class SyncMysqlClient : public MysqlClientBase { class SyncConnection : public Connection { public: SyncConnection( - MysqlClientBase* async_client, + MysqlClientBase* client, ConnectionKey conn_key, - MYSQL* existing_connection) - : Connection(async_client, conn_key, existing_connection) {} + std::unique_ptr conn) + : Connection(client, conn_key, std::move(conn)) {} + + SyncConnection(MysqlClientBase* client, ConnectionKey conn_key, MYSQL* conn) + : Connection(client, conn_key, conn) {} + + ~SyncConnection(); // Operations call these methods as the operation becomes unblocked, as // callers want to wait for completion, etc. -- 2.11.4.GIT