Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 66 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,67 @@
# eventmesh-workflow
Apache eventmesh

Apache EventMesh workflow runtime — Serverless Workflow DSL `1.0.3` compatible.

## Documentation / 文档

| English | 中文 | Description / 说明 |
| --- | --- | --- |
| [DESIGN.md](docs/DESIGN.md) | [DESIGN_CN.md](docs/DESIGN_CN.md) | Architecture design, components, data model, task executors, A2A bridge |
| [USAGE.md](docs/USAGE.md) | [USAGE_CN.md](docs/USAGE_CN.md) | Quick start, DSL writing guide, 12 task types, REST API, complete examples |

## Key Features

- **Dual DSL Compatibility**: Serverless Workflow 1.0.3 (`document` + `do`) and 0.8 (`id` + `states`)
- **Zero External DSL Dependency**: Custom `third_party/swf/` parser replacing sdk-go/v2
- **12 Task Types**: call / listen / switch / set / do / fork / for / try / wait / raise / run / emit
- **Built-in Structural Executors**: fork (parallel) / try (error handling) / for (loop) / do (sequence)
- **A2A Bidirectional Bridge**: Workflow can call external A2A Agents; can also be exposed as an A2A Agent
- **JQ Data Filtering**: Input/output level JSON filter pipeline
- **4 Runtime Executors**: Operation / Event / Switch / LocalRuntime

## Architecture at a Glance

```
Controller(HTTP API) → DAL(MySQL) ← DSL Parser(swf)
Flow Engine → Queue(In-Memory/EventMesh) → Task Executors
├─ OperationTask → EventMesh/A2A
├─ EventTask
├─ SwitchTask → JQ condition matching
└─ LocalRuntimeTask → set/do/fork/for/try/wait/raise/run/emit
```

## Implementation Status

| Phase | Scope | Status |
| --- | --- | --- |
| 1 | Local DSL parser replacing sdk-go/v2 | Done |
| 2 | DSL 1.0.3 `document` + `do` parsing | Done |
| 3 | DSL 0.8 legacy compatibility | Done |
| 4 | 12 task type mapping | Done |
| 5 | Task relation graph construction (then / switch / fork) | Done |
| 6 | Structural task built-in executors | Done |
| 7 | output/schedule/data field support | Done |
| 8 | A2A bidirectional bridge (Client + WorkflowAgent) | Done |
| 9 | Full Go test suite | Done |

## Quick Start

```bash
# Initialize database
mysql -u root -p < distribution/mysql-schema.sql

# Build
make build

# Start services
./bin/eventmesh-workflow controller --config configs/controller.yaml
./bin/eventmesh-workflow engine --config configs/engine.yaml

# Register a workflow
curl -X POST http://localhost:8080/workflow \
-H "Content-Type: application/json" \
-d '{"workflow_id": "demo", "workflow_name": "demo", "definition": "document:\n dsl: \"1.0.3\"\n name: demo\n version: \"1.0.0\"\ndo:\n - hello:\n set:\n greeting: \"Hello, World!\"\n then: end"}'
```

Example workflows: `configs/testcreateworkflow-v1.yaml` (DSL 1.0.3) / `configs/testcreateworkflow.yaml` (DSL 0.8)
1 change: 1 addition & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (s *Server) router() {
s.server.GET("/workflow/:workflowId", s.workflow.QueryDetail)
s.server.DELETE("/workflow/:workflowId", s.workflow.Delete)
s.server.GET("/workflow/instances", s.workflow.QueryInstances)
s.server.POST("/workflow/start", s.workflow.Start)
}

func (s *Server) setupConfig() error {
Expand Down
44 changes: 43 additions & 1 deletion cmd/controller/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
package main

import (
"context"
"net/http"

"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/flow"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal"
"github.com/apache/incubator-eventmesh/eventmesh-workflow-go/internal/dal/model"
"github.com/gin-gonic/gin"
"net/http"
)

const (
Expand All @@ -29,11 +32,13 @@ const (
// WorkflowController workflow controller operations
type WorkflowController struct {
workflowDAL dal.WorkflowDAL
engine *flow.Engine
}

func NewWorkflowController() *WorkflowController {
c := WorkflowController{}
c.workflowDAL = dal.NewWorkflowDAL()
c.engine = flow.NewEngine()
return &c
}

Expand Down Expand Up @@ -143,6 +148,43 @@ func (c *WorkflowController) Delete(ctx *gin.Context) {
ctx.JSON(http.StatusOK, nil)
}

// StartWorkflowRequest start workflow request
type StartWorkflowRequest struct {
WorkflowID string `json:"workflow_id" binding:"required"`
Input string `json:"input"`
}

// StartWorkflowResponse start workflow response
type StartWorkflowResponse struct {
InstanceID string `json:"instance_id"`
}

// Start start a workflow instance
// @Summary start a workflow instance
// @Description start a workflow instance
// @Tags workflow
// @Accept json
// @Produce json
// @Param request body StartWorkflowRequest true "start request"
// @Success 200 {object} StartWorkflowResponse
// @Failure 400
// @Failure 500
// @Router /workflow/start [post]
func (c *WorkflowController) Start(ctx *gin.Context) {
request := StartWorkflowRequest{}
if err := ctx.ShouldBind(&request); err != nil {
ctx.JSON(http.StatusBadRequest, err.Error())
return
}
param := &flow.WorkflowParam{ID: request.WorkflowID, Input: request.Input}
instanceID, err := c.engine.Start(context.Background(), param)
if err != nil {
ctx.JSON(http.StatusInternalServerError, err.Error())
return
}
ctx.JSON(http.StatusOK, StartWorkflowResponse{InstanceID: instanceID})
}

// QueryInstances query workflow instances
// @Summary query workflow instances
// @Description query workflow instances
Expand Down
69 changes: 69 additions & 0 deletions configs/testcreateworkflow-v1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

document:
dsl: '1.0.3'
namespace: eventmesh.apache.org
name: store-order-management
version: '1.0.0'
title: Store Order Management Workflow
use:
functions:
sendOrder:
call: asyncapi
with:
operation: file://orderapp.yaml#sendOrder
sendPayment:
call: asyncapi
with:
operation: file://paymentapp.yaml#sendPayment
sendShipment:
call: asyncapi
with:
operation: file://expressapp.yaml#sendExpress
do:
- receiveNewOrderEvent:
listen:
to:
one:
with:
type: online.store.newOrder
source: store/order
then: checkNewOrderResult
- checkNewOrderResult:
switch:
- newOrderSuccessful:
when: .order_no != ""
then: sendOrderPayment
- newOrderFailed:
then: end
- sendOrderPayment:
call: asyncapi
with:
operation: file://paymentapp.yaml#sendPayment
then: checkPaymentStatus
- checkPaymentStatus:
switch:
- paymentSuccessful:
when: .order_no != ""
then: sendOrderShipment
- paymentDenied:
then: end
- sendOrderShipment:
call: asyncapi
with:
operation: file://expressapp.yaml#sendExpress
then: end
Loading
Loading