Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions .github/workflows/validate-examples-rc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
name: validate-examples-rc

on:
schedule:
# Run daily at 08:08 UTC
- cron: "8 8 * * *"
pull_request:
branches:
- release-*
workflow_dispatch:
inputs:
branch:
description: "Branch to run the workflow against"
required: false
default: "main"
dapr_version:
description: "Dapr/Dapr RC version to use (leave empty to auto-detect latest RC)"
required: false
default: ""
daprcli_version:
description: "Dapr/CLI RC version to use (leave empty to auto-detect latest RC)"
required: false
default: ""

permissions:
contents: read

jobs:
setup:
runs-on: ubuntu-latest
outputs:
RC_FOUND: ${{ steps.find-rc.outputs.RC_FOUND }}
DAPR_RUNTIME_VERSION: ${{ steps.find-rc.outputs.DAPR_RUNTIME_VERSION }}
DAPR_CLI_VERSION: ${{ steps.find-rc.outputs.DAPR_CLI_VERSION }}
EXAMPLES_MATRIX: ${{ steps.examples.outputs.matrix }}
steps:
- name: Check out code
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
ref: ${{ github.event.inputs.branch || github.ref }}

- name: Find latest Dapr RC versions
id: find-rc
env:
GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
# Determine Dapr runtime RC version
if [ -n "${{ github.event.inputs.dapr_version }}" ]; then
RUNTIME_VERSION="${{ github.event.inputs.dapr_version }}"
echo "Using provided Dapr runtime version: $RUNTIME_VERSION"
else
RUNTIME_VERSION=$(gh api repos/dapr/dapr/releases --paginate --jq '[.[] | select(.prerelease == true and (.tag_name | test("rc"))) | .tag_name][0]' | head -1 | tr -d 'v')
echo "Latest Dapr runtime RC version: $RUNTIME_VERSION"
fi

# Determine Dapr CLI RC version
if [ -n "${{ github.event.inputs.daprcli_version }}" ]; then
CLI_VERSION="${{ github.event.inputs.daprcli_version }}"
echo "Using provided Dapr CLI version: $CLI_VERSION"
else
CLI_VERSION=$(gh api repos/dapr/cli/releases --paginate --jq '[.[] | select(.prerelease == true and (.tag_name | test("rc"))) | .tag_name][0]' | head -1 | tr -d 'v')
echo "Latest Dapr CLI RC version: $CLI_VERSION"
fi

if [ -z "$RUNTIME_VERSION" ]; then
echo "No Dapr runtime RC version found."
echo "RC_FOUND=false" >> "$GITHUB_OUTPUT"
exit 0
fi

if [ -z "$CLI_VERSION" ]; then
echo "No Dapr CLI RC version found, falling back to latest stable CLI."
CLI_VERSION=$(gh api repos/dapr/cli/releases/latest --jq '.tag_name' | tr -d 'v')
echo "Using latest stable Dapr CLI version: $CLI_VERSION"
fi

echo "RC_FOUND=true" >> "$GITHUB_OUTPUT"
echo "DAPR_RUNTIME_VERSION=$RUNTIME_VERSION" >> "$GITHUB_OUTPUT"
echo "DAPR_CLI_VERSION=$CLI_VERSION" >> "$GITHUB_OUTPUT"

- name: Discover examples
id: examples
run: |
EXAMPLES=$(find examples/src -name 'README.md' -exec dirname {} \; \
| sed 's|^examples/src/||' | sort | jq -Rnc '[inputs]')
echo "matrix=$EXAMPLES" >> "$GITHUB_OUTPUT"

validate-example:
needs: setup
if: needs.setup.outputs.RC_FOUND == 'true'
runs-on: ubuntu-latest
env:
PYTHON_VER: 3.12
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/master/install/install.sh
DAPR_CLI_VERSION: ${{ needs.setup.outputs.DAPR_CLI_VERSION }}
DAPR_RUNTIME_VERSION: ${{ needs.setup.outputs.DAPR_RUNTIME_VERSION }}
RUST_BACKTRACE: full

strategy:
fail-fast: false
matrix:
examples: ${{ fromJson(needs.setup.outputs.EXAMPLES_MATRIX) }}
steps:
- name: Check out code
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
ref: ${{ github.event.inputs.branch || github.ref }}

- name: Rust setup
run: rustup toolchain install stable --profile minimal

- name: Install Protoc
uses: arduino/setup-protoc@c65c819552d16ad3c9b72d9dfd5ba5237b9c906b # v3.0.0
with:
version: "24.4"
repo-token: ${{ secrets.GITHUB_TOKEN }}

- name: Set up Dapr CLI ${{ env.DAPR_CLI_VERSION }}
run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VERSION }}

- name: Initialize Dapr runtime ${{ env.DAPR_RUNTIME_VERSION }}
run: |
dapr uninstall --all
dapr init --runtime-version ${{ env.DAPR_RUNTIME_VERSION }}

- name: List running containers
run: |
docker ps -a

- name: Set up Python ${{ env.PYTHON_VER }}
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
with:
python-version: ${{ env.PYTHON_VER }}

- name: Install Mechanical Markdown
run: |
python -m pip install --upgrade pip
pip install mechanical-markdown

- name: Dapr version
run: |
dapr version
docker ps -a

