Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions .github/workflows/rspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:

services:
mysql:
image: mysql:5.7
image: mysql:8.0
ports:
- 3306:3306
env:
Expand All @@ -22,16 +22,8 @@ jobs:
- uses: actions/checkout@v2
- uses: ruby/setup-ruby@v1
with:
ruby-version: 2.4
ruby-version: 3.4
bundler-cache: true

- name: Run tests
run: bundle exec rspec

- name: Code Coverage
uses: paambaati/codeclimate-action@v2.7.5
env:
CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }}
with:
coverageLocations: |
${{github.workspace}}/coverage/.resultset.json:simplecov
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: '2.1'

services:
test-runner:
image: ruby:2.4
image: ruby:3.4
working_dir: /usr/src/app
container_name: test-runner
command: sh -c "while true; do echo 'Container is running..'; sleep 5; done"
Expand All @@ -21,7 +21,7 @@ services:

test-mysql:
container_name: test-mysql
image: mysql:5.7
image: mysql:8
restart: always
environment:
MYSQL_ROOT_PASSWORD: admin
Expand Down
147 changes: 87 additions & 60 deletions lib/mysql_framework/connector.rb
Original file line number Diff line number Diff line change
@@ -1,83 +1,104 @@
# frozen_string_literal: true

require 'active_record'
require 'active_record/connection_adapters/mysql2_adapter'

# Monkeypatch the MySQL2 adapter to return hashes with symbol keys by default
module MysqlFramework
module Mysql2AdapterPatch
def configure_connection
super
@raw_connection.query_options[:as] = :hash
@raw_connection.query_options[:symbolize_keys] = true
@raw_connection.query_options[:cast_booleans] = true
end
end
end

ActiveRecord::ConnectionAdapters::Mysql2Adapter.prepend(MysqlFramework::Mysql2AdapterPatch)

module MysqlFramework
class Connector
def initialize(options = {})
@options = default_options.merge(options)
@mutex = Mutex.new

Mysql2::Client.default_query_options.merge!(symbolize_keys: true, cast_booleans: true)
@connection_map = nil
@map_mutex = Mutex.new
@setup_mutex = Mutex.new
@setup_complete = false
end

# This method is called to setup a pool of MySQL connections.
# This method is called to setup the ActiveRecord connection pool.
def setup
return unless connection_pool_enabled?
return if @setup_complete

@connection_pool = ::Queue.new
@setup_mutex.synchronize do
return if @setup_complete

start_pool_size.times { @connection_pool.push(new_client) }

@created_connections = start_pool_size
ActiveRecord::Base.establish_connection(active_record_config)
@connection_map = {}
@setup_complete = true
end
end

# This method is called to close all MySQL connections in the pool and dispose of the pool itself.
def dispose
return if @connection_pool.nil?
return unless @setup_complete

until @connection_pool.empty?
conn = @connection_pool.pop(true)
conn&.close
ActiveRecord::Base.connection_pool.disconnect!

@map_mutex.synchronize do
@connection_map.clear
end

@connection_pool = nil
@setup_complete = false
end

# This method is called to get the idle connection queue for this connector.
# This method is called to get the connection pool for this connector.
def connections
@connection_pool
return nil unless @setup_complete

ActiveRecord::Base.connection_pool
end

# This method is called to fetch a client from the connection pool.
def check_out
@mutex.synchronize do
begin
return new_client unless connection_pool_enabled?

client = @connection_pool.pop(true)

client.ping if @options[:reconnect]

client
rescue ThreadError
if @created_connections < max_pool_size
client = new_client
@created_connections += 1
return client
end
setup unless @setup_complete

MysqlFramework.logger.error { "[#{self.class}] - Database connection pool depleted." }
adapter = ActiveRecord::Base.connection_pool.checkout
raw_conn = adapter.raw_connection

raise 'Database connection pool depleted.'
end
@map_mutex.synchronize do
@connection_map[raw_conn.object_id] = adapter
end

raw_conn
end

# This method is called to check a client back in to the connection when no longer needed.
def check_in(client)
@mutex.synchronize do
return client&.close unless connection_pool_enabled?
return if client.nil? || !@setup_complete

adapter = @map_mutex.synchronize do
@connection_map.delete(client.object_id)
end

client = new_client if client&.closed?
@connection_pool.push(client)
if adapter
ActiveRecord::Base.connection_pool.checkin(adapter)
else
MysqlFramework.logger.warn { "[#{self.class}] - Unable to find adapter for raw connection during check_in" }
end
end

