Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 86 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -397,11 +397,94 @@ however `ActivityNew3` will get executed, since the release wasn't yet checked a
every new execution of the workflow — all new activities will get executed, while `ActivityOld` will
not.

Later on you can clean it up and drop all the checks if you don't have any older workflows running
or expect them to ever be executed (e.g. reset).

*NOTE: Releases with different names do not depend on each other in any way.*

## Extras

This section describes optional extra modules included in the SDK for convenience and some
additional functionality.

### Versioned Workflows

Implemnting breaking changes using the previously described `#has_release?` flag can be error prone
and results in a condition build up in workflows over time.

Another way of implementing breaking changes is by doing a full cut over to the new version every
time you need to modify a workflow. This can be achieved manually by treating new versions as
separate workflows. We've simplified this process by making your workflow aware of its versions:

```ruby
require 'cadence/concerns/versioned'

class MyWorkflowV1 < Cadence::Workflow
retry_policy max_attempts: 5

def execute
Activity2.execute!
end
end

class MyWorkflowV2 < Cadence::Workflow

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Funny, we're also working on versioning.
My initial take: It's gonna be pretty hard to read PRs that copy all the code. Also this will make the git blame look like all the code is new.
We use inline version checks (version_at_least?) and we memoize the version to make it sticky. A bit of a hack, but we use

temporal_context.has_release?("version-#{version}")

where version is a number. We call this at the beginning of each workflow to memoize it.
We're adopting ElasticSearch and I think we need to implement https://github.com/coinbase/temporal-ruby/blob/master/lib/temporal/workflow/state_manager.rb#L285 so we can get the version into ES as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting… so if I understand correctly you're doing something very similar — you are picking the latest version at the start of each execution and then adding conditionals to the workflow code to segregate different version. Am I getting it right?

Btw, I don't think you need to memoize the version yourself because has_release? does it for you — after it was called ones it will return the same value for all subsequent calls.

We were following the same approach using the has_release? explicitly, but found it to be quite noisy.

Maybe the optimal solution is the combination of the two — version is submitted explicitly via headers and then can be checked inline with version_at_least?. It allows for feature flag rollouts as well as concise PR diffs.

I like the idea of adding ES support and tagging versions for searchability 👍

timeouts execution: 60

def execute
Activity2.execute!
Activity3.execute!
end
end

class MyWorkflow < Cadence::Workflow
include Cadence::Concerns::Versioned

version 1, MyWorkflowV1
version 2, MyWorkflowV2

def execute
Activity1.execute!
end
end
```

This way you don't need to make any changes to the invocation of your workflows — calling
`Cadence.start_workflow(MyWorkflow)` will resolve the latest version and schedule `MyWorkflowV2`.
It will still appear as if you're executing `MyWorkflow` from the Cadence UI, metrics, logging, etc.
This approach allows you to easily extend your existing workflows without changing anything outside
of your workflow.

When making a workflow versioned the main class (e.g. `MyWorkflow`) becomes the default version.
Once a workflow was scheduled its version will remain unchanged, so all the previously executed
workflows will be executed using the default implementation. Newly scheduled workflows will pick the
latest available version, but you can specify a version like this:

```ruby
Cadence.start_workflow(MyWorkflow, options: { version: 1 })
```

Once all the old versions are no longer in use you can remove those files and drop their `version`
definitions (just make sure not to change the numbers for versions that are in active use).

In case you want to do a gradual rollout you can override the version picker with your own
implementation:


```ruby
class MyWorkflow < Cadence::Workflow
include Cadence::Concerns::Versioned

version 1, MyWorkflowV1
version 2, MyWorkflowV2
version_picker do |_latest_version|
if my_feature_flag?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is cool. We're actually considering forcing our engineers to use a feature flag to avoid bouncing back and forth between code versions during deploys. I know it's harder in an SDK designed for many companies, but I wonder if you could nudge this toward being more encouraged.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to it and think it's a great practise. Do you have any specific ideas on how you'd force it? Removing the default version_picker implementation?

2
else
1
end
end

...
end
```

