Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions temporalio/ext/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use temporalio_common::protos::coresdk::{
NexusSlotInfo, WorkflowSlotInfo,
};
use temporalio_common::protos::temporal::api::history::v1::History;
use temporalio_common::protos::temporal::api::worker::v1::PluginInfo;
use temporalio_common::{
errors::{PollError, WorkflowErrorType},
worker::{
Expand Down Expand Up @@ -555,6 +556,16 @@ fn build_config(options: Struct, runtime_handle: &RuntimeHandle) -> Result<Worke
.map(|s| (s, HashSet::from([WorkflowErrorType::Nondeterminism])))
.collect::<HashMap<String, HashSet<WorkflowErrorType>>>(),
)
.plugins(
options
.member::<Vec<String>>(id!("plugins"))?
.into_iter()
.map(|name| PluginInfo {
name,
version: String::new(),
})
.collect::<Vec<PluginInfo>>(),
)
.build()
.map_err(|err| error!("Invalid worker options: {}", err))
}
Expand Down
95 changes: 82 additions & 13 deletions temporalio/lib/temporalio/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
require 'temporalio/client/async_activity_handle'
require 'temporalio/client/connection'
require 'temporalio/client/interceptor'
require 'temporalio/client/plugin'
require 'temporalio/client/schedule'
require 'temporalio/client/schedule_handle'
require 'temporalio/client/with_start_workflow_operation'
Expand Down Expand Up @@ -42,6 +43,7 @@ class Client
:connection,
:namespace,
:data_converter,
:plugins,
:interceptors,
:logger,
:default_workflow_query_reject_condition
Expand Down Expand Up @@ -70,6 +72,9 @@ class ListWorkflowPage; end # rubocop:disable Lint/EmptyClass
# @param tls [Boolean, Connection::TLSOptions] If false, do not use TLS. If true, use system default TLS options. If
# TLS options are present, those TLS options will be used.
# @param data_converter [Converters::DataConverter] Data converter to use for all data conversions to/from payloads.
# @param plugins [Array<Plugin>] Plugins to use for configuring clients and intercepting connection. Any plugins
# that also include {Worker::Plugin} will automatically be applied to the worker and should not be configured
# explicitly on the worker. WARNING: Plugins are experimental.
# @param interceptors [Array<Interceptor>] Set of interceptors that are chained together to allow intercepting of
# client calls. The earlier interceptors wrap the later ones. Any interceptors that also implement worker
# interceptor will be used as worker interceptors too so they should not be given separately when creating a
Expand Down Expand Up @@ -101,6 +106,7 @@ def self.connect(
api_key: nil,
tls: nil,
data_converter: Converters::DataConverter.default,
plugins: [],
interceptors: [],
logger: Logger.new($stdout, level: Logger::WARN),
default_workflow_query_reject_condition: nil,
Expand All @@ -112,27 +118,78 @@ def self.connect(
runtime: Runtime.default,
lazy_connect: false
)
# Prepare connection. The connection var is needed here so it can be used in callback for plugin.
base_connection = nil
final_connection = nil
around_connect = if plugins.any?
_validate_plugins!(plugins)
# For plugins, we have to do an around_connect approach with Connection where we provide a
# no-return-value proc that is invoked with the built options and yields newly built options.
# The connection will have been created before, but we allow plugins to return a
# different/extended connection, possibly avoiding actual connection altogether.
proc do |options, &block|
# Steep simply can't comprehend these advanced inline procs
# steep:ignore:start

# Root next call
next_call_called = false
next_call = proc do |options|
raise 'next_call called more than once' if next_call_called

next_call_called = true
block&.call(options)
base_connection
end
# Go backwards, building up new next_call invocations on plugins
next_call = plugins.reverse_each.reduce(next_call) do |next_call, plugin|
proc { |options| plugin.connect_client(options, next_call) }
end
# Do call
final_connection = next_call.call(options)

# steep:ignore:end
end
end
# Now create connection
base_connection = Connection.new(
target_host:,
api_key:,
tls:,
rpc_metadata:,
rpc_retry:,
identity:,
keep_alive:,
http_connect_proxy:,
runtime:,
lazy_connect:,
around_connect: # steep:ignore
)

# Create client
Client.new(
connection: Connection.new(
target_host:,
api_key:,
tls:,
rpc_metadata:,
rpc_retry:,
identity:,
keep_alive:,
http_connect_proxy:,
runtime:,
lazy_connect:
),
connection: final_connection || base_connection,
namespace:,
data_converter:,
plugins:,
interceptors:,
logger:,
default_workflow_query_reject_condition:
)
end

# @!visibility private
def self._validate_plugins!(plugins)
plugins.each do |plugin|
raise ArgumentError, "#{plugin.class} does not implement Client::Plugin" unless plugin.is_a?(Plugin)

# Validate plugin has implemented expected methods
missing = Plugin.instance_methods(false).select { |m| plugin.method(m).owner == Plugin }
unless missing.empty?
raise ArgumentError, "#{plugin.class} missing the following client plugin method(s): #{missing.join(', ')}"
end
end
end

# @return [Options] Frozen options for this client which has the same attributes as {initialize}.
attr_reader :options

Expand All @@ -143,6 +200,9 @@ def self.connect(
# @param connection [Connection] Existing connection to create a client from.
# @param namespace [String] Namespace to use for client calls.
# @param data_converter [Converters::DataConverter] Data converter to use for all data conversions to/from payloads.
# @param plugins [Array<Plugin>] Plugins to use for configuring clients. Any plugins that also include
# {Worker::Plugin} will automatically be applied to the worker and should not be configured explicitly on the
# worker. WARNING: Plugins are experimental.
# @param interceptors [Array<Interceptor>] Set of interceptors that are chained together to allow intercepting of
# client calls. The earlier interceptors wrap the later ones. Any interceptors that also implement worker
# interceptor will be used as worker interceptors too so they should not be given separately when creating a
Expand All @@ -157,6 +217,7 @@ def initialize(
connection:,
namespace:,
data_converter: DataConverter.default,
plugins: [],
interceptors: [],
logger: Logger.new($stdout, level: Logger::WARN),
default_workflow_query_reject_condition: nil
Expand All @@ -165,12 +226,20 @@ def initialize(
connection:,
namespace:,
data_converter:,
plugins:,
interceptors:,
logger:,
default_workflow_query_reject_condition:
).freeze

# Apply plugins
Client._validate_plugins!(plugins)
@options = plugins.reduce(@options) { |options, plugin| plugin.configure_client(options) }

# Initialize interceptors
@impl = interceptors.reverse_each.reduce(Internal::Client::Implementation.new(self)) do |acc, int| # steep:ignore
@impl = @options.interceptors.reverse_each.reduce(
Internal::Client::Implementation.new(self)
) do |acc, int| # steep:ignore
int.intercept_client(acc)
end
end
Expand Down
20 changes: 17 additions & 3 deletions temporalio/lib/temporalio/client/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ class HTTPConnectProxyOptions; end # rubocop:disable Lint/EmptyClass
# @param lazy_connect [Boolean] If true, there is no connection until the first call is attempted or a worker
# is created with it. Clients from lazy connections cannot be used for workers if they have not performed a
# connection.
# @param around_connect [Proc, nil] If present, this proc accepts two values: options and a block. The block is a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# @param around_connect [Proc, nil] If present, this proc accepts two values: options and a block. The block is a
# @param around_connect [Proc, nil] If present, this proc accepts two values: options and a block. The block

# must be yielded to only once with the options. The block does not return a meaningful value, nor should
# around_connect.
#
# @see Client.connect
def initialize(
Expand All @@ -181,7 +184,8 @@ def initialize(
keep_alive: KeepAliveOptions.new,
http_connect_proxy: nil,
runtime: Runtime.default,
lazy_connect: false
lazy_connect: false,
around_connect: nil
)
@options = Options.new(
target_host:,
Expand All @@ -195,9 +199,19 @@ def initialize(
runtime:,
lazy_connect:
).freeze
# Create core client now if not lazy
@core_client_mutex = Mutex.new
_core_client unless lazy_connect
# Create core client now if not lazy, applying around_connect if present
if around_connect
# Technically around_connect can never run the block for whatever reason (i.e. plugin returning a mock
# connection), so we don't enforce it
around_connect.call(@options) do |options|
@options = options
_core_client unless lazy_connect
nil
end
else
_core_client unless lazy_connect
end
# Create service instances
@workflow_service = WorkflowService.new(self)
@operator_service = OperatorService.new(self)
Expand Down
42 changes: 42 additions & 0 deletions temporalio/lib/temporalio/client/plugin.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# frozen_string_literal: true

module Temporalio
class Client
# Plugin mixin to include for configuring clients and/or intercepting connect calls.
#
# This is a low-level implementation that requires abstract methods herein to be implemented. Many implementers may
# prefer {SimplePlugin} which includes this.
#
# WARNING: Plugins are experimental.
module Plugin
# @abstract
# @return [String] Name of the plugin.
def name
raise NotImplementedError
end

# Configure a client.
#
# @abstract
# @param options [Options] Current immutable options set.
# @return [Options] Options to use, possibly updated from original.
def configure_client(options)
raise NotImplementedError
end

# Connect a client.
#
# Implementers are expected to delegate to next_call to perform the connection. Note, this does not apply to users
# explicitly creating connections via {Connection} constructor.
#
# @abstract
# @param options [Connection::Options] Current immutable options set.
# @param next_call [Proc] Proc for the next plugin in the chain to call. It accepts the options and returns a
# {Connection}.
# @return [Connection] Connected connection.
def connect_client(options, next_call)
raise NotImplementedError
end
end
end
end
1 change: 1 addition & 0 deletions temporalio/lib/temporalio/internal/bridge/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class Worker
:nondeterminism_as_workflow_fail,
:nondeterminism_as_workflow_fail_for_types,
:deployment_options,
:plugins,
keyword_init: true
)

Expand Down
Loading
Loading