# This method is called to use a client from the connection pool.
def with_client(provided = nil)
client = provided || check_out
yield client
ensure
check_in(client) if client && !provided
if provided
yield provided
else
setup unless @setup_complete
ActiveRecord::Base.connection_pool.with_connection do |connection|
yield connection.raw_connection
end
end
end

# This method is called to execute a prepared statement
Expand All @@ -87,14 +108,12 @@ def with_client(provided = nil)
# running different queries at the same time.
def execute(query, provided_client = nil)
with_client(provided_client) do |client|
begin
statement = client.prepare(query.sql)
result = statement.execute(*query.params)
result&.to_a
ensure
result&.free
statement&.close
end
statement = client.prepare(query.sql)
result = statement.execute(*query.params)
result&.to_a
ensure
result&.free
statement&.close
end
end

Expand Down Expand Up @@ -146,20 +165,28 @@ def default_options
}
end

def new_client
Mysql2::Client.new(@options)
end

def connection_pool_enabled?
@connection_pool_enabled ||= ENV.fetch('MYSQL_CONNECTION_POOL_ENABLED', 'true').casecmp?('true')
end

def start_pool_size
@start_pool_size ||= Integer(ENV.fetch('MYSQL_START_POOL_SIZE', 1))
def active_record_config
{
adapter: 'mysql2',
host: @options[:host],
port: @options[:port],
database: @options[:database],
username: @options[:username],
password: @options[:password],
reconnect: @options[:reconnect],
read_timeout: @options[:read_timeout],
write_timeout: @options[:write_timeout],
pool: max_pool_size,
checkout_timeout: pool_timeout
}
end

def max_pool_size
@max_pool_size ||= Integer(ENV.fetch('MYSQL_MAX_POOL_SIZE', 5))
end

def pool_timeout
@pool_timeout ||= Integer(ENV.fetch('MYSQL_POOL_TIMEOUT', 5))
end
end
end
24 changes: 15 additions & 9 deletions lib/mysql_framework/scripts/lock_manager.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# frozen_string_literal: true

require 'redlock'
require 'connection_pool'

module MysqlFramework
module Scripts
class LockManager
def initialize
@pool = Queue.new
@pool = ConnectionPool.new(size: pool_size, timeout: pool_timeout) do
# By not letting redlock retry we will rely on the retry that happens in this class
Redlock::Client.new([redis_url], retry_jitter: retry_jitter, retry_count: 1, retry_delay: 0)
end
end

# This method is called to request a lock (Default 5 minutes)
Expand Down Expand Up @@ -63,18 +67,12 @@ def with_lock(key:, ttl: default_ttl, max_attempts: default_max_retries, retry_d

# This method is called to retrieve a Redlock client from the pool
def fetch_client
@pool.pop(true)
rescue StandardError
# By not letting redlock retry we will rely on the retry that happens in this class
Redlock::Client.new([redis_url], retry_jitter: retry_jitter, retry_count: 1, retry_delay: 0)
@pool.checkout
end

# This method is called to retrieve a Redlock client from the pool and yield it to a block
def with_client
client = fetch_client
yield client
ensure
@pool.push(client)
@pool.with { |client| yield client }
end

private
Expand All @@ -98,6 +96,14 @@ def default_retry_delay
def retry_jitter
@retry_jitter ||= Integer(ENV.fetch('MYSQL_MIGRATION_LOCK_JITTER_MS', 50))
end

def pool_size
@pool_size ||= Integer(ENV.fetch('MYSQL_MIGRATION_LOCK_POOL_SIZE', 5))
end

def pool_timeout
@pool_timeout ||= Integer(ENV.fetch('MYSQL_MIGRATION_LOCK_POOL_TIMEOUT', 5))
end
end
end
end
2 changes: 1 addition & 1 deletion lib/mysql_framework/scripts/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def execute
end

def apply_by_tag(tags)
lock_manager.with_lock(key: self.class) do
lock_manager.with_lock(key: self.class.name) do
initialize_script_history

mysql_connector.transaction do |client|
Expand Down
2 changes: 2 additions & 0 deletions mysql_framework.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ Gem::Specification.new do |spec|

spec.add_dependency 'mysql2', '~> 0.4'
spec.add_dependency 'redlock'
spec.add_dependency 'connection_pool'
spec.add_dependency 'activerecord', '~> 8.1', '>= 8.1.1'
end
Loading