- name: Check Example
run: |
cd examples
./validate.sh ${{ matrix.examples }}
4 changes: 2 additions & 2 deletions dapr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ workflow = ["dep:dapr-durabletask", "dep:tokio-util"]
async-trait = { workspace = true }
axum = "0.7"
chrono = "0.4"
dapr-durabletask = { version = "0.0.1", optional = true }
dapr-durabletask = { version = "0.0.2", optional = true }
futures = "0.3"
http = "1"
log = "0.4"
Expand All @@ -40,7 +40,7 @@ once_cell = "1.19"
dapr = { path = "./" }
dapr-macros = { path = "../dapr-macros" }
tokio = { workspace = true, features = ["full"] }
uuid = { version = "=1.23.0", features = ["v4"] }
uuid = { version = "=1.23.2", features = ["v4"] }
tokio-stream = { workspace = true }
hyper = "1.8.1"
http-body-util = "0.1"
99 changes: 98 additions & 1 deletion dapr/src/appcallback.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::dapr::proto::runtime::v1::app_callback_alpha_server::AppCallbackAlpha;
use crate::dapr::proto::runtime::v1::app_callback_server::AppCallback;
use crate::dapr::proto::{common, runtime};
use std::collections::HashMap;
Expand Down Expand Up @@ -40,6 +41,12 @@ pub type TopicEventBulkRequest = runtime::v1::TopicEventBulkRequest;
/// It includes the result for each event in the request.
pub type TopicEventBulkResponse = runtime::v1::TopicEventBulkResponse;

/// JobEventRequest is the request message for a job event callback.
pub type JobEventRequest = runtime::v1::JobEventRequest;

/// JobEventResponse is the response from the app when a job is triggered.
pub type JobEventResponse = runtime::v1::JobEventResponse;

impl ListTopicSubscriptionsResponse {
/// Create `ListTopicSubscriptionsResponse` with a topic.
pub fn topic(pubsub_name: String, topic: String) -> Self {
Expand Down Expand Up @@ -82,6 +89,7 @@ impl ListInputBindingsResponse {

pub struct AppCallbackService {
handlers: Vec<Handler>,
job_handlers: HashMap<String, Box<dyn JobHandlerMethod + Send + Sync + 'static>>,
}

pub struct Handler {
Expand Down Expand Up @@ -156,6 +164,52 @@ impl AppCallback for AppCallbackService {
) -> Result<Response<TopicEventBulkResponse>, Status> {
todo!("on_bulk_topic_event is not implemented yet")
}

async fn on_job_event(
&self,
request: Request<runtime::v1::JobEventRequest>,
) -> Result<Response<runtime::v1::JobEventResponse>, Status> {
let request_inner = request.into_inner();
let job_name = if !request_inner.name.is_empty() {
request_inner.name.clone()
} else if let Some(stripped) = request_inner.method.strip_prefix("job/") {
stripped.to_string()
} else {
return Err(Status::invalid_argument(format!(
"cannot determine job name from request (method={:?})",
request_inner.method,
)));
};

if let Some(handler) = self.job_handlers.get(&job_name) {
let handle_response = handler.handler(request_inner).await;
handle_response.map(Response::new)
} else {
Err(Status::not_found(format!(
"no handler registered for job {:?}",
job_name,
)))
}
}
}

// Also implement AppCallbackAlpha so the same service handles
// Dapr ≤ 1.17 runtimes that call OnJobEventAlpha1 / OnBulkTopicEventAlpha1.
#[tonic::async_trait]
impl AppCallbackAlpha for AppCallbackService {
async fn on_bulk_topic_event_alpha1(
&self,
request: Request<runtime::v1::TopicEventBulkRequest>,
) -> Result<Response<runtime::v1::TopicEventBulkResponse>, Status> {
self.on_bulk_topic_event(request).await
}

async fn on_job_event_alpha1(
&self,
request: Request<runtime::v1::JobEventRequest>,
) -> Result<Response<runtime::v1::JobEventResponse>, Status> {
self.on_job_event(request).await
}
}

impl Default for AppCallbackService {
Expand Down Expand Up @@ -192,12 +246,19 @@ impl AppCallbackService {
/// The actor HTTP server ([`crate::server::DaprHttpServer`]) installs
/// the layer automatically.
pub fn new() -> AppCallbackService {
AppCallbackService { handlers: vec![] }
AppCallbackService {
handlers: vec![],
job_handlers: HashMap::new(),
}
}

pub fn add_handler(&mut self, handler: Handler) {
self.handlers.push(handler)
}

pub fn add_job_handler(&mut self, job_name: String, handler: Box<dyn JobHandlerMethod>) {
self.job_handlers.insert(job_name, handler);
}
}

#[tonic::async_trait]
Expand All @@ -207,3 +268,39 @@ pub trait HandlerMethod: Send + Sync + 'static {
request: runtime::v1::TopicEventRequest,
) -> Result<Response<runtime::v1::TopicEventResponse>, Status>;
}

#[tonic::async_trait]
pub trait JobHandlerMethod: Send + Sync + 'static {
async fn handler(
&self,
request: runtime::v1::JobEventRequest,
) -> Result<runtime::v1::JobEventResponse, Status>;
}

#[macro_export]
macro_rules! add_job_handler {
($app_callback_service:expr, $handler_name:ident, $handler_fn:expr) => {
pub struct $handler_name {}

#[$crate::reexport::async_trait]
impl $crate::appcallback::JobHandlerMethod for $handler_name {
async fn handler(
&self,
request: $crate::appcallback::JobEventRequest,
) -> ::std::result::Result<$crate::appcallback::JobEventResponse, ::tonic::Status>
{
$handler_fn(request).await
}
}

impl $handler_name {
pub fn new() -> Self {
$handler_name {}
}
}

let handler_name = $handler_name.to_string();

$app_callback_service.add_job_handler(handler_name, Box::new($handler_name::new()));
};
}
Loading
Loading