diff --git a/GUIDE.md b/GUIDE.md index 7697615..fcc84ea 100644 --- a/GUIDE.md +++ b/GUIDE.md @@ -28,6 +28,7 @@ * [2.5 Configuring the Tenant Resolver](#25-configuring-the-tenant-resolver) * [2.6 Other Tenant Configuration](#26-other-tenant-configuration) * [2.7 Related Rails Configurations](#27-related-rails-configurations) + * [2.8 Configuring Shared Pool Mode (MySQL)](#28-configuring-shared-pool-mode-mysql) - [Documentation "work in progress"](#documentation-work-in-progress) * [Active Record API](#active-record-api) * [Caching](#caching) @@ -66,7 +67,7 @@ The goal is that developers will rarely need to think about managing tenant isol ### 1.2 High-level implementation -Active Record Tenanted extends Active Record to dynamically create a Connection Pool for a tenant on demand. It does this in a thread-safe way by relying heavily on Rails' horizontal sharding features. +Active Record Tenanted extends Active Record to dynamically create a Connection Pool for a tenant on demand. It does this in a thread-safe way by relying heavily on Rails' horizontal sharding features. For MySQL databases, the gem additionally supports a shared pool mode where all tenants share a single connection pool per role, with tenant switching performed via session-level `USE` statements. This drastically reduces memory and connection overhead at high tenant cardinality. It extends Rails' testing frameworks so that tests don't need to explicitly set up a tenant or otherwise be aware of tenanting (unless tenanting behavior is explicitly being tested). @@ -390,6 +391,55 @@ TODO: - `active_record.check_schema_cache_dump_version = false` +### 2.8 Configuring Shared Pool Mode (MySQL) + +By default, Active Record Tenanted creates a separate connection pool for each tenant. This works well for SQLite and low-cardinality MySQL deployments, but at high tenant counts (thousands of tenants) the per-tenant pool model can cause excessive memory usage, file descriptor pressure, and latency spikes from frequent pool recreation. + +For MySQL databases, Active Record Tenanted supports a **shared pool mode** where all tenants share a single connection pool per role. Tenant switching is performed via MySQL's `USE` statement at connection checkout time. + +To enable shared pool mode, add `shared_pool: true` and `untenanted_database` to the tenanted database configuration: + +``` yaml +production: + primary: + adapter: mysql2 # or trilogy + tenanted: true + database: app_%{tenant} + shared_pool: true + untenanted_database: information_schema + prepared_statements: false +``` + +The `untenanted_database` is the database that idle connections are reset to on checkin. It must be accessible by the connection user (e.g. `information_schema`). + +Shared pool mode has the following constraints: + +- **Adapter**: must be `mysql2` or `trilogy`. Shared pool mode uses `USE `, which is MySQL-specific. +- **Prepared statements**: must be `false`. Prepared statement caches are tied to a specific database and cannot be shared across tenants. +- **Host templating**: `%{tenant}` in the `host` config is not supported because a single shared pool implies a single host. +- **Database name length**: the full database name (after tenant interpolation) must not exceed 64 characters (MySQL's limit). + +The shared pool implementation uses a dual-layer safety model: + +1. **Layer 1 (adapter callbacks)**: the `SharedPool` module, included into MySQL adapters via Railtie load hooks, switches the connection to the correct tenant database on checkout and resets it to the fallback database on checkin. +2. **Layer 2 (tenant context reconciliation)**: `with_tenant` and `current_tenant=` reconcile any leased connection to the current tenant, handling sticky leases and nested `with_tenant` calls. + +Failed `USE` statements discard the connection entirely (`throw_away!`) and raise a typed error. Tenant switching inside an open transaction is prohibited and raises `TenantSwitchInTransactionError`. Query cache keys are automatically tenant-namespaced to prevent cross-tenant cache hits. + +All existing subsystems (Active Job, Action Cable, Active Storage, Action Mailer, Console, Testing) work unchanged with shared pool mode because they interact through the tenant context API (`with_tenant`, `current_tenant`), not through pool internals. + +The `host` configuration also supports tenant interpolation for per-tenant host routing (in non-shared-pool mode): + +``` yaml +production: + primary: + adapter: mysql2 + tenanted: true + database: app_%{tenant} + host: "%{tenant}.db.example.com" +``` + + ## Documentation "work in progress" ### Active Record API @@ -400,7 +450,8 @@ Documentation outline: - `.with_tenant` and `.current_tenant=` - and the callbacks for each, `:with_tenant` and `:set_current_tenant` - validation - - invalid characters in a tenant name (which is database-dependent) + - invalid characters in a tenant name (which is database-dependent: path separators for SQLite, backticks/non-printable for MySQL) + - MySQL 64-character database name limit (validated on the full interpolated name) - and how the application may want to do additional validation (e.g. ICANN subdomain restrictions) - `#tenant` is a readonly attribute on all tenanted model instances - `.current_tenant` returns the execution context for the model connection class @@ -438,8 +489,41 @@ TODO: - [x] `#database_path_for(tenant_name)` - [x] `#tenants` returns all the tenants on disk (for iteration) - [x] raise an exception if tenant name contains a path separator + - [x] `#host_for(tenant_name)` for per-tenant host routing + - [x] MySQL database name 64-character limit validation in `#database_for` + - [x] `#shared_pool?` predicate + - [x] `#fallback_database` + - [x] `#build_shared_pool_config` + - [x] `#validate_shared_pool` (adapter, fallback db, host templating, prepared statements) - [ ] bucketed database paths +- implement MySQL database adapter (`AR::Tenanted::DatabaseAdapters::MySQL`) + - [x] create `DatabaseAdapters::MySQL` class following SQLite adapter interface + - [x] register `mysql2` and `trilogy` adapters in `DatabaseAdapter` + - [x] Zeitwerk inflection for `"mysql" => "MySQL"` + - [x] `tenant_databases` via `SHOW DATABASES LIKE` with connection quoting + - [x] `validate_tenant_name` for MySQL identifier constraints + - [x] `create_database` with charset/collation options + - [x] `drop_database`, `database_exist?` via `information_schema.schemata` + - [x] `with_server_connection` helper (adapter-agnostic error handling) + - [x] `test_workerize` with double-suffix guard + +- implement shared pool mode + - [x] `TenantSwitchError`, `TenantResetError`, `TenantSwitchInTransactionError` error classes + - [x] `SHARED_POOL_SHARD` sentinel constant in `Tenant` + - [x] `pool_shard_for` single decision point for physical shard key + - [x] shared pool creation in `_create_tenanted_pool` via `build_shared_pool_config` + - [x] shared pool guard in `destroy_tenant` (drop database, never remove shared pool) + - [x] `SharedPool` adapter module with `:checkout` and `:checkin` callbacks + - [x] `apply_current_tenant` (checkout): switch to tenant DB via `USE` + - [x] `reset_to_fallback` (checkin): reset to fallback DB, discard on failure + - [x] `switch_tenant_database`: no-op guard, transaction guard, `throw_away!` on failure + - [x] `NamespaceStore` for tenant-namespaced query cache keys + - [x] `ensure_shared_pool_tenant_switch` reconciliation method (Layer 2) + - [x] `with_tenant` restructured with ensure-based reconciliation + - [x] `:after :set_current_tenant` callback for shared pool switching + - [x] Railtie load hooks for `mysql2` and `trilogy` adapters to include `SharedPool` + - implement `AR::Tenanted::DatabaseConfigurations::TenantConfig` - [x] make sure the logs include the tenant name (via `#new_connection`) @@ -588,6 +672,7 @@ Documentation outline: - explain why we're not worried about russian doll caching - explain why calling Rails.cache directly requires care that it's either explicitly tenanted or global - explain why we're not worried about sql query caching (it belongs to the connection pool) +- explain how shared pool mode namespaces query cache keys by tenant database (`NamespaceStore`) TODO: @@ -595,6 +680,7 @@ TODO: - [x] make basic fragment caching work - [x] investigate: is collection caching going to be tenanted properly - [x] investigate: make sure the QueryCache executor is clearing query caches for tenanted pool +- [x] tenant-namespaced query cache for shared pool mode (`NamespaceStore`) - [x] do we need to do some exploration on how to make sure all caching is tenanted? - I'm making the call not to pursue this. Rails.cache is a primitive. Just document it. diff --git a/lib/active_record/tenanted.rb b/lib/active_record/tenanted.rb index 1b0b62b..7774b27 100644 --- a/lib/active_record/tenanted.rb +++ b/lib/active_record/tenanted.rb @@ -6,6 +6,7 @@ loader = Zeitwerk::Loader.for_gem_extension(ActiveRecord) loader.inflector.inflect( "sqlite" => "SQLite", + "mysql" => "MySQL", ) loader.setup @@ -41,6 +42,15 @@ class IntegrationNotConfiguredError < Error; end # Raised when an unsupported database adapter is used. class UnsupportedDatabaseError < Error; end + # Raised when a tenant database switch via USE fails. + class TenantSwitchError < Error; end + + # Raised when resetting a connection to the fallback database fails during checkin. + class TenantResetError < Error; end + + # Raised when a tenant switch is attempted while a database transaction is open. + class TenantSwitchInTransactionError < Error; end + # Return the constantized connection class configured in `config.active_record_tenanted.connection_class`, # or nil if none is configured. def self.connection_class diff --git a/lib/active_record/tenanted/database_adapter.rb b/lib/active_record/tenanted/database_adapter.rb index b773941..60a099e 100644 --- a/lib/active_record/tenanted/database_adapter.rb +++ b/lib/active_record/tenanted/database_adapter.rb @@ -25,6 +25,8 @@ def new(db_config) end register "sqlite3", "ActiveRecord::Tenanted::DatabaseAdapters::SQLite" + register "mysql2", "ActiveRecord::Tenanted::DatabaseAdapters::MySQL" + register "trilogy", "ActiveRecord::Tenanted::DatabaseAdapters::MySQL" end end end diff --git a/lib/active_record/tenanted/database_adapters/mysql.rb b/lib/active_record/tenanted/database_adapters/mysql.rb new file mode 100644 index 0000000..8e99a7e --- /dev/null +++ b/lib/active_record/tenanted/database_adapters/mysql.rb @@ -0,0 +1,167 @@ +# frozen_string_literal: true + +require "digest" + +module ActiveRecord + module Tenanted + module DatabaseAdapters # :nodoc: + class MySQL + attr_reader :db_config + + def initialize(db_config) + @db_config = db_config + end + + def tenant_databases + database_pattern = db_config.database_for("%") + scanner = Regexp.new(db_config.database_for("(.+)")) + + with_server_connection do |conn| + conn.select_values( + "SHOW DATABASES LIKE #{conn.quote(database_pattern)}" + ).filter_map do |name| + match = name.match(scanner) + if match + match[1] + else + ActiveRecord::Base.logger&.warn "ActiveRecord::Tenanted: Cannot parse tenant name from database #{name.inspect}" + nil + end + end + end + rescue ActiveRecord::NoDatabaseError => error + ActiveRecord::Base.logger&.warn "ActiveRecord::Tenanted: tenant_databases returned empty due to NoDatabaseError: #{error.message}" + [] + end + + def validate_tenant_name(tenant_name) + if tenant_name.empty? + raise BadTenantNameError, "Tenant name cannot be empty." + end + + if tenant_name.match?(/[\/`]/) || !tenant_name.match?(/\A[\x20-\x7E]+\z/) + raise BadTenantNameError, "Tenant name contains an invalid character: #{tenant_name.inspect}" + end + end + + def create_database + with_server_connection do |conn| + options = {} + options[:charset] = db_config.configuration_hash[:encoding] if db_config.configuration_hash[:encoding] + options[:collation] = db_config.configuration_hash[:collation] if db_config.configuration_hash[:collation] + conn.create_database(database_path, options) + end + end + + def drop_database + with_server_connection do |conn| + conn.drop_database(database_path) + end + end + + def database_exist? + with_server_connection do |conn| + conn.select_values( + "SELECT schema_name FROM information_schema.schemata WHERE schema_name = #{conn.quote(database_path)}" + ).any? + end + rescue ActiveRecord::NoDatabaseError => error + ActiveRecord::Base.logger&.warn "ActiveRecord::Tenanted: database_exist? returned false due to NoDatabaseError: #{error.message}" + false + end + + def database_ready? + database_exist? + end + + def acquire_ready_lock + lock_name = "tenanted:#{Digest::SHA256.hexdigest(database_path)}"[0, 64] + + with_server_connection do |conn| + result = conn.select_value("SELECT GET_LOCK(#{conn.quote(lock_name)}, 30)") + unless result == 1 + raise ActiveRecord::LockWaitTimeout, + "Could not acquire advisory lock for tenant database #{database_path.inspect}" + end + + begin + yield + ensure + begin + conn.select_value("SELECT RELEASE_LOCK(#{conn.quote(lock_name)})") + rescue => error + # MySQL releases advisory locks automatically when the + # connection closes, so a failed RELEASE_LOCK is recoverable. + # Letting it propagate would mask the real operation result and + # could cause create_tenant to drop a successfully created + # database. + ActiveRecord::Base.logger&.warn( + "ActiveRecord::Tenanted: failed to release advisory lock " \ + "#{lock_name.inspect}: #{error.message}; the lock will be " \ + "released when the connection closes" + ) + end + end + end + end + + def ensure_database_directory_exists + true + end + + def database_path + db_config.database + end + + def test_workerize(db, test_worker_id) + test_worker_suffix = "_#{test_worker_id}" + + if db.end_with?(test_worker_suffix) + db + else + db + test_worker_suffix + end + end + + def path_for(database) + database + end + + private + + # Establishes an isolated connection to the MySQL server (without a + # specific database selected). We intentionally avoid + # DatabaseTasks.with_temporary_connection here because that method + # replaces ActiveRecord::Base's global connection pool for the + # duration of the block — any Base-backed query running concurrently + # would hit the database-less server config. + # + # Instead we spin up a throwaway ConnectionHandler so the server + # connection never touches the global pool. + def with_server_connection + if db_config.configuration_hash[:host]&.include?("%{tenant}") + raise TenantConfigurationError, + "Cannot connect to the MySQL server because the host contains " \ + "an unresolved %{tenant} template. Host-templated configurations " \ + "cannot enumerate tenant databases from a single server. Use a " \ + "non-templated host, or configure shared_pool mode for single-host setups." + end + + server_config_hash = db_config.configuration_hash.except(:database) + server_db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new( + db_config.env_name, "#{db_config.name}_server", server_config_hash + ) + + handler = ActiveRecord::ConnectionAdapters::ConnectionHandler.new + pool = handler.establish_connection(server_db_config) + + pool.with_connection do |conn| + yield conn + end + ensure + handler&.clear_all_connections!(:all) + end + end + end + end +end diff --git a/lib/active_record/tenanted/database_adapters/sqlite.rb b/lib/active_record/tenanted/database_adapters/sqlite.rb index 35745de..1cf4287 100644 --- a/lib/active_record/tenanted/database_adapters/sqlite.rb +++ b/lib/active_record/tenanted/database_adapters/sqlite.rb @@ -28,7 +28,7 @@ def tenant_databases Dir.glob(glob).filter_map do |path| result = path.scan(scanner).flatten.first if result.nil? - Rails.logger.warn "ActiveRecord::Tenanted: Cannot parse tenant name from filename #{path.inspect}" + ActiveRecord::Base.logger&.warn "ActiveRecord::Tenanted: Cannot parse tenant name from filename #{path.inspect}" end result end diff --git a/lib/active_record/tenanted/database_configurations/base_config.rb b/lib/active_record/tenanted/database_configurations/base_config.rb index 51e37bb..d6fc94c 100644 --- a/lib/active_record/tenanted/database_configurations/base_config.rb +++ b/lib/active_record/tenanted/database_configurations/base_config.rb @@ -27,15 +27,39 @@ def database_for(tenant_name) config_adapter.validate_tenant_name(tenant_name) - db = sprintf(database, tenant: tenant_name) + db = database.gsub("%{tenant}", tenant_name) + + if db.match?(/%\{[^}]+\}/) + raise ActiveRecord::Tenanted::TenantConfigurationError, + "Database template contains an unrecognized token: #{database.inspect}. " \ + "Only %{tenant} is supported." + end if test_worker_id db = config_adapter.test_workerize(db, test_worker_id) end + if %w[mysql2 trilogy].include?(adapter) && db.length > 64 + raise BadTenantNameError, "Database name too long (max 64 characters): #{db.inspect}" + end + db end + def host_for(tenant_name) + return unless host + + resolved = host.gsub("%{tenant}", tenant_name.to_s) + + if resolved.match?(/%\{[^}]+\}/) + raise ActiveRecord::Tenanted::TenantConfigurationError, + "Host template contains an unrecognized token: #{host.inspect}. " \ + "Only %{tenant} is supported." + end + + resolved + end + def tenants config_adapter.tenant_databases end @@ -45,6 +69,7 @@ def new_tenant_config(tenant_name) config_hash = configuration_hash.dup.tap do |hash| hash[:tenant] = tenant_name hash[:database] = database_for(tenant_name) + hash[:host] = host_for(tenant_name) if configuration_hash.key?(:host) hash[:tenanted_config_name] = name end Tenanted::DatabaseConfigurations::TenantConfig.new(env_name, config_name, config_hash) @@ -60,6 +85,60 @@ def new_connection def max_connection_pools (configuration_hash[:max_connection_pools] || DEFAULT_MAX_CONNECTION_POOLS).to_i end + + def shared_pool? + configuration_hash[:shared_pool] == true + end + + def fallback_database + configuration_hash[:untenanted_database].presence + end + + def build_shared_pool_config(connection_class_name:) + validate_shared_pool + + hash = configuration_hash.merge( + database: fallback_database, + tenanted_connection_class_name: connection_class_name, + tenanted_config_name: name + ) + + ActiveRecord::DatabaseConfigurations::HashConfig.new(env_name, "#{name}_shared_pool", hash) + end + + def validate_shared_pool + return unless shared_pool? + + unless %w[mysql2 trilogy].include?(adapter) + raise ActiveRecord::Tenanted::TenantConfigurationError, + "Shared pool mode requires the mysql2 or trilogy adapter, " \ + "but #{name.inspect} is configured with #{adapter.inspect}." + end + + if fallback_database.blank? + raise ActiveRecord::Tenanted::TenantConfigurationError, + "Shared pool mode requires an untenanted_database to be configured " \ + "for #{name.inspect}." + end + + if configuration_hash[:host]&.include?("%{tenant}") + raise ActiveRecord::Tenanted::TenantConfigurationError, + "Shared pool mode does not support host templating " \ + "because a single pool implies a single host (config #{name.inspect})." + end + + ps_value = configuration_hash[:prepared_statements] + unless ps_value.nil? + ps_cast = ActiveRecord::ConnectionAdapters::AbstractAdapter + .type_cast_config_to_boolean(ps_value) + + if ps_cast != false + raise ActiveRecord::Tenanted::TenantConfigurationError, + "Shared pool mode requires prepared_statements: false " \ + "for #{name.inspect}." + end + end + end end end end diff --git a/lib/active_record/tenanted/railtie.rb b/lib/active_record/tenanted/railtie.rb index 423a351..9b64054 100644 --- a/lib/active_record/tenanted/railtie.rb +++ b/lib/active_record/tenanted/railtie.rb @@ -46,6 +46,21 @@ class Railtie < ::Rails::Railtie # Defaults to false in development and test environments, and true in all other environments. config.active_record_tenanted.log_tenant_tag = !Rails.env.local? + # Set this to false to skip the USE statement that resets connections to the + # fallback database on checkin in shared pool mode. When false, connections + # preserve their tenant database across checkout/checkin cycles, and the + # checkout callback switches only when the next checkout targets a different + # tenant. This eliminates most USE roundtrips in the per-query connection + # lifecycle. + # + # The checkout callback (apply_current_tenant) validates the tenant on every + # checkout regardless of this setting, so tenant safety is maintained. + # + # Only applies when shared_pool: true is set in database.yml. + # + # Defaults to true (always reset on checkin). + config.active_record_tenanted.reset_tenant_on_checkin = true + # Set this to override the default tenant name used in development and test environments. # # This is the default tenant name used by database tasks and in the Rails console. In both @@ -137,6 +152,16 @@ class Railtie < ::Rails::Railtie end end + initializer "active_record_tenanted.shared_pool" do + ActiveSupport.on_load(:active_record_mysql2adapter) do + include ActiveRecord::Tenanted::SharedPool + end + + ActiveSupport.on_load(:active_record_trilogyadapter) do + include ActiveRecord::Tenanted::SharedPool + end + end + initializer "active_record_tenanted.action_mailer" do ActiveSupport.on_load(:action_mailer) do prepend ActiveRecord::Tenanted::Mailer @@ -148,6 +173,8 @@ class Railtie < ::Rails::Railtie end config.after_initialize do + ActiveRecord::Tenanted.base_configs.each(&:validate_shared_pool) + ActiveRecord::QueryLogs.taggings = ActiveRecord::QueryLogs.taggings.merge( tenant: ->(context) { context[:connection].tenant } ) diff --git a/lib/active_record/tenanted/shared_pool.rb b/lib/active_record/tenanted/shared_pool.rb new file mode 100644 index 0000000..48c63e3 --- /dev/null +++ b/lib/active_record/tenanted/shared_pool.rb @@ -0,0 +1,146 @@ +# frozen_string_literal: true + +module ActiveRecord + module Tenanted + # MySQL adapter extension that makes shared pool connections tenant-aware. + # + # This module handles two lifecycle seams: + # + # 1. Checkout: switch the connection to the current tenant's database via + # USE and attach a tenant-namespaced query cache. + # 2. Checkin: reset the connection to the fallback database. + # + # This is Layer 1 of the dual-layer safety model. It covers first checkouts + # and connection teardown. Layer 2 (tenant context reconciliation in Tenant) + # covers sticky leases and nested with_tenant. + # + # Included into Mysql2Adapter and TrilogyAdapter via Railtie load hooks. + # All methods guard on shared_pool? so non-shared-pool connections see a + # single hash lookup and early return. + module SharedPool + extend ActiveSupport::Concern + + included do + attr_accessor :tenant_database + + set_callback :checkout, :after, :apply_current_tenant + set_callback :checkin, :after, :reset_to_fallback + end + + def apply_current_tenant + return unless shared_pool? + + attach_query_cache_namespace + + klass = tenanted_connection_class + tenant = klass.current_tenant + + if tenant.blank? + raise TenantSwitchError, + "Cannot switch tenant database during checkout because no tenant " \ + "context is set (connection class #{klass.name.inspect})." + end + + database = klass.tenanted_root_config.database_for(tenant) + switch_tenant_database(tenant: tenant.to_s, database: database) + end + + def reset_to_fallback + return unless shared_pool? + return unless Rails.application.config.active_record_tenanted.reset_tenant_on_checkin + + db = @config[:untenanted_database] + internal_execute("USE #{quote_table_name(db)}", "TENANT RESET", allow_retry: false) + + self.tenant = nil + self.tenant_database = db + rescue => error + throw_away! + raise TenantResetError, + "Failed to reset connection to fallback database #{db.inspect} " \ + "from tenant #{tenant.inspect}: #{error.class}: #{error.message}" + end + + # Override the query_cache getter to ensure tenant namespace isolation + # on Rails' pinned cross-thread path. When a connection is pinned + # (transactional fixtures, system tests) and accessed from a non-owner + # thread, Rails returns pool.query_cache directly, bypassing the + # NamespaceStore wrapper set during checkout. This re-wraps it so + # cache keys remain namespaced by tenant database. + def query_cache + cache = super + if cache && shared_pool? && !cache.is_a?(NamespaceStore) + NamespaceStore.new(cache, -> { tenant_database || @config[:untenanted_database] }) + else + cache + end + end + + def switch_tenant_database(tenant:, database:) + return if self.tenant == tenant && tenant_database == database + + if transaction_open? + raise TenantSwitchInTransactionError, + "Cannot switch to tenant #{tenant.inspect} (database #{database.inspect}) " \ + "because a transaction is open." + end + + internal_execute("USE #{quote_table_name(database)}", "TENANT SWITCH", allow_retry: false) + + self.tenant = tenant + self.tenant_database = database + rescue TenantSwitchInTransactionError + raise + rescue => error + throw_away! + raise TenantSwitchError, + "Failed to switch to tenant #{tenant.inspect} " \ + "(database #{database.inspect}): #{error.class}: #{error.message}" + end + + private + def shared_pool? + @config[:shared_pool] == true && @config.key?(:tenanted_connection_class_name) + end + + def tenanted_connection_class + @config.fetch(:tenanted_connection_class_name).constantize + end + + def attach_query_cache_namespace + self.query_cache = NamespaceStore.new( + pool.query_cache, + -> { tenant_database || @config[:untenanted_database] } + ) + end + + # Thin wrapper that prefixes query cache keys with the current tenant + # database name. Prevents cross-tenant cache hits when two tenants + # execute the same SQL on the same connection within one request. + class NamespaceStore + delegate :enabled, :enabled=, :enabled?, :dirties, :dirties=, :dirties?, + :clear, :size, :empty?, to: :base_store + + attr_reader :base_store + + def initialize(base_store, namespace_proc) + @base_store = base_store + @namespace_proc = namespace_proc + end + + def [](key) + base_store[namespaced(key)] + end + + def compute_if_absent(key, &block) + base_store.compute_if_absent(namespaced(key), &block) + end + + private + def namespaced(key) + [@namespace_proc.call, key] + end + end + end + end +end diff --git a/lib/active_record/tenanted/tenant.rb b/lib/active_record/tenanted/tenant.rb index dc91b97..18484f4 100644 --- a/lib/active_record/tenanted/tenant.rb +++ b/lib/active_record/tenanted/tenant.rb @@ -82,6 +82,10 @@ def to_s end end.new.freeze + # Synthetic shard key used as the physical pool key in shared pool mode. + # All tenants map to one pool per role, keyed by this sentinel. + SHARED_POOL_SHARD = :__tenanted_shared_pool__ + CONNECTION_POOL_CREATION_LOCK = Thread::Mutex.new # :nodoc: class_methods do @@ -116,17 +120,19 @@ def tenant_exist?(tenant_name) def with_tenant(tenant_name, prohibit_shard_swapping: true, &block) tenant_name = tenant_name.to_s unless tenant_name == UNTENANTED_SENTINEL - if tenant_name == current_tenant - run_callbacks :with_tenant, &block - else - connection_class_for_self.connected_to(shard: tenant_name, role: ActiveRecord.writing_role) do - run_callbacks :with_tenant do - prohibit_shard_swapping(prohibit_shard_swapping) do - log_tenant_tag(tenant_name, &block) - end + return run_callbacks(:with_tenant, &block) if tenant_name == current_tenant + + connection_class_for_self.connected_to(shard: tenant_name, role: ActiveRecord.writing_role) do + ensure_shared_pool_tenant_switch + + run_callbacks :with_tenant do + prohibit_shard_swapping(prohibit_shard_swapping) do + log_tenant_tag(tenant_name, &block) end end end + ensure + ensure_shared_pool_tenant_switch end def create_tenant(tenant_name, if_not_exists: false, &block) @@ -137,16 +143,15 @@ def create_tenant(tenant_name, if_not_exists: false, &block) adapter.acquire_ready_lock do unless adapter.database_exist? adapter.create_database + created_db = true with_tenant(tenant_name) do connection_pool(schema_version_check: false) ActiveRecord::Tenanted::DatabaseTasks.new(base_config).migrate_tenant(tenant_name) end - - created_db = true end rescue - adapter.drop_database + adapter.drop_database if created_db raise end @@ -158,11 +163,13 @@ def create_tenant(tenant_name, if_not_exists: false, &block) end def destroy_tenant(tenant_name) - ActiveRecord::Base.logger.info " DESTROY [tenant=#{tenant_name}] Destroying tenant database" + ActiveRecord::Base.logger&.info " DESTROY [tenant=#{tenant_name}] Destroying tenant database" - with_tenant(tenant_name, prohibit_shard_swapping: false) do - if retrieve_connection_pool(strict: false) - remove_connection + unless tenanted_root_config.shared_pool? + with_tenant(tenant_name, prohibit_shard_swapping: false) do + if retrieve_connection_pool(strict: false) + remove_connection + end end end @@ -186,55 +193,65 @@ def without_tenant(&block) # :nodoc: end def connection_pool(schema_version_check: true) # :nodoc: - if current_tenant - pool = retrieve_connection_pool(strict: false) - - if pool.nil? - CONNECTION_POOL_CREATION_LOCK.synchronize do - # re-check now that we have the lock - pool = retrieve_connection_pool(strict: false) - - if pool.nil? - _create_tenanted_pool(schema_version_check: schema_version_check) - pool = retrieve_connection_pool(strict: true) - end + return Tenanted::UntenantedConnectionPool.new(tenanted_root_config, self) unless current_tenant + + shard = pool_shard_for(current_tenant) + pool = retrieve_connection_pool(shard: shard, strict: false) + + unless pool + CONNECTION_POOL_CREATION_LOCK.synchronize do + pool = retrieve_connection_pool(shard: shard, strict: false) + + unless pool + _create_tenanted_pool(shard, schema_version_check: schema_version_check) + pool = retrieve_connection_pool(shard: shard, strict: true) end end - - pool - else - Tenanted::UntenantedConnectionPool.new(tenanted_root_config, self) end + + pool end def tenanted_root_config # :nodoc: ActiveRecord::Base.configurations.resolve(tenanted_config_name.to_sym) end - def _create_tenanted_pool(schema_version_check: true) # :nodoc: + def _create_tenanted_pool(physical_shard, schema_version_check: true) # :nodoc: # ensure all classes use the same connection pool - return superclass._create_tenanted_pool unless connection_class? + return superclass._create_tenanted_pool(physical_shard, schema_version_check: schema_version_check) unless connection_class? + + if tenanted_root_config.shared_pool? + tenanted_root_config.database_for(current_tenant) + + db_config = tenanted_root_config.build_shared_pool_config(connection_class_name: name) + connection_handler.establish_connection( + db_config, + owner_name: self, + role: current_role, + shard: physical_shard + ) + else + tenant = current_tenant + db_config = tenanted_root_config.new_tenant_config(tenant) - tenant = current_tenant - db_config = tenanted_root_config.new_tenant_config(tenant) + unless db_config.config_adapter.database_exist? + raise TenantDoesNotExistError, "The database for tenant #{tenant.inspect} does not exist." + end - unless db_config.config_adapter.database_exist? - raise TenantDoesNotExistError, "The database for tenant #{tenant.inspect} does not exist." - end - pool = establish_connection(db_config) + pool = establish_connection(db_config) - if schema_version_check - pending_migrations = pool.migration_context.open.pending_migrations - raise ActiveRecord::PendingMigrationError.new(pending_migrations: pending_migrations) if pending_migrations.any? - end + if schema_version_check + pending_migrations = pool.migration_context.open.pending_migrations + raise ActiveRecord::PendingMigrationError.new(pending_migrations: pending_migrations) if pending_migrations.any? + end - pool + pool + end end private - def retrieve_connection_pool(strict:) + def retrieve_connection_pool(shard: pool_shard_for(current_tenant), strict:) role = current_role - shard = current_tenant connection_handler.retrieve_connection_pool(connection_specification_name, role:, shard:, strict:).tap do |pool| if pool tenanted_connection_pools[[ shard, role ]] = pool @@ -243,19 +260,40 @@ def retrieve_connection_pool(strict:) end end + def pool_shard_for(logical_tenant) + tenanted_root_config.shared_pool? ? SHARED_POOL_SHARD : logical_tenant + end + + # Called from with_tenant (before/after) and :after set_current_tenant. + # Reconciles the leased connection to current_tenant in shared pool mode. + def ensure_shared_pool_tenant_switch + return unless tenanted_root_config.shared_pool? + return unless current_tenant + + pool = retrieve_connection_pool(shard: SHARED_POOL_SHARD, strict: false) + return unless pool + + conn = pool.active_connection + return unless conn + + database = tenanted_root_config.database_for(current_tenant) + conn.switch_tenant_database(tenant: current_tenant.to_s, database: database) + end + def reap_connection_pools while tenanted_connection_pools.size > tenanted_root_config.max_connection_pools info, _ = *tenanted_connection_pools.pop shard, role = *info connection_handler.remove_connection_pool(connection_specification_name, role:, shard:) - Rails.logger.info " REAPED [tenant=#{shard} role=#{role}] Tenanted connection pool reaped to limit total connection pools" + ActiveRecord::Base.logger&.info " REAPED [tenant=#{shard} role=#{role}] Tenanted connection pool reaped to limit total connection pools" end end def log_tenant_tag(tenant_name, &block) - if Rails.application.config.active_record_tenanted.log_tenant_tag - Rails.logger.tagged("tenant=#{tenant_name}", &block) + logger = ActiveRecord::Base.logger + if Rails.application.config.active_record_tenanted.log_tenant_tag && logger.respond_to?(:tagged) + logger.tagged("tenant=#{tenant_name}", &block) else yield end @@ -273,6 +311,8 @@ def log_tenant_tag(tenant_name, &block) define_callbacks :with_tenant define_callbacks :set_current_tenant + + set_callback :set_current_tenant, :after, :ensure_shared_pool_tenant_switch end def tenanted?