## Testing

It is crucial to properly test your workflows and activities before running them in production. The
Expand Down
1 change: 1 addition & 0 deletions examples/bin/worker
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ worker.register_workflow(SideEffectWorkflow)
worker.register_workflow(SimpleTimerWorkflow)
worker.register_workflow(TimeoutWorkflow)
worker.register_workflow(TripBookingWorkflow)
worker.register_workflow(VersionedWorkflow)

worker.register_activity(AsyncActivity)
worker.register_activity(EchoActivity)
Expand Down
13 changes: 7 additions & 6 deletions examples/spec/helpers.rb
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
require 'securerandom'
require 'cadence/workflow/history'

module Helpers
def run_workflow(workflow, *input, **args)
workflow_id = SecureRandom.uuid
run_id = Cadence.start_workflow(
workflow,
*input,
**args.merge(options: { workflow_id: workflow_id })
)
args[:options] = args.fetch(:options, {}).merge(workflow_id: workflow_id)

run_id = Cadence.start_workflow(workflow, *input, **args)

client = Cadence.send(:default_client)
connection = client.send(:connection)

connection.get_workflow_execution_history(
result = connection.get_workflow_execution_history(
domain: Cadence.configuration.domain,
workflow_id: workflow_id,
run_id: run_id,
next_page_token: nil,
wait_for_new_event: true,
event_type: :close
)

Cadence::Workflow::History.new(result.history.events)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
it 'completes' do
result = run_workflow(described_class, 'Alice', 'Bob', 'John')

expect(result.history.events.first.eventType)
.to eq(CadenceThrift::EventType::WorkflowExecutionCompleted)
expect(result.events.first.type).to eq('WorkflowExecutionCompleted')
end
end
76 changes: 76 additions & 0 deletions examples/spec/integration/versioned_workflow_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
require 'workflows/versioned_workflow'
require 'cadence/json'

describe VersionedWorkflow, :integration do
context 'when scheduling' do
context 'without explicit version' do
it 'executes the latest version' do
result = run_workflow(described_class)

event = result.events.first

expect(event.type).to eq('WorkflowExecutionCompleted')
expect(Cadence::JSON.deserialize(event.attributes.result)).to eq('ECHO: version 2')
end
end

context 'with explicit version' do
let(:options) { { options: { version: 1 } } }

it 'executes the specified version' do
result = run_workflow(described_class, options)

event = result.events.first

expect(event.type).to eq('WorkflowExecutionCompleted')
expect(Cadence::JSON.deserialize(event.attributes.result)).to eq('ECHO: version 1')
end
end

context 'with a non-existing version' do
let(:options) { { options: { version: 3 } } }

it 'raises an error' do
expect do
run_workflow(described_class, options)
end.to raise_error(
Cadence::Concerns::Versioned::UnknownWorkflowVersion,
'Unknown version 3 for VersionedWorkflow'
)
end
end
end

context 'when already scheduled' do
context 'without a version' do
it 'executes the default version' do
# starting with a plain string to skip the automatic header setting
result = run_workflow('VersionedWorkflow')

event = result.events.first

expect(event.type).to eq('WorkflowExecutionCompleted')
expect(Cadence::JSON.deserialize(event.attributes.result)).to eq('ECHO: default version')
end
end

context 'with a non-existing version' do
let(:options) do
{
options: {
timeouts: { execution: 1 },
headers: { 'Version' => '3' }
}
}
end

it 'times out the workflow' do
result = run_workflow('VersionedWorkflow', options)

event = result.events.first

expect(event.type).to eq('WorkflowExecutionTimedOut')
end
end
end
end
16 changes: 16 additions & 0 deletions examples/workflows/versioned_workflow.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
require 'cadence/concerns/versioned'
require_relative './versioned_workflow_v1'
require_relative './versioned_workflow_v2'

class VersionedWorkflow < Cadence::Workflow
include Cadence::Concerns::Versioned

headers 'MyHeader' => 'MyValue'

version 1, VersionedWorkflowV1
version 2, VersionedWorkflowV2

def execute
EchoActivity.execute!('default version')
end
end
5 changes: 5 additions & 0 deletions examples/workflows/versioned_workflow_v1.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class VersionedWorkflowV1 < Cadence::Workflow
def execute
EchoActivity.execute!('version 1')
end
end
7 changes: 7 additions & 0 deletions examples/workflows/versioned_workflow_v2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
class VersionedWorkflowV2 < Cadence::Workflow
headers 'MyNewHeader' => 'MyNewValue'

def execute
EchoActivity.execute!('version 2')
end
end
101 changes: 101 additions & 0 deletions lib/cadence/concerns/versioned.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
require 'cadence/errors'

module Cadence
module Concerns
module Versioned
def self.included(base)
base.extend ClassMethods
end

VERSION_HEADER_NAME = 'Version'.freeze
DEFAULT_VERSION = 0

class UnknownWorkflowVersion < Cadence::ClientError; end

class Workflow
attr_reader :version, :main_class, :version_class

def initialize(main_class, version = nil)
version ||= main_class.pick_version
version_class = main_class.version_class_for(version)

@version = version
@main_class = main_class
@version_class = version_class
end

def domain
if version_class.domain
warn '[WARNING] Overriding domain in a workflow version is not yet supported. ' \
"Called from #{version_class}."
end

main_class.domain
end

def task_list
if version_class.task_list
warn '[WARNING] Overriding task_list in a workflow version is not yet supported. ' \
"Called from #{version_class}."
end

main_class.task_list
end

def retry_policy
version_class.retry_policy || main_class.retry_policy
end

def timeouts
version_class.timeouts || main_class.timeouts
end

def headers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would you change the version of a cron job? Would you have to resubmit the job after updating the workflow so that the header gets sent to Cadence and persisted across future workflow starts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, interesting! Yes, you'd have to resubmit it because all the values (along with the input arguments) are passed when you first schedule it

(version_class.headers || main_class.headers || {}).merge(VERSION_HEADER_NAME => version.to_s)
end
end

module ClassMethods
def version(number, workflow_class)
versions[number] = workflow_class
end

def execute_in_context(context, input)
version = context.headers.fetch(VERSION_HEADER_NAME, DEFAULT_VERSION).to_i
version_class = version_class_for(version)

if self == version_class
super
else
# forward the method call to the target version class
version_class.execute_in_context(context, input)
end
end

def version_class_for(version)
versions.fetch(version.to_i) do
raise UnknownWorkflowVersion, "Unknown version #{version} for #{self.name}"
end
end

def pick_version
version_picker.call(versions.keys.max)
end

private

DEFAULT_VERSION_PICKER = lambda { |latest_version| latest_version }

def version_picker(&block)
return @version_picker || DEFAULT_VERSION_PICKER unless block_given?
@version_picker = block
end

def versions
# Initialize with the default version
@versions ||= { DEFAULT_VERSION => self }
end
end
end
end
end
10 changes: 10 additions & 0 deletions lib/cadence/execution_options.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'cadence/concerns/executable'
require 'cadence/concerns/versioned'
require 'cadence/retry_policy'

module Cadence
Expand All @@ -17,6 +18,9 @@ def initialize(object, options, defaults = nil)

# For Cadence::Workflow and Cadence::Activity use defined values as the next option
if object.singleton_class.included_modules.include?(Concerns::Executable)
# In a versioned workflow merge the specific version options with default workflow options
object = Concerns::Versioned::Workflow.new(object, options[:version]) if versioned?(object)

@domain ||= object.domain
@task_list ||= object.task_list
@retry_policy = object.retry_policy.merge(@retry_policy) if object.retry_policy
Expand All @@ -41,5 +45,11 @@ def initialize(object, options, defaults = nil)

freeze
end

private

def versioned?(workflow)
workflow.singleton_class.included_modules.include?(Concerns::Versioned::ClassMethods)
end
end
end
Loading