Skip to content

[ISSUE #22]Feature Request: Upgrade to Serverless Workflow DSL 1.0.3#23

Merged
xwm1992 merged 11 commits into
apache:mainfrom
qqeasonchen:main
Jul 2, 2026
Merged

[ISSUE #22]Feature Request: Upgrade to Serverless Workflow DSL 1.0.3#23
xwm1992 merged 11 commits into
apache:mainfrom
qqeasonchen:main

Conversation

@qqeasonchen

Copy link
Copy Markdown
Contributor

#22

…ask support

### Breaking / Architecture Changes
- Remove direct dependency on github.com/serverlessworkflow/sdk-go/v2
- Add local swf model (third_party/swf/model.go) with zero external deps
- Add local swf parser (third_party/swf/swf.go) supporting both v1.x and legacy 0.8
- swf.Task replaces pmodel states; swf.Workflow is the new canonical model

### New Structured Task Types (DSL 1.0.3)
- set / do / fork / for / try / wait / raise / run / emit / listen
- All mapped to local runtime executors (internal/task/local_runtime.go)
- set: JQ-based data transformation
- wait: duration-based pause
- raise: error propagation
- run / emit: EventMesh event publishing

### Schema Validation
- Duplicate task name detection
- Then reference validation (unknown target → clear error)
- Required field checks at parse time

### DAL & Runtime
- buildTask/buildTaskRelation rebuild on swf model
- Switch task enhanced: conditionless cases treated as default
- isLocalRuntimeTask() dispatches to NewLocalRuntimeTask
- publishNextOrComplete() shared between operation and local runtime

### Tests
- 12 unit + integration tests covering v1, legacy, structured tasks, validation
- Config file integration tests for both testcreateworkflow.yaml variants
- data_filter test updated for v1 runtime expression format

### Files Changed: 13 modified + 5 new = 18 files
### Fork / Try / For Structured Task Executors
- fork: DAL builds relations to ALL branches (multi-transition)
  Runtime publishes all branch start tasks via publishNextTasks()
- try: Runtime executes try body; parser now captures catch.when as children
- for: Runtime iterates over JSON array input, runs do body for each item
- do: Runtime executes inline set/raise tasks within do block

### DSL 1.0.3 output / data / schedule Fields
- swf.Workflow: new Schedule, Input, Output fields
- swf.Task: new OutputFilter, InlineData fields
- Parser extracts schedule (start/cron), input.from, output.as
- DAL stores TaskOutputFilter for runtime filtering
- Output filter applied after task execution via filter.FilterWorkflowTaskOutputData

### Output Filter Pipeline
- Task model gains TaskOutputFilter column
- Filters applied at runtime in local_runtime.Run() before publishNextOrComplete

### A2A ↔ Workflow Bridge (internal/bridge/)
- a2a_types.go: A2A client (HTTP), message types (TaskRequest/Response/AgentCard)
- a2a_executor.go: Polling executor for workflow→A2A delegation
- workflow_agent.go: HTTP agent exposing workflow as A2A endpoint
- operation_task.go: runA2AAction() dispatches to A2A agent when OperationType='a2a'

### Code Cleanup
- newBaseTask() factory in task.go standardizes baseTask construction
- All task constructors use newBaseTask() with TaskOutputFilter propagation

### Tests: 17 passing (3 new fork/try/for parser tests + 2 schedule/output tests)
- docs/DESIGN.md: architecture overview, DSL parser, task graph, 4 executors, A2A bridge, DB model, filter pipeline
- docs/USAGE.md: quick start, 12 task types with YAML examples, input/output filtering, schedule, REST API, A2A integration
- README.md: updated with doc index, feature highlights, and quick start pointers
- docs/DESIGN.md: full English translation of architecture design document
- docs/USAGE.md: full English translation of user guide
- README.md: updated to English-first
- go test ./... : PASS | go vet ./... : clean
- docs/DESIGN_CN.md: Chinese translation of architecture design
- docs/USAGE_CN.md: Chinese translation of user guide
- README.md: bilingual doc index (EN default, CN reference)
- go test ./... : PASS | go vet ./... : clean
- Fix 16 errcheck errors: add error handling for unchecked error returns
  - metrics.Inc/Dec/Add/RecordLatency calls: use _ = to explicitly ignore errors
  - l.Release() calls: add error logging
  - w.Write calls: add error logging
- Fix 1 ineffassign error in internal/dal/config.go
- Update CI workflow to use Go 1.18 (matches go.mod requirement)
- golangci-lint run ./... : PASS (0 errors, 0 warnings)

Fixes apache#22
- 修复 errcheck: metrics.loadAllCollectors 用 mustRegister 处理 error
- 修复 errcheck: a2a_types.go defer resp.Body.Close() 用闭包显式忽略
- 修复 errcheck: workflow_agent.go w.Write 返回值已用 _, _ = 忽略
- 修复 metrics.getCollectorByNameAndType gauge 读错 map (histograms -> gauges)
- ObserveQueue 接口新增 UnSubscribe(); InMemory/EventMesh 实现补 stub
- 新增 POST /workflow/start 端点 (WorkflowController.Start)
- cmd/controller 注册 /workflow/start 路由
- operation_task.go: t.baseTask.queue -> t.queue, t.baseTask.input -> t.input
- switch_task.go: t.baseTask.queue -> t.queue, t.baseTask.input -> t.input

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR upgrades eventmesh-workflow to support Serverless Workflow DSL 1.0.3 while preserving 0.8 compatibility by replacing the external sdk-go/v2 dependency with a custom parser, expanding the runtime to handle structured task types, and introducing initial A2A (agent-to-agent) bridging plus updated docs.

Changes:

  • Introduces a zero-external-dependency SWF parser (third_party/swf) supporting DSL 1.0.3 (document + do) and legacy 0.8 (id + states) with validation + flattening.
  • Extends runtime dispatch/execution to cover structured/local task types (set/do/fork/for/try/wait/raise/run/emit), adds output filtering, and updates DAL graph construction.
  • Adds initial A2A bridge components (client + executor + workflow agent HTTP endpoints) and expands documentation/usage guides.

Reviewed changes

Copilot reviewed 37 out of 37 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
third_party/swf/swf.go Replaces sdk-go parsing with a custom YAML→Workflow parser for v1 + legacy and task/action extraction.
third_party/swf/model.go Adds internal SWF model types, flattening, and validation helpers.
third_party/swf/swf_test.go Adds unit tests for v1/legacy parsing, validation, and structured constructs.
third_party/swf/parser_integration_test.go Adds integration tests that parse real config fixtures if present.
internal/dal/workflow.go Updates DAL task/action/relation building to consume the new SWF model and support fork/switch relations.
internal/dal/model/workflow_task.go Extends task persistence model with output filter + task definition fields.
internal/task/task.go Updates task factory to route local-runtime task types and support listen/event routing.
internal/task/runtime_util.go Adds shared helpers for publishing next tasks / completing workflows and local-runtime type detection.
internal/task/local_runtime.go Introduces LocalRuntime executor implementation for structural tasks.
internal/task/operation_task.go Updates operation execution flow and adds A2A execution path.
internal/task/event_task.go Makes event task creation more defensive and aligns metrics usage.
internal/task/switch_task.go Refactors switch matching to support default transitions and extracts condition evaluation helper.
internal/task/a2a_integration.go Adds A2A task detection + executor construction glue for tasks.
internal/bridge/a2a_types.go Adds A2A protocol types and a small HTTP client.
internal/bridge/a2a_executor.go Adds polling-based A2A executor used by workflow tasks.
internal/bridge/workflow_agent.go Adds HTTP endpoints to expose workflows as A2A agents.
internal/filter/data_filter.go Adds output filtering and expression normalization for JQ evaluation.
internal/filter/data_filter_test.go Improves filter tests and adds coverage for v1-style expressions and JSON validity.
internal/queue/queue.go Extends queue interface with UnSubscribe().
internal/queue/in_memory_queue.go Implements UnSubscribe() and makes metrics calls ignore return errors.
internal/queue/eventmesh_queue.go Implements UnSubscribe() and makes metrics calls ignore return errors.
internal/metrics/metrics.go Hardens collector registration and fixes gauge collector lookup.
internal/schedule/inline_scheduler.go Logs lock release errors and ignores latency record return value.
middleware/dblock/lock.go Logs lock release errors during refresh deadline failures.
internal/dal/config.go Properly handles sql.Open error return.
internal/constants/constants.go Adds constants for new task types (set/do/fork/for/try/wait/raise/run/emit/listen/call).
flow/engine.go Makes metrics increments non-blocking by ignoring errors.
cmd/controller/workflow.go Adds /workflow/start API endpoint to start workflow instances via engine.
cmd/controller/main.go Registers the new /workflow/start route.
configs/testcreateworkflow-v1.yaml Adds a DSL 1.0.3 example workflow config.
go.mod Removes github.com/serverlessworkflow/sdk-go/v2 dependency.
README.md Expands project overview, feature list, architecture summary, and quick start.
docs/USAGE.md Adds comprehensive user guide (DSL, tasks, API, A2A usage, limitations/roadmap).
docs/USAGE_CN.md Adds Chinese version of the user guide.
docs/DESIGN.md Adds detailed design doc for parser/runtime/DAL/A2A and data flow.
docs/DESIGN_CN.md Adds Chinese version of the design doc.
docs/ISSUE.md Adds a local copy of Issue #22 feature request details for documentation/tracking.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread third_party/swf/swf.go
Comment on lines +258 to +269
switch operationType {
case "http":
operationName = expressionString(with["endpoint"])
case "openapi":
operationName = firstNonEmpty(expressionString(with["operationId"]), expressionString(with["operation"]), expressionString(with["document"]))
case "asyncapi":
operationName = firstNonEmpty(expressionString(with["operation"]), expressionString(with["channel"]), expressionString(with["document"]))
case "grpc":
operationName = firstNonEmpty(expressionString(with["service"]), expressionString(with["method"]))
default:
operationName = firstNonEmpty(operationName, expressionString(with["operation"]), expressionString(with["endpoint"]))
}
Comment thread third_party/swf/swf.go
Comment on lines +409 to +414
func expressionString(value interface{}) string {
text := strings.TrimSpace(asString(value))
text = strings.TrimPrefix(text, "${")
text = strings.TrimSuffix(text, "}")
return strings.TrimSpace(text)
}
Comment on lines +91 to +95
res, err := t.jq.One(jqData, condition)
if err != nil {
return false, err
}
return strconv.ParseBool(gconv.String(res))
Comment on lines +34 to +39
if len(action.OperationName) > 0 {
if action.OperationName[:3] == "a2a" {
return true
}
}
return false
Comment on lines +159 to +163
func (t *localRuntimeTask) executeTry() (string, error) {
tryTasks := asSlice(t.definition["try"])
if len(tryTasks) == 0 {
return t.input, nil
}
Comment thread internal/dal/workflow.go
Comment on lines +423 to +425
if workflowTask.Type == swf.TaskTypeFork {
taskRelations = append(taskRelations, w.buildForkTaskRelation(workflow.ID, workflowTask, fromTaskID, taskIDs)...)
continue
Comment on lines 26 to 29
TaskInputFilter string `json:"task_input_filter" gorm:"column:task_input_filter;type:varchar;size:1024"`
TaskOutputFilter string `json:"task_output_filter" gorm:"column:task_output_filter;type:varchar;size:1024"`
TaskDefinition string `json:"task_definition" gorm:"-"`
Status int `json:"status" gorm:"column:status;type:int"`
Comment on lines 59 to +63
var taskInstanceID = uuid.New().String()
var taskInstance = model.WorkflowTaskInstance{WorkflowInstanceID: t.workflowInstanceID, WorkflowID: t.workflowID,
TaskID: t.transition.ToTaskID, TaskInstanceID: taskInstanceID, Status: constants.TaskInstanceSleepStatus,
Input: t.baseTask.input}
if err := t.baseTask.queue.Publish([]*model.WorkflowTaskInstance{&taskInstance}); err != nil {
Input: t.input}
if err := t.queue.Publish([]*model.WorkflowTaskInstance{&taskInstance}); err != nil {
Comment on lines +144 to +147
resp := A2ATaskResponse{
ID: taskID,
Status: A2ATaskStatusWorking,
Message: A2AMessage{
Comment on lines +182 to +203
func (t *localRuntimeTask) executeFor() (string, error) {
forDef := asMap(t.definition["for"])
forDo := asMap(forDef["do"])
if len(forDo) == 0 {
return t.input, nil
}
var items []interface{}
if t.input != "" {
var inputData interface{}
if err := json.Unmarshal([]byte(t.input), &inputData); err == nil {
if arr, ok := inputData.([]interface{}); ok {
items = arr
}
}
}
if len(items) == 0 {
items = append(items, t.input)
}
for _, item := range items {
itemInput, _ := json.Marshal(item)
for _, taskItem := range asSlice(forDo["do"]) {
taskMap := asMap(taskItem)

@xwm1992 xwm1992 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@xwm1992 xwm1992 merged commit 2e21cdb into apache:main Jul 2, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants