diff --git a/.github/workflows/validate-examples-rc.yml b/.github/workflows/validate-examples-rc.yml new file mode 100644 index 0000000..6461c92 --- /dev/null +++ b/.github/workflows/validate-examples-rc.yml @@ -0,0 +1,148 @@ +name: validate-examples-rc + +on: + schedule: + # Run daily at 08:08 UTC + - cron: "8 8 * * *" + pull_request: + branches: + - release-* + workflow_dispatch: + inputs: + branch: + description: "Branch to run the workflow against" + required: false + default: "main" + dapr_version: + description: "Dapr/Dapr RC version to use (leave empty to auto-detect latest RC)" + required: false + default: "" + daprcli_version: + description: "Dapr/CLI RC version to use (leave empty to auto-detect latest RC)" + required: false + default: "" + +permissions: + contents: read + +jobs: + setup: + runs-on: ubuntu-latest + outputs: + RC_FOUND: ${{ steps.find-rc.outputs.RC_FOUND }} + DAPR_RUNTIME_VERSION: ${{ steps.find-rc.outputs.DAPR_RUNTIME_VERSION }} + DAPR_CLI_VERSION: ${{ steps.find-rc.outputs.DAPR_CLI_VERSION }} + EXAMPLES_MATRIX: ${{ steps.examples.outputs.matrix }} + steps: + - name: Check out code + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + ref: ${{ github.event.inputs.branch || github.ref }} + + - name: Find latest Dapr RC versions + id: find-rc + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: | + # Determine Dapr runtime RC version + if [ -n "${{ github.event.inputs.dapr_version }}" ]; then + RUNTIME_VERSION="${{ github.event.inputs.dapr_version }}" + echo "Using provided Dapr runtime version: $RUNTIME_VERSION" + else + RUNTIME_VERSION=$(gh api repos/dapr/dapr/releases --paginate --jq '[.[] | select(.prerelease == true and (.tag_name | test("rc"))) | .tag_name][0]' | head -1 | tr -d 'v') + echo "Latest Dapr runtime RC version: $RUNTIME_VERSION" + fi + + # Determine Dapr CLI RC version + if [ -n "${{ github.event.inputs.daprcli_version }}" ]; then + CLI_VERSION="${{ github.event.inputs.daprcli_version }}" + echo "Using provided Dapr CLI version: $CLI_VERSION" + else + CLI_VERSION=$(gh api repos/dapr/cli/releases --paginate --jq '[.[] | select(.prerelease == true and (.tag_name | test("rc"))) | .tag_name][0]' | head -1 | tr -d 'v') + echo "Latest Dapr CLI RC version: $CLI_VERSION" + fi + + if [ -z "$RUNTIME_VERSION" ]; then + echo "No Dapr runtime RC version found." + echo "RC_FOUND=false" >> "$GITHUB_OUTPUT" + exit 0 + fi + + if [ -z "$CLI_VERSION" ]; then + echo "No Dapr CLI RC version found, falling back to latest stable CLI." + CLI_VERSION=$(gh api repos/dapr/cli/releases/latest --jq '.tag_name' | tr -d 'v') + echo "Using latest stable Dapr CLI version: $CLI_VERSION" + fi + + echo "RC_FOUND=true" >> "$GITHUB_OUTPUT" + echo "DAPR_RUNTIME_VERSION=$RUNTIME_VERSION" >> "$GITHUB_OUTPUT" + echo "DAPR_CLI_VERSION=$CLI_VERSION" >> "$GITHUB_OUTPUT" + + - name: Discover examples + id: examples + run: | + EXAMPLES=$(find examples/src -name 'README.md' -exec dirname {} \; \ + | sed 's|^examples/src/||' | sort | jq -Rnc '[inputs]') + echo "matrix=$EXAMPLES" >> "$GITHUB_OUTPUT" + + validate-example: + needs: setup + if: needs.setup.outputs.RC_FOUND == 'true' + runs-on: ubuntu-latest + env: + PYTHON_VER: 3.12 + DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/master/install/install.sh + DAPR_CLI_VERSION: ${{ needs.setup.outputs.DAPR_CLI_VERSION }} + DAPR_RUNTIME_VERSION: ${{ needs.setup.outputs.DAPR_RUNTIME_VERSION }} + RUST_BACKTRACE: full + + strategy: + fail-fast: false + matrix: + examples: ${{ fromJson(needs.setup.outputs.EXAMPLES_MATRIX) }} + steps: + - name: Check out code + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + ref: ${{ github.event.inputs.branch || github.ref }} + + - name: Rust setup + run: rustup toolchain install stable --profile minimal + + - name: Install Protoc + uses: arduino/setup-protoc@c65c819552d16ad3c9b72d9dfd5ba5237b9c906b # v3.0.0 + with: + version: "24.4" + repo-token: ${{ secrets.GITHUB_TOKEN }} + + - name: Set up Dapr CLI ${{ env.DAPR_CLI_VERSION }} + run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VERSION }} + + - name: Initialize Dapr runtime ${{ env.DAPR_RUNTIME_VERSION }} + run: | + dapr uninstall --all + dapr init --runtime-version ${{ env.DAPR_RUNTIME_VERSION }} + + - name: List running containers + run: | + docker ps -a + + - name: Set up Python ${{ env.PYTHON_VER }} + uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0 + with: + python-version: ${{ env.PYTHON_VER }} + + - name: Install Mechanical Markdown + run: | + python -m pip install --upgrade pip + pip install mechanical-markdown + + - name: Dapr version + run: | + dapr version + docker ps -a + + - name: Check Example + run: | + cd examples + ./validate.sh ${{ matrix.examples }} diff --git a/dapr/Cargo.toml b/dapr/Cargo.toml index da5dde0..7373696 100644 --- a/dapr/Cargo.toml +++ b/dapr/Cargo.toml @@ -18,7 +18,7 @@ workflow = ["dep:dapr-durabletask", "dep:tokio-util"] async-trait = { workspace = true } axum = "0.7" chrono = "0.4" -dapr-durabletask = { version = "0.0.1", optional = true } +dapr-durabletask = { version = "0.0.2", optional = true } futures = "0.3" http = "1" log = "0.4" @@ -40,7 +40,7 @@ once_cell = "1.19" dapr = { path = "./" } dapr-macros = { path = "../dapr-macros" } tokio = { workspace = true, features = ["full"] } -uuid = { version = "=1.23.0", features = ["v4"] } +uuid = { version = "=1.23.2", features = ["v4"] } tokio-stream = { workspace = true } hyper = "1.8.1" http-body-util = "0.1" diff --git a/dapr/src/appcallback.rs b/dapr/src/appcallback.rs index ad9508d..2ef4720 100644 --- a/dapr/src/appcallback.rs +++ b/dapr/src/appcallback.rs @@ -1,3 +1,4 @@ +use crate::dapr::proto::runtime::v1::app_callback_alpha_server::AppCallbackAlpha; use crate::dapr::proto::runtime::v1::app_callback_server::AppCallback; use crate::dapr::proto::{common, runtime}; use std::collections::HashMap; @@ -40,6 +41,12 @@ pub type TopicEventBulkRequest = runtime::v1::TopicEventBulkRequest; /// It includes the result for each event in the request. pub type TopicEventBulkResponse = runtime::v1::TopicEventBulkResponse; +/// JobEventRequest is the request message for a job event callback. +pub type JobEventRequest = runtime::v1::JobEventRequest; + +/// JobEventResponse is the response from the app when a job is triggered. +pub type JobEventResponse = runtime::v1::JobEventResponse; + impl ListTopicSubscriptionsResponse { /// Create `ListTopicSubscriptionsResponse` with a topic. pub fn topic(pubsub_name: String, topic: String) -> Self { @@ -82,6 +89,7 @@ impl ListInputBindingsResponse { pub struct AppCallbackService { handlers: Vec, + job_handlers: HashMap>, } pub struct Handler { @@ -156,6 +164,52 @@ impl AppCallback for AppCallbackService { ) -> Result, Status> { todo!("on_bulk_topic_event is not implemented yet") } + + async fn on_job_event( + &self, + request: Request, + ) -> Result, Status> { + let request_inner = request.into_inner(); + let job_name = if !request_inner.name.is_empty() { + request_inner.name.clone() + } else if let Some(stripped) = request_inner.method.strip_prefix("job/") { + stripped.to_string() + } else { + return Err(Status::invalid_argument(format!( + "cannot determine job name from request (method={:?})", + request_inner.method, + ))); + }; + + if let Some(handler) = self.job_handlers.get(&job_name) { + let handle_response = handler.handler(request_inner).await; + handle_response.map(Response::new) + } else { + Err(Status::not_found(format!( + "no handler registered for job {:?}", + job_name, + ))) + } + } +} + +// Also implement AppCallbackAlpha so the same service handles +// Dapr ≤ 1.17 runtimes that call OnJobEventAlpha1 / OnBulkTopicEventAlpha1. +#[tonic::async_trait] +impl AppCallbackAlpha for AppCallbackService { + async fn on_bulk_topic_event_alpha1( + &self, + request: Request, + ) -> Result, Status> { + self.on_bulk_topic_event(request).await + } + + async fn on_job_event_alpha1( + &self, + request: Request, + ) -> Result, Status> { + self.on_job_event(request).await + } } impl Default for AppCallbackService { @@ -192,12 +246,19 @@ impl AppCallbackService { /// The actor HTTP server ([`crate::server::DaprHttpServer`]) installs /// the layer automatically. pub fn new() -> AppCallbackService { - AppCallbackService { handlers: vec![] } + AppCallbackService { + handlers: vec![], + job_handlers: HashMap::new(), + } } pub fn add_handler(&mut self, handler: Handler) { self.handlers.push(handler) } + + pub fn add_job_handler(&mut self, job_name: String, handler: Box) { + self.job_handlers.insert(job_name, handler); + } } #[tonic::async_trait] @@ -207,3 +268,39 @@ pub trait HandlerMethod: Send + Sync + 'static { request: runtime::v1::TopicEventRequest, ) -> Result, Status>; } + +#[tonic::async_trait] +pub trait JobHandlerMethod: Send + Sync + 'static { + async fn handler( + &self, + request: runtime::v1::JobEventRequest, + ) -> Result; +} + +#[macro_export] +macro_rules! add_job_handler { + ($app_callback_service:expr, $handler_name:ident, $handler_fn:expr) => { + pub struct $handler_name {} + + #[$crate::reexport::async_trait] + impl $crate::appcallback::JobHandlerMethod for $handler_name { + async fn handler( + &self, + request: $crate::appcallback::JobEventRequest, + ) -> ::std::result::Result<$crate::appcallback::JobEventResponse, ::tonic::Status> + { + $handler_fn(request).await + } + } + + impl $handler_name { + pub fn new() -> Self { + $handler_name {} + } + } + + let handler_name = $handler_name.to_string(); + + $app_callback_service.add_job_handler(handler_name, Box::new($handler_name::new())); + }; +} diff --git a/dapr/src/client/mod.rs b/dapr/src/client/mod.rs index ad61ae7..13ede08 100644 --- a/dapr/src/client/mod.rs +++ b/dapr/src/client/mod.rs @@ -20,6 +20,16 @@ use tonic::{Status, Streaming}; pub mod config; pub mod interceptor; +/// Returns `true` when a [`tonic::Status`] indicates the called gRPC method +/// does not exist on the server. Dapr ≤ 1.17 routes unknown methods through +/// its service‑invocation proxy, which fails with code `Unknown` and a +/// characteristic message instead of the standard `Unimplemented` code. +fn is_method_not_found(status: &tonic::Status) -> bool { + matches!(status.code(), tonic::Code::Unimplemented) + || (matches!(status.code(), tonic::Code::Unknown) + && status.message().contains("failed to proxy request")) +} + pub use config::{ API_TOKEN_METADATA_KEY, APP_API_TOKEN_ENV, ClientOptions, DAPR_API_TOKEN_ENV, DAPR_CLIENT_TIMEOUT_SECONDS_ENV, DAPR_GRPC_ENDPOINT_ENV, DAPR_GRPC_PORT_ENV, @@ -71,13 +81,7 @@ impl Client { note = "Will be removed in 0.20.0. Use Client::new() or Client::from_options()." )] pub async fn connect_with_port(addr: String, port: String) -> Result { - // assert that port is between 1 and 65535 - let port: u16 = match port.parse::() { - Ok(p) => p, - Err(_) => { - panic!("Port must be a number between 1 and 65535"); - } - }; + let port: u16 = port.parse()?; let address = format!("{addr}:{port}"); @@ -617,6 +621,26 @@ impl Client { /// /// * job - The job to schedule /// * overwrite - Optional flag to overwrite an existing job with the same name + pub async fn schedule_job( + &mut self, + job: Job, + overwrite: Option, + ) -> Result { + let request = ScheduleJobRequest { + job: Some(job.clone()), + overwrite: overwrite.unwrap_or(false), + }; + self.0.schedule_job(request).await + } + + /// Schedules a job with the Dapr Distributed Scheduler + /// + /// # Arguments + /// + /// * job - The job to schedule + /// * overwrite - Optional flag to overwrite an existing job with the same name + #[allow(deprecated)] + #[deprecated(note = "Use schedule_job instead")] pub async fn schedule_job_alpha1( &mut self, job: Job, @@ -634,6 +658,20 @@ impl Client { /// # Arguments /// /// * name - The name of the job to retrieve + pub async fn get_job(&mut self, name: &str) -> Result { + let request = GetJobRequest { + name: name.to_string(), + }; + self.0.get_job(request).await + } + + /// Retrieves a scheduled job from the Dapr Distributed Scheduler + /// + /// # Arguments + /// + /// * name - The name of the job to retrieve + #[allow(deprecated)] + #[deprecated(note = "Use get_job instead")] pub async fn get_job_alpha1(&mut self, name: &str) -> Result { let request = GetJobRequest { name: name.to_string(), @@ -646,6 +684,20 @@ impl Client { /// # Arguments /// /// * name - The name of the job to delete + pub async fn delete_job(&mut self, name: &str) -> Result { + let request = DeleteJobRequest { + name: name.to_string(), + }; + self.0.delete_job(request).await + } + + /// Deletes a scheduled job from the Dapr Distributed Scheduler + /// + /// # Arguments + /// + /// * name - The name of the job to delete + #[allow(deprecated)] + #[deprecated(note = "Use delete_job instead")] pub async fn delete_job_alpha1(&mut self, name: &str) -> Result { let request = DeleteJobRequest { name: name.to_string(), @@ -653,6 +705,27 @@ impl Client { self.0.delete_job_alpha1(request).await } + /// Deletes all jobs whose name starts with the given prefix. + /// Pass `None` to delete all jobs for the app. + /// + /// # Arguments + /// + /// * prefix - The name prefix to match jobs against, or `None` to delete all + pub async fn delete_jobs_by_prefix( + &mut self, + prefix: Option<&str>, + ) -> Result { + let request = DeleteJobsByPrefixRequest { + name_prefix: prefix.map(|p| p.to_string()), + }; + self.0.delete_jobs_by_prefix(request).await + } + + /// Lists all scheduled jobs + pub async fn list_jobs(&mut self) -> Result { + self.0.list_jobs(ListJobsRequest {}).await + } + /// Converse with an LLM /// /// # Arguments @@ -679,7 +752,7 @@ impl Client { } #[async_trait] -pub trait DaprInterface: Sized { +pub trait DaprInterface: Sized + Send { async fn connect(addr: String) -> Result; async fn publish_event(&mut self, request: PublishEventRequest) -> Result<(), Error>; async fn invoke_service( @@ -727,18 +800,39 @@ pub trait DaprInterface: Sized { async fn decrypt(&mut self, payload: Vec) -> Result, Status>; + #[allow(deprecated)] + async fn schedule_job( + &mut self, + request: ScheduleJobRequest, + ) -> Result; + #[allow(deprecated)] + async fn get_job(&mut self, request: GetJobRequest) -> Result; + + #[allow(deprecated)] + async fn delete_job(&mut self, request: DeleteJobRequest) -> Result; + + #[deprecated(note = "Use schedule_job instead")] async fn schedule_job_alpha1( &mut self, request: ScheduleJobRequest, ) -> Result; + #[deprecated(note = "Use get_job instead")] async fn get_job_alpha1(&mut self, request: GetJobRequest) -> Result; + #[deprecated(note = "Use delete_job instead")] async fn delete_job_alpha1( &mut self, request: DeleteJobRequest, ) -> Result; + async fn delete_jobs_by_prefix( + &mut self, + _request: DeleteJobsByPrefixRequest, + ) -> Result; + + async fn list_jobs(&mut self, _request: ListJobsRequest) -> Result; + async fn converse_alpha1( &mut self, request: ConversationRequest, @@ -943,6 +1037,58 @@ macro_rules! impl_dapr_interface_for { Ok(data) } + async fn schedule_job( + &mut self, + request: ScheduleJobRequest, + ) -> Result { + let fallback = request.clone(); + match self.schedule_job(request).await { + Ok(resp) => Ok(resp.into_inner()), + Err(status) if is_method_not_found(&status) => + { + #[allow(deprecated)] + Ok(self.schedule_job_alpha1(fallback).await?.into_inner()) + } + Err(status) => Err(status.into()), + } + } + + async fn get_job(&mut self, request: GetJobRequest) -> Result { + let fallback = request.clone(); + match self.get_job(Request::new(request)).await { + Ok(resp) => Ok(resp.into_inner()), + Err(status) if is_method_not_found(&status) => + { + #[allow(deprecated)] + Ok(self + .get_job_alpha1(Request::new(fallback)) + .await? + .into_inner()) + } + Err(status) => Err(status.into()), + } + } + + async fn delete_job( + &mut self, + request: DeleteJobRequest, + ) -> Result { + let fallback = request.clone(); + match self.delete_job(Request::new(request)).await { + Ok(resp) => Ok(resp.into_inner()), + Err(status) if is_method_not_found(&status) => + { + #[allow(deprecated)] + Ok(self + .delete_job_alpha1(Request::new(fallback)) + .await? + .into_inner()) + } + Err(status) => Err(status.into()), + } + } + + #[allow(deprecated)] async fn schedule_job_alpha1( &mut self, request: ScheduleJobRequest, @@ -950,6 +1096,7 @@ macro_rules! impl_dapr_interface_for { Ok(self.schedule_job_alpha1(request).await?.into_inner()) } + #[allow(deprecated)] async fn get_job_alpha1( &mut self, request: GetJobRequest, @@ -960,6 +1107,7 @@ macro_rules! impl_dapr_interface_for { .into_inner()) } + #[allow(deprecated)] async fn delete_job_alpha1( &mut self, request: DeleteJobRequest, @@ -970,6 +1118,23 @@ macro_rules! impl_dapr_interface_for { .into_inner()) } + async fn delete_jobs_by_prefix( + &mut self, + request: DeleteJobsByPrefixRequest, + ) -> Result { + Ok(self + .delete_jobs_by_prefix(Request::new(request)) + .await? + .into_inner()) + } + + async fn list_jobs( + &mut self, + request: ListJobsRequest, + ) -> Result { + Ok(self.list_jobs(Request::new(request)).await?.into_inner()) + } + async fn converse_alpha1( &mut self, request: ConversationRequest, @@ -1243,6 +1408,18 @@ pub type DeleteJobRequest = crate::dapr::proto::runtime::v1::DeleteJobRequest; /// A response from a delete job request pub type DeleteJobResponse = crate::dapr::proto::runtime::v1::DeleteJobResponse; +/// A request to delete jobs by name prefix +pub type DeleteJobsByPrefixRequest = crate::dapr::proto::runtime::v1::DeleteJobsByPrefixRequest; + +/// A response from a delete-jobs-by-prefix request +pub type DeleteJobsByPrefixResponse = crate::dapr::proto::runtime::v1::DeleteJobsByPrefixResponse; + +/// A request to list all scheduled jobs +pub type ListJobsRequest = crate::dapr::proto::runtime::v1::ListJobsRequest; + +/// A response containing the list of scheduled jobs +pub type ListJobsResponse = crate::dapr::proto::runtime::v1::ListJobsResponse; + /// A request to conversate with an LLM pub type ConversationRequest = crate::dapr::proto::runtime::v1::ConversationRequest; @@ -1602,3 +1779,20 @@ impl From for ConversationMessage { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + #[allow(deprecated)] + async fn connect_with_port_returns_parse_error_for_invalid_port() { + match Client::::connect_with_port("http://127.0.0.1".into(), "abc".into()) + .await + { + Err(Error::ParseIntError) => {} + Err(err) => panic!("expected ParseIntError, got {err:?}"), + Ok(_) => panic!("invalid port should return an error"), + } + } +} diff --git a/dapr/src/dapr/dapr.proto.runtime.v1.rs b/dapr/src/dapr/dapr.proto.runtime.v1.rs index 4fbd6cc..7407c2c 100644 --- a/dapr/src/dapr/dapr.proto.runtime.v1.rs +++ b/dapr/src/dapr/dapr.proto.runtime.v1.rs @@ -1307,6 +1307,33 @@ pub mod app_callback_client { ); self.inner.unary(req, path, codec).await } + /// Sends job back to the app's endpoint at trigger time. + pub async fn on_job_event( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/dapr.proto.runtime.v1.AppCallback/OnJobEvent", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("dapr.proto.runtime.v1.AppCallback", "OnJobEvent"), + ); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. @@ -1373,6 +1400,14 @@ pub mod app_callback_server { tonic::Response, tonic::Status, >; + /// Sends job back to the app's endpoint at trigger time. + async fn on_job_event( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } /// AppCallback V1 allows user application to interact with Dapr runtime. /// User application needs to implement AppCallback service if it needs to @@ -1722,6 +1757,51 @@ pub mod app_callback_server { }; Box::pin(fut) } + "/dapr.proto.runtime.v1.AppCallback/OnJobEvent" => { + #[allow(non_camel_case_types)] + struct OnJobEventSvc(pub Arc); + impl< + T: AppCallback, + > tonic::server::UnaryService + for OnJobEventSvc { + type Response = super::JobEventResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::on_job_event(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = OnJobEventSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } _ => { Box::pin(async move { let mut response = http::Response::new( @@ -2195,7 +2275,8 @@ pub mod app_callback_alpha_client { ); self.inner.unary(req, path, codec).await } - /// Sends job back to the app's endpoint at trigger time. + /// Deprecated: Sends job back to the app's endpoint at trigger time. + #[deprecated] pub async fn on_job_event_alpha1( &mut self, request: impl tonic::IntoRequest, @@ -2248,7 +2329,7 @@ pub mod app_callback_alpha_server { tonic::Response, tonic::Status, >; - /// Sends job back to the app's endpoint at trigger time. + /// Deprecated: Sends job back to the app's endpoint at trigger time. async fn on_job_event_alpha1( &self, request: tonic::Request, @@ -3911,6 +3992,17 @@ pub struct DeleteJobsByPrefixRequestAlpha1 { /// Empty #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct DeleteJobsByPrefixResponseAlpha1 {} +/// DeleteJobsByPrefixRequest is the stable message to delete jobs by name prefix. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct DeleteJobsByPrefixRequest { + /// name_prefix is the prefix of the job names to delete. If not provided, all + /// jobs associated with this app ID will be deleted. + #[prost(string, optional, tag = "1")] + pub name_prefix: ::core::option::Option<::prost::alloc::string::String>, +} +/// Empty +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct DeleteJobsByPrefixResponse {} /// Empty #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct ListJobsRequestAlpha1 {} @@ -3921,6 +4013,16 @@ pub struct ListJobsResponseAlpha1 { #[prost(message, repeated, tag = "1")] pub jobs: ::prost::alloc::vec::Vec, } +/// Empty +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ListJobsRequest {} +/// ListJobsResponse is the stable message response containing the list of jobs. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ListJobsResponse { + /// The list of jobs. + #[prost(message, repeated, tag = "1")] + pub jobs: ::prost::alloc::vec::Vec, +} /// ShutdownRequest is the request for Shutdown. /// /// Empty @@ -5575,7 +5677,8 @@ pub mod dapr_client { .insert(GrpcMethod::new("dapr.proto.runtime.v1.Dapr", "Shutdown")); self.inner.unary(req, path, codec).await } - /// Create and schedule a job + /// Deprecated: Create and schedule a job + #[deprecated] pub async fn schedule_job_alpha1( &mut self, request: impl tonic::IntoRequest, @@ -5602,7 +5705,33 @@ pub mod dapr_client { ); self.inner.unary(req, path, codec).await } - /// Gets a scheduled job + /// Create and schedule a job + pub async fn schedule_job( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/dapr.proto.runtime.v1.Dapr/ScheduleJob", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("dapr.proto.runtime.v1.Dapr", "ScheduleJob")); + self.inner.unary(req, path, codec).await + } + /// Deprecated: Gets a scheduled job + #[deprecated] pub async fn get_job_alpha1( &mut self, request: impl tonic::IntoRequest, @@ -5624,7 +5753,30 @@ pub mod dapr_client { .insert(GrpcMethod::new("dapr.proto.runtime.v1.Dapr", "GetJobAlpha1")); self.inner.unary(req, path, codec).await } - /// Delete a job + /// Gets a scheduled job + pub async fn get_job( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/dapr.proto.runtime.v1.Dapr/GetJob", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("dapr.proto.runtime.v1.Dapr", "GetJob")); + self.inner.unary(req, path, codec).await + } + /// Deprecated: Delete a job + #[deprecated] pub async fn delete_job_alpha1( &mut self, request: impl tonic::IntoRequest, @@ -5651,6 +5803,33 @@ pub mod dapr_client { ); self.inner.unary(req, path, codec).await } + /// Delete a job + pub async fn delete_job( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/dapr.proto.runtime.v1.Dapr/DeleteJob", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("dapr.proto.runtime.v1.Dapr", "DeleteJob")); + self.inner.unary(req, path, codec).await + } + /// Deprecated: Delete jobs by name prefix + #[deprecated] pub async fn delete_jobs_by_prefix_alpha1( &mut self, request: impl tonic::IntoRequest, @@ -5680,6 +5859,35 @@ pub mod dapr_client { ); self.inner.unary(req, path, codec).await } + /// Delete jobs by name prefix + pub async fn delete_jobs_by_prefix( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/dapr.proto.runtime.v1.Dapr/DeleteJobsByPrefix", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("dapr.proto.runtime.v1.Dapr", "DeleteJobsByPrefix"), + ); + self.inner.unary(req, path, codec).await + } + /// Deprecated: List all jobs + #[deprecated] pub async fn list_jobs_alpha1( &mut self, request: impl tonic::IntoRequest, @@ -5704,6 +5912,31 @@ pub mod dapr_client { .insert(GrpcMethod::new("dapr.proto.runtime.v1.Dapr", "ListJobsAlpha1")); self.inner.unary(req, path, codec).await } + /// List all jobs + pub async fn list_jobs( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/dapr.proto.runtime.v1.Dapr/ListJobs", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("dapr.proto.runtime.v1.Dapr", "ListJobs")); + self.inner.unary(req, path, codec).await + } /// Converse with a LLM service pub async fn converse_alpha1( &mut self, @@ -6210,7 +6443,7 @@ pub mod dapr_server { &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; - /// Create and schedule a job + /// Deprecated: Create and schedule a job async fn schedule_job_alpha1( &self, request: tonic::Request, @@ -6218,12 +6451,25 @@ pub mod dapr_server { tonic::Response, tonic::Status, >; - /// Gets a scheduled job + /// Create and schedule a job + async fn schedule_job( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// Deprecated: Gets a scheduled job async fn get_job_alpha1( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status>; - /// Delete a job + /// Gets a scheduled job + async fn get_job( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// Deprecated: Delete a job async fn delete_job_alpha1( &self, request: tonic::Request, @@ -6231,6 +6477,15 @@ pub mod dapr_server { tonic::Response, tonic::Status, >; + /// Delete a job + async fn delete_job( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// Deprecated: Delete jobs by name prefix async fn delete_jobs_by_prefix_alpha1( &self, request: tonic::Request, @@ -6238,6 +6493,15 @@ pub mod dapr_server { tonic::Response, tonic::Status, >; + /// Delete jobs by name prefix + async fn delete_jobs_by_prefix( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// Deprecated: List all jobs async fn list_jobs_alpha1( &self, request: tonic::Request, @@ -6245,6 +6509,14 @@ pub mod dapr_server { tonic::Response, tonic::Status, >; + /// List all jobs + async fn list_jobs( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Converse with a LLM service async fn converse_alpha1( &self, @@ -9042,6 +9314,49 @@ pub mod dapr_server { }; Box::pin(fut) } + "/dapr.proto.runtime.v1.Dapr/ScheduleJob" => { + #[allow(non_camel_case_types)] + struct ScheduleJobSvc(pub Arc); + impl tonic::server::UnaryService + for ScheduleJobSvc { + type Response = super::ScheduleJobResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::schedule_job(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ScheduleJobSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/dapr.proto.runtime.v1.Dapr/GetJobAlpha1" => { #[allow(non_camel_case_types)] struct GetJobAlpha1Svc(pub Arc); @@ -9085,6 +9400,49 @@ pub mod dapr_server { }; Box::pin(fut) } + "/dapr.proto.runtime.v1.Dapr/GetJob" => { + #[allow(non_camel_case_types)] + struct GetJobSvc(pub Arc); + impl tonic::server::UnaryService + for GetJobSvc { + type Response = super::GetJobResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_job(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetJobSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/dapr.proto.runtime.v1.Dapr/DeleteJobAlpha1" => { #[allow(non_camel_case_types)] struct DeleteJobAlpha1Svc(pub Arc); @@ -9128,6 +9486,49 @@ pub mod dapr_server { }; Box::pin(fut) } + "/dapr.proto.runtime.v1.Dapr/DeleteJob" => { + #[allow(non_camel_case_types)] + struct DeleteJobSvc(pub Arc); + impl tonic::server::UnaryService + for DeleteJobSvc { + type Response = super::DeleteJobResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::delete_job(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = DeleteJobSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/dapr.proto.runtime.v1.Dapr/DeleteJobsByPrefixAlpha1" => { #[allow(non_camel_case_types)] struct DeleteJobsByPrefixAlpha1Svc(pub Arc); @@ -9176,6 +9577,51 @@ pub mod dapr_server { }; Box::pin(fut) } + "/dapr.proto.runtime.v1.Dapr/DeleteJobsByPrefix" => { + #[allow(non_camel_case_types)] + struct DeleteJobsByPrefixSvc(pub Arc); + impl< + T: Dapr, + > tonic::server::UnaryService + for DeleteJobsByPrefixSvc { + type Response = super::DeleteJobsByPrefixResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::delete_jobs_by_prefix(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = DeleteJobsByPrefixSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/dapr.proto.runtime.v1.Dapr/ListJobsAlpha1" => { #[allow(non_camel_case_types)] struct ListJobsAlpha1Svc(pub Arc); @@ -9221,6 +9667,49 @@ pub mod dapr_server { }; Box::pin(fut) } + "/dapr.proto.runtime.v1.Dapr/ListJobs" => { + #[allow(non_camel_case_types)] + struct ListJobsSvc(pub Arc); + impl tonic::server::UnaryService + for ListJobsSvc { + type Response = super::ListJobsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::list_jobs(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ListJobsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/dapr.proto.runtime.v1.Dapr/ConverseAlpha1" => { #[allow(non_camel_case_types)] struct ConverseAlpha1Svc(pub Arc); diff --git a/dapr/src/dapr/types.bin b/dapr/src/dapr/types.bin index f6bca4a..9aeb76a 100644 Binary files a/dapr/src/dapr/types.bin and b/dapr/src/dapr/types.bin differ diff --git a/dapr/src/lib.rs b/dapr/src/lib.rs index 79a68f8..706ba5d 100644 --- a/dapr/src/lib.rs +++ b/dapr/src/lib.rs @@ -3,6 +3,12 @@ pub use serde; pub use serde_json; +/// Hidden re-exports used by `#[macro_export]` macros. Not public API. +#[doc(hidden)] +pub mod reexport { + pub use async_trait::async_trait; +} + pub use client::Client; /// Module containing the Dapr Callback SDK. diff --git a/dapr/src/server/appcallbackalpha.rs b/dapr/src/server/appcallbackalpha.rs index 3a604f0..3967bdb 100644 --- a/dapr/src/server/appcallbackalpha.rs +++ b/dapr/src/server/appcallbackalpha.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use tonic::{Code, Request, Response, Status}; +use tonic::{Request, Response, Status}; use crate::dapr::proto::runtime; use crate::dapr::proto::runtime::v1::app_callback_alpha_server::AppCallbackAlpha; @@ -41,17 +41,25 @@ impl AppCallbackAlpha for AppCallbackServiceAlpha { request: Request, ) -> Result, Status> { let request_inner = request.into_inner(); - let job_name = request_inner - .method - .strip_prefix("job/") - .unwrap() - .to_string(); + let job_name = if !request_inner.name.is_empty() { + request_inner.name.clone() + } else if let Some(stripped) = request_inner.method.strip_prefix("job/") { + stripped.to_string() + } else { + return Err(Status::invalid_argument(format!( + "cannot determine job name from request (method={:?})", + request_inner.method, + ))); + }; if let Some(handler) = self.job_handlers.get(&job_name) { let handle_response = handler.handler(request_inner).await; handle_response.map(Response::new) } else { - Err(Status::new(Code::Internal, "Job Handler Not Found")) + Err(Status::not_found(format!( + "no handler registered for job {:?}", + job_name, + ))) } } } @@ -61,9 +69,13 @@ macro_rules! add_job_handler_alpha { ($app_callback_service:expr, $handler_name:ident, $handler_fn:expr) => { pub struct $handler_name {} - #[async_trait::async_trait] - impl JobHandlerMethod for $handler_name { - async fn handler(&self, request: JobEventRequest) -> Result { + #[$crate::reexport::async_trait] + impl $crate::appcallback::JobHandlerMethod for $handler_name { + async fn handler( + &self, + request: $crate::appcallback::JobEventRequest, + ) -> ::std::result::Result<$crate::appcallback::JobEventResponse, ::tonic::Status> + { $handler_fn(request).await } } @@ -80,10 +92,5 @@ macro_rules! add_job_handler_alpha { }; } -#[tonic::async_trait] -pub trait JobHandlerMethod: Send + Sync + 'static { - async fn handler( - &self, - request: runtime::v1::JobEventRequest, - ) -> Result; -} +// Re-export for backward compatibility +pub use crate::appcallback::JobHandlerMethod; diff --git a/dapr/src/server/http.rs b/dapr/src/server/http.rs index bf081f0..3417a4b 100644 --- a/dapr/src/server/http.rs +++ b/dapr/src/server/http.rs @@ -6,7 +6,7 @@ use axum::{ routing::{delete, get, put}, }; use futures::{Future, FutureExt}; -use std::{pin::Pin, sync::Arc}; +use std::{pin::Pin, sync::Arc, time::Duration}; use tokio::net::TcpListener; use super::super::client::{AppApiTokenLayer, TonicClient}; @@ -119,12 +119,15 @@ impl DaprHttpServer { /// /// In contrast to the other functions that create a DaprHttpServer, this function does /// not panic, but instead returns a Result. + /// + /// The connection uses exponential-backoff retries in case the sidecar + /// is not yet ready. pub async fn try_new_with_dapr_port( dapr_port: u16, ) -> Result> { let dapr_addr = format!("http://127.0.0.1:{dapr_port}"); - let cc = TonicClient::connect(dapr_addr).await?; + let cc = Self::connect_with_retry(&dapr_addr).await?; let rt = ActorRuntime::new(cc); Ok(DaprHttpServer { @@ -135,6 +138,35 @@ impl DaprHttpServer { }) } + /// Connect to the Dapr sidecar with exponential-backoff retries. + async fn connect_with_retry( + dapr_addr: &str, + ) -> Result> { + const MAX_RETRIES: u32 = 10; + let mut retry_delay = Duration::from_millis(500); + let max_delay = Duration::from_secs(2); + + let mut last_err = None; + for attempt in 1..=MAX_RETRIES { + match TonicClient::connect(dapr_addr.to_string()).await { + Ok(client) => return Ok(client), + Err(e) => { + if attempt < MAX_RETRIES { + log::warn!( + "Dapr sidecar not ready (attempt {attempt}/{MAX_RETRIES}), \ + retrying in {retry_delay:?}…" + ); + tokio::time::sleep(retry_delay).await; + retry_delay = std::cmp::min(retry_delay * 2, max_delay); + } + last_err = Some(e); + } + } + } + + Err(last_err.unwrap().into()) + } + /// Override the [`AppApiTokenLayer`] used to authenticate inbound /// requests from the Dapr sidecar. /// diff --git a/dapr/src/workflow/client.rs b/dapr/src/workflow/client.rs index 65fe907..e03546e 100644 --- a/dapr/src/workflow/client.rs +++ b/dapr/src/workflow/client.rs @@ -1,16 +1,18 @@ use std::time::Duration; -use dapr_durabletask::api::{DurableTaskError, OrchestrationState, Result}; +use dapr_durabletask::api::{DurableTaskError, OrchestrationState, PurgeInstanceFilter, Result}; use dapr_durabletask::client::TaskHubGrpcClient; use dapr_durabletask::worker::{Registry, TaskHubGrpcWorker}; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; +use tonic::transport::Channel; use super::options::{EventOptions, FetchOptions, ScheduleOptions}; /// Client for scheduling and managing Dapr workflow instances. pub struct WorkflowClient { inner: TaskHubGrpcClient, + channel: Channel, worker: Option, } @@ -30,9 +32,18 @@ impl WorkflowClient { /// * `address` - Address of the Dapr sidecar gRPC endpoint to connect to. pub async fn new_with_address(address: impl Into) -> Result { let address = ensure_url_scheme(address.into()); - let inner = TaskHubGrpcClient::new(&address).await?; + let channel = Channel::from_shared(address.clone()) + .map_err(|e| DurableTaskError::InvalidAddress(e.to_string()))? + .connect() + .await + .map_err(|e| DurableTaskError::ConnectionFailed(e.to_string()))?; + let inner = TaskHubGrpcClient::from_channel(channel.clone()); let worker = Some(TaskHubGrpcWorker::new(&address)); - Ok(Self { inner, worker }) + Ok(Self { + inner, + channel, + worker, + }) } /// Get the worker registry for registering workflows and activities before starting the worker. @@ -57,6 +68,20 @@ impl WorkflowClient { }) } + /// Create a lightweight scheduling client that shares the underlying gRPC + /// connection. Useful for high-throughput scenarios where many tasks need + /// to schedule and manage workflows concurrently without opening new + /// connections. + /// + /// The returned client is [`Clone`] — each clone reuses the same HTTP/2 + /// connection pool, so creating many clones is cheap. + pub fn scheduling_client(&self) -> WorkflowSchedulingClient { + WorkflowSchedulingClient { + inner: TaskHubGrpcClient::from_channel(self.channel.clone()), + channel: self.channel.clone(), + } + } + /// Schedule a new workflow instance and return its instance ID. /// /// # Arguments @@ -68,12 +93,7 @@ impl WorkflowClient { name: &str, options: ScheduleOptions, ) -> Result { - let instance_id = options.instance_id.clone(); - let start_time = options.start_time_utc(); - let input = options.input_json()?; - self.inner - .schedule_new_orchestration(name, input, instance_id, start_time) - .await + schedule_workflow_impl(&mut self.inner, name, options).await } /// Suspend a running workflow instance. @@ -87,10 +107,7 @@ impl WorkflowClient { instance_id: &str, reason: impl Into, ) -> Result<()> { - let reason = reason.into(); - self.inner - .suspend_orchestration(instance_id, Some(reason)) - .await + suspend_workflow_impl(&mut self.inner, instance_id, reason).await } /// Resume a suspended workflow instance. @@ -104,10 +121,7 @@ impl WorkflowClient { instance_id: &str, reason: impl Into, ) -> Result<()> { - let reason = reason.into(); - self.inner - .resume_orchestration(instance_id, Some(reason)) - .await + resume_workflow_impl(&mut self.inner, instance_id, reason).await } /// Raise an event to a workflow instance. @@ -123,9 +137,7 @@ impl WorkflowClient { event_name: &str, options: EventOptions, ) -> Result<()> { - self.inner - .raise_orchestration_event(instance_id, event_name, options.payload_json()?) - .await + raise_event_impl(&mut self.inner, instance_id, event_name, options).await } /// Fetch workflow metadata, optionally including inputs and outputs. @@ -139,12 +151,7 @@ impl WorkflowClient { instance_id: &str, options: FetchOptions, ) -> Result { - self.inner - .get_orchestration_state(instance_id, options.fetch_payloads) - .await? - .ok_or_else(|| DurableTaskError::InstanceNotFound { - instance_id: instance_id.to_string(), - }) + fetch_workflow_metadata_impl(&mut self.inner, instance_id, options).await } /// Wait for a workflow to start. @@ -156,8 +163,13 @@ impl WorkflowClient { &mut self, instance_id: &str, ) -> Result { - self.wait_for_workflow_start_with_options(instance_id, FetchOptions::new(), None) - .await + wait_for_workflow_start_with_options_impl( + &mut self.inner, + instance_id, + FetchOptions::new(), + None, + ) + .await } /// Wait for a workflow to start with fetch and timeout options. @@ -173,12 +185,8 @@ impl WorkflowClient { options: FetchOptions, timeout: Option, ) -> Result { - self.inner - .wait_for_orchestration_start(instance_id, options.fetch_payloads, timeout) - .await? - .ok_or_else(|| DurableTaskError::InstanceNotFound { - instance_id: instance_id.to_string(), - }) + wait_for_workflow_start_with_options_impl(&mut self.inner, instance_id, options, timeout) + .await } /// Wait for a workflow to complete. @@ -190,8 +198,13 @@ impl WorkflowClient { &mut self, instance_id: &str, ) -> Result { - self.wait_for_workflow_completion_with_options(instance_id, FetchOptions::new(), None) - .await + wait_for_workflow_completion_with_options_impl( + &mut self.inner, + instance_id, + FetchOptions::new(), + None, + ) + .await } /// Wait for a workflow to complete with fetch and timeout options. @@ -207,12 +220,13 @@ impl WorkflowClient { options: FetchOptions, timeout: Option, ) -> Result { - self.inner - .wait_for_orchestration_completion(instance_id, options.fetch_payloads, timeout) - .await? - .ok_or_else(|| DurableTaskError::InstanceNotFound { - instance_id: instance_id.to_string(), - }) + wait_for_workflow_completion_with_options_impl( + &mut self.inner, + instance_id, + options, + timeout, + ) + .await } /// Purge workflow state and history for an instance. @@ -221,8 +235,7 @@ impl WorkflowClient { /// /// * `instance_id` - ID of the workflow instance to purge. pub async fn purge_workflow_state(&mut self, instance_id: &str) -> Result<()> { - self.purge_workflow_state_recursive(instance_id, false) - .await + purge_workflow_state_recursive_impl(&mut self.inner, instance_id, false).await } /// Purge workflow state and history for an instance and optionally child workflows. @@ -236,10 +249,23 @@ impl WorkflowClient { instance_id: &str, recursive: bool, ) -> Result<()> { - self.inner - .purge_orchestration(instance_id, recursive) - .await?; - Ok(()) + purge_workflow_state_recursive_impl(&mut self.inner, instance_id, recursive).await + } + + /// Purge workflow instances matching the given filter criteria. + /// + /// Returns the number of deleted instances. + /// + /// # Arguments + /// + /// * `filter` - Filter criteria for selecting instances to purge. + /// * `recursive` - When `true`, also purge child workflow instances. + pub async fn purge_workflow_state_by_filter( + &mut self, + filter: PurgeInstanceFilter, + recursive: bool, + ) -> Result { + purge_workflow_state_by_filter_impl(&mut self.inner, filter, recursive).await } /// Terminate a workflow instance. @@ -248,7 +274,7 @@ impl WorkflowClient { /// /// * `instance_id` - ID of the workflow instance to terminate. pub async fn terminate_workflow(&mut self, instance_id: &str) -> Result<()> { - self.terminate_workflow_recursive(instance_id, false).await + terminate_workflow_recursive_impl(&mut self.inner, instance_id, false).await } /// Terminate a workflow instance and optionally child workflows. @@ -262,10 +288,170 @@ impl WorkflowClient { instance_id: &str, recursive: bool, ) -> Result<()> { - self.inner - .terminate_orchestration(instance_id, None, recursive) + terminate_workflow_recursive_impl(&mut self.inner, instance_id, recursive).await + } +} + +/// Lightweight, cloneable client for scheduling and managing workflow instances. +/// +/// Created via [`WorkflowClient::scheduling_client`]. Every clone shares the +/// same underlying HTTP/2 connection pool, so creating many clones is cheap. +/// Use this when you need to schedule or wait on workflows from multiple +/// concurrent tasks without opening a new gRPC connection each time. +pub struct WorkflowSchedulingClient { + inner: TaskHubGrpcClient, + channel: Channel, +} + +impl Clone for WorkflowSchedulingClient { + fn clone(&self) -> Self { + Self { + inner: TaskHubGrpcClient::from_channel(self.channel.clone()), + channel: self.channel.clone(), + } + } +} + +impl WorkflowSchedulingClient { + /// Schedule a new workflow instance and return its instance ID. + pub async fn schedule_workflow( + &mut self, + name: &str, + options: ScheduleOptions, + ) -> Result { + schedule_workflow_impl(&mut self.inner, name, options).await + } + + /// Suspend a running workflow instance. + pub async fn suspend_workflow( + &mut self, + instance_id: &str, + reason: impl Into, + ) -> Result<()> { + suspend_workflow_impl(&mut self.inner, instance_id, reason).await + } + + /// Resume a suspended workflow instance. + pub async fn resume_workflow( + &mut self, + instance_id: &str, + reason: impl Into, + ) -> Result<()> { + resume_workflow_impl(&mut self.inner, instance_id, reason).await + } + + /// Raise an event to a workflow instance. + pub async fn raise_event( + &mut self, + instance_id: &str, + event_name: &str, + options: EventOptions, + ) -> Result<()> { + raise_event_impl(&mut self.inner, instance_id, event_name, options).await + } + + /// Fetch workflow metadata, optionally including inputs and outputs. + pub async fn fetch_workflow_metadata( + &mut self, + instance_id: &str, + options: FetchOptions, + ) -> Result { + fetch_workflow_metadata_impl(&mut self.inner, instance_id, options).await + } + + /// Wait for a workflow to start. + pub async fn wait_for_workflow_start( + &mut self, + instance_id: &str, + ) -> Result { + wait_for_workflow_start_with_options_impl( + &mut self.inner, + instance_id, + FetchOptions::new(), + None, + ) + .await + } + + /// Wait for a workflow to start with fetch and timeout options. + pub async fn wait_for_workflow_start_with_options( + &mut self, + instance_id: &str, + options: FetchOptions, + timeout: Option, + ) -> Result { + wait_for_workflow_start_with_options_impl(&mut self.inner, instance_id, options, timeout) .await } + + /// Wait for a workflow to complete. + pub async fn wait_for_workflow_completion( + &mut self, + instance_id: &str, + ) -> Result { + wait_for_workflow_completion_with_options_impl( + &mut self.inner, + instance_id, + FetchOptions::new(), + None, + ) + .await + } + + /// Wait for a workflow to complete with fetch and timeout options. + pub async fn wait_for_workflow_completion_with_options( + &mut self, + instance_id: &str, + options: FetchOptions, + timeout: Option, + ) -> Result { + wait_for_workflow_completion_with_options_impl( + &mut self.inner, + instance_id, + options, + timeout, + ) + .await + } + + /// Purge workflow state and history for an instance. + pub async fn purge_workflow_state(&mut self, instance_id: &str) -> Result<()> { + purge_workflow_state_recursive_impl(&mut self.inner, instance_id, false).await + } + + /// Purge workflow state and history for an instance and optionally child workflows. + pub async fn purge_workflow_state_recursive( + &mut self, + instance_id: &str, + recursive: bool, + ) -> Result<()> { + purge_workflow_state_recursive_impl(&mut self.inner, instance_id, recursive).await + } + + /// Purge workflow instances matching the given filter criteria. + /// + /// Returns the number of deleted instances. + pub async fn purge_workflow_state_by_filter( + &mut self, + filter: PurgeInstanceFilter, + recursive: bool, + ) -> Result { + purge_workflow_state_by_filter_impl(&mut self.inner, filter, recursive).await + } + + /// Terminate a workflow instance. + pub async fn terminate_workflow(&mut self, instance_id: &str) -> Result<()> { + terminate_workflow_recursive_impl(&mut self.inner, instance_id, false).await + } + + /// Terminate a workflow instance and optionally child workflows. + pub async fn terminate_workflow_recursive( + &mut self, + instance_id: &str, + recursive: bool, + ) -> Result<()> { + terminate_workflow_recursive_impl(&mut self.inner, instance_id, recursive).await + } } /// Handle for a running workflow worker. @@ -305,10 +491,6 @@ fn default_sidecar_address() -> String { /// Prepend `http://` to `address` when no URL scheme is present so that the /// underlying tonic channel can parse it. -/// -/// `dapr-durabletask 0.0.1` exposes no token interceptor or per-request metadata -/// hook, so `DAPR_API_TOKEN` cannot be forwarded from this layer yet. Once the -/// upstream client gains an interceptor surface, wire token injection here. fn ensure_url_scheme(address: String) -> String { if address.contains("://") { address @@ -317,6 +499,122 @@ fn ensure_url_scheme(address: String) -> String { } } +// Shared scheduling and management helpers used by both workflow clients. + +async fn schedule_workflow_impl( + client: &mut TaskHubGrpcClient, + name: &str, + options: ScheduleOptions, +) -> Result { + let instance_id = options.instance_id.clone(); + let start_time = options.start_time_utc(); + let input = options.input_json()?; + client + .schedule_new_orchestration(name, input, instance_id, start_time) + .await +} + +async fn suspend_workflow_impl( + client: &mut TaskHubGrpcClient, + instance_id: &str, + reason: impl Into, +) -> Result<()> { + let reason = reason.into(); + client + .suspend_orchestration(instance_id, Some(reason)) + .await +} + +async fn resume_workflow_impl( + client: &mut TaskHubGrpcClient, + instance_id: &str, + reason: impl Into, +) -> Result<()> { + let reason = reason.into(); + client.resume_orchestration(instance_id, Some(reason)).await +} + +async fn raise_event_impl( + client: &mut TaskHubGrpcClient, + instance_id: &str, + event_name: &str, + options: EventOptions, +) -> Result<()> { + client + .raise_orchestration_event(instance_id, event_name, options.payload_json()?) + .await +} + +async fn fetch_workflow_metadata_impl( + client: &mut TaskHubGrpcClient, + instance_id: &str, + options: FetchOptions, +) -> Result { + client + .get_orchestration_state(instance_id, options.fetch_payloads) + .await? + .ok_or_else(|| DurableTaskError::InstanceNotFound { + instance_id: instance_id.to_string(), + }) +} + +async fn wait_for_workflow_start_with_options_impl( + client: &mut TaskHubGrpcClient, + instance_id: &str, + options: FetchOptions, + timeout: Option, +) -> Result { + client + .wait_for_orchestration_start(instance_id, options.fetch_payloads, timeout) + .await? + .ok_or_else(|| DurableTaskError::InstanceNotFound { + instance_id: instance_id.to_string(), + }) +} + +async fn wait_for_workflow_completion_with_options_impl( + client: &mut TaskHubGrpcClient, + instance_id: &str, + options: FetchOptions, + timeout: Option, +) -> Result { + client + .wait_for_orchestration_completion(instance_id, options.fetch_payloads, timeout) + .await? + .ok_or_else(|| DurableTaskError::InstanceNotFound { + instance_id: instance_id.to_string(), + }) +} + +async fn purge_workflow_state_recursive_impl( + client: &mut TaskHubGrpcClient, + instance_id: &str, + recursive: bool, +) -> Result<()> { + client.purge_orchestration(instance_id, recursive).await?; + Ok(()) +} + +async fn purge_workflow_state_by_filter_impl( + client: &mut TaskHubGrpcClient, + filter: PurgeInstanceFilter, + recursive: bool, +) -> Result { + client + .purge_orchestrations_by_filter(filter, recursive) + .await +} + +async fn terminate_workflow_recursive_impl( + client: &mut TaskHubGrpcClient, + instance_id: &str, + recursive: bool, +) -> Result<()> { + client + .terminate_orchestration(instance_id, None, recursive) + .await +} + #[cfg(test)] mod tests { use super::ensure_url_scheme; diff --git a/dapr/src/workflow/context.rs b/dapr/src/workflow/context.rs index 9334e9c..aa94963 100644 --- a/dapr/src/workflow/context.rs +++ b/dapr/src/workflow/context.rs @@ -75,7 +75,9 @@ pub trait WorkflowContextExt { T: DeserializeOwned + Send + 'static; /// Return the propagated history attached to the current workflow invocation, if any. - fn propagated_history(&self) -> Option; + fn propagated_history( + &self, + ) -> Option>; /// Wait for an external event with an optional timeout and deserialize the payload. /// @@ -100,7 +102,7 @@ pub trait WorkflowContextExt { impl WorkflowContextExt for WorkflowContext { fn get_input_typed(&self) -> dapr_durabletask::api::Result { - self.get_input() + self.input() } fn call_activity_typed( @@ -159,7 +161,9 @@ impl WorkflowContextExt for WorkflowContext { async move { deserialize_task_output(task.await?) } } - fn propagated_history(&self) -> Option { + fn propagated_history( + &self, + ) -> Option> { dapr_durabletask::task::OrchestrationContext::propagated_history(self) } diff --git a/dapr/src/workflow/mod.rs b/dapr/src/workflow/mod.rs index eb8ac55..977443d 100644 --- a/dapr/src/workflow/mod.rs +++ b/dapr/src/workflow/mod.rs @@ -62,7 +62,7 @@ pub mod client; pub mod context; pub mod options; -pub use client::{WorkerHandle, WorkflowClient}; +pub use client::{WorkerHandle, WorkflowClient, WorkflowSchedulingClient}; pub use context::{ ActivityContext, ActivityContextExt, RegistryExt, WorkflowContext, WorkflowContextExt, }; @@ -73,7 +73,9 @@ pub use options::{ pub use dapr_durabletask::api::{ DurableTaskError as WorkflowError, FailureDetails, HistoryPropagationScope, OrchestrationState as WorkflowMetadata, OrchestrationStatus as RuntimeStatus, - PropagatedHistory, Result, RetryPolicy, + PropagatedHistory, PurgeInstanceFilter, Result, RetryPolicy, +}; +pub use dapr_durabletask::task::{ + CompletableTask, OrchestrationContext, TaskResult, when_all, when_any, }; -pub use dapr_durabletask::task::{CompletableTask, OrchestrationContext, when_all}; pub use dapr_durabletask::worker::Registry; diff --git a/examples/src/actors/client.rs b/examples/src/actors/client.rs index f6a2890..4e9c563 100644 --- a/examples/src/actors/client.rs +++ b/examples/src/actors/client.rs @@ -20,13 +20,25 @@ async fn main() -> Result<(), Box> { // Create the client let mut client = dapr::Client::new().await?; - let data = MyRequest { - name: "test".to_string(), - }; - - let resp: Result = client - .invoke_actor("MyActor", "a1", "do_stuff", data, None) - .await; + // Retry with exponential backoff until the actor-server is registered. + let mut resp: Result = + Err(dapr::error::Error::SerializationError); + for attempt in 1..=10u32 { + let data = MyRequest { + name: "test".to_string(), + }; + + resp = client + .invoke_actor("MyActor", "a1", "do_stuff", data, None) + .await; + + if resp.is_ok() || attempt == 10 { + break; + } + + eprintln!("Actor not ready (attempt {attempt}/10), retrying in 2s…"); + tokio::time::sleep(std::time::Duration::new(2, 0)).await; + } println!("Response: {resp:#?}"); diff --git a/examples/src/app-api-token/main.rs b/examples/src/app-api-token/main.rs index c0f6e26..9f427c5 100644 --- a/examples/src/app-api-token/main.rs +++ b/examples/src/app-api-token/main.rs @@ -86,6 +86,13 @@ impl AppCallback for LoggingCallback { ) -> Result, Status> { Err(Status::unimplemented("on_bulk_topic_event")) } + + async fn on_job_event( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("on_job_event")) + } } #[tokio::main] diff --git a/examples/src/bindings/input.rs b/examples/src/bindings/input.rs index e2d9da0..08deb51 100644 --- a/examples/src/bindings/input.rs +++ b/examples/src/bindings/input.rs @@ -2,9 +2,9 @@ use tonic::{Request, Response, Status, transport::Server}; use dapr::dapr::proto::common::v1::{InvokeRequest, InvokeResponse}; use dapr::dapr::proto::runtime::v1::{ - BindingEventRequest, BindingEventResponse, ListInputBindingsResponse, - ListTopicSubscriptionsResponse, TopicEventBulkRequest, TopicEventBulkResponse, - TopicEventRequest, TopicEventResponse, + BindingEventRequest, BindingEventResponse, JobEventRequest, JobEventResponse, + ListInputBindingsResponse, ListTopicSubscriptionsResponse, TopicEventBulkRequest, + TopicEventBulkResponse, TopicEventRequest, TopicEventResponse, app_callback_server::{AppCallback, AppCallbackServer}, }; @@ -75,6 +75,13 @@ impl AppCallback for AppCallbackService { ) -> Result, Status> { todo!("on_bulk_topic_event is not implemented yet") } + + async fn on_job_event( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("on_job_event")) + } } #[tokio::main] diff --git a/examples/src/invoke/grpc/server.rs b/examples/src/invoke/grpc/server.rs index 552ae79..9ff8978 100644 --- a/examples/src/invoke/grpc/server.rs +++ b/examples/src/invoke/grpc/server.rs @@ -98,6 +98,13 @@ impl AppCallback for AppCallbackService { // TODO: implement bulk topic event handling logic here. Ok(Response::new(TopicEventBulkResponse::default())) } + + async fn on_job_event( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("on_job_event")) + } } #[tokio::main] diff --git a/examples/src/jobs-failurepolicy/dapr.yaml b/examples/src/jobs-failurepolicy/dapr.yaml index 363e0e9..49ea68f 100644 --- a/examples/src/jobs-failurepolicy/dapr.yaml +++ b/examples/src/jobs-failurepolicy/dapr.yaml @@ -8,4 +8,4 @@ apps: appPort: 50051 logLevel: debug schedulerHostAddress: localhost - command: [ "cargo", "run", "--example", "jobs" ] + command: [ "cargo", "run", "--example", "jobs-failurepolicy" ] diff --git a/examples/src/jobs-failurepolicy/jobs_failurepolicy.rs b/examples/src/jobs-failurepolicy/jobs_failurepolicy.rs index eba46b1..2b444a6 100644 --- a/examples/src/jobs-failurepolicy/jobs_failurepolicy.rs +++ b/examples/src/jobs-failurepolicy/jobs_failurepolicy.rs @@ -1,13 +1,15 @@ use std::time::Duration; +use dapr::appcallback::AppCallbackService; use dapr::client::{JobBuilder, JobFailurePolicyBuilder, JobFailurePolicyType}; use dapr::dapr::proto::runtime::v1::{ JobEventRequest, JobEventResponse, app_callback_alpha_server::AppCallbackAlphaServer, + app_callback_server::AppCallbackServer, }; -use dapr::server::appcallbackalpha::{AppCallbackServiceAlpha, JobHandlerMethod}; -use dapr::{add_job_handler_alpha, serde_json}; +use dapr::{add_job_handler, serde_json}; use prost_types::Any; use serde::{Deserialize, Serialize}; +use std::sync::Arc; use tokio::time::sleep; use tonic::Status; use tonic::transport::Server; @@ -48,26 +50,25 @@ async fn main() -> Result<(), Box> { tokio::spawn(async move { let server_addr = "127.0.0.1:50051".parse().unwrap(); - println!("AppCallbackAlpha server listening on {server_addr}"); + println!("AppCallback server listening on {server_addr}"); - let mut alpha_callback_service = AppCallbackServiceAlpha::new(); + let mut callback_service = AppCallbackService::new(); let backup_job_handler_name = "prod-db-backup"; - add_job_handler_alpha!( - alpha_callback_service, + add_job_handler!( + callback_service, backup_job_handler_name, backup_job_handler ); let ping_pong_handler_name = "ping-pong"; - add_job_handler_alpha!( - alpha_callback_service, - ping_pong_handler_name, - ping_pong_handler - ); + add_job_handler!(callback_service, ping_pong_handler_name, ping_pong_handler); + + let callback_service = Arc::new(callback_service); Server::builder() - .add_service(AppCallbackAlphaServer::new(alpha_callback_service)) + .add_service(AppCallbackServer::from_arc(callback_service.clone())) + .add_service(AppCallbackAlphaServer::from_arc(callback_service)) .serve(server_addr) .await .unwrap(); @@ -105,20 +106,20 @@ async fn main() -> Result<(), Box> { .with_failure_policy(JobFailurePolicyBuilder::new(JobFailurePolicyType::Drop {}).build()) .build(); - let _schedule_resp = client.schedule_job_alpha1(job, None).await?; + let _schedule_resp = client.schedule_job(job, None).await?; println!("job scheduled successfully"); sleep(Duration::from_secs(3)).await; - let get_resp = client.get_job_alpha1("prod-db-backup").await?; + let get_resp = client.get_job("prod-db-backup").await?; let get_resp_backup: Backup = serde_json::from_slice(&get_resp.clone().job.unwrap().data.unwrap().value).unwrap(); println!("job retrieved: {get_resp_backup:?}"); - let _delete_resp = client.delete_job_alpha1("prod-db-backup").await?; + let _delete_resp = client.delete_job("prod-db-backup").await?; println!("job deleted"); @@ -130,7 +131,7 @@ async fn main() -> Result<(), Box> { .with_schedule("@every 1s") .with_repeats(5) .build(); - let _schedule_resp = client.schedule_job_alpha1(ping_pong_job, None).await?; + let _schedule_resp = client.schedule_job(ping_pong_job, None).await?; sleep(Duration::from_secs(10)).await; diff --git a/examples/src/jobs/jobs.rs b/examples/src/jobs/jobs.rs index c7f5433..57101c7 100644 --- a/examples/src/jobs/jobs.rs +++ b/examples/src/jobs/jobs.rs @@ -1,13 +1,15 @@ use std::time::Duration; +use dapr::appcallback::AppCallbackService; use dapr::client::JobBuilder; use dapr::dapr::proto::runtime::v1::{ JobEventRequest, JobEventResponse, app_callback_alpha_server::AppCallbackAlphaServer, + app_callback_server::AppCallbackServer, }; -use dapr::server::appcallbackalpha::{AppCallbackServiceAlpha, JobHandlerMethod}; -use dapr::{add_job_handler_alpha, serde_json}; +use dapr::{add_job_handler, serde_json}; use prost_types::Any; use serde::{Deserialize, Serialize}; +use std::sync::Arc; use tokio::time::sleep; use tonic::Status; use tonic::transport::Server; @@ -48,26 +50,25 @@ async fn main() -> Result<(), Box> { tokio::spawn(async move { let server_addr = "127.0.0.1:50051".parse().unwrap(); - println!("AppCallbackAlpha server listening on {server_addr}"); + println!("AppCallback server listening on {server_addr}"); - let mut alpha_callback_service = AppCallbackServiceAlpha::new(); + let mut callback_service = AppCallbackService::new(); let backup_job_handler_name = "prod-db-backup"; - add_job_handler_alpha!( - alpha_callback_service, + add_job_handler!( + callback_service, backup_job_handler_name, backup_job_handler ); let ping_pong_handler_name = "ping-pong"; - add_job_handler_alpha!( - alpha_callback_service, - ping_pong_handler_name, - ping_pong_handler - ); + add_job_handler!(callback_service, ping_pong_handler_name, ping_pong_handler); + + let callback_service = Arc::new(callback_service); Server::builder() - .add_service(AppCallbackAlphaServer::new(alpha_callback_service)) + .add_service(AppCallbackServer::from_arc(callback_service.clone())) + .add_service(AppCallbackAlphaServer::from_arc(callback_service)) .serve(server_addr) .await .unwrap(); @@ -104,20 +105,20 @@ async fn main() -> Result<(), Box> { .with_data(any) .build(); - let _schedule_resp = client.schedule_job_alpha1(job, None).await?; + let _schedule_resp = client.schedule_job(job, None).await?; println!("job scheduled successfully"); sleep(Duration::from_secs(3)).await; - let get_resp = client.get_job_alpha1("prod-db-backup").await?; + let get_resp = client.get_job("prod-db-backup").await?; let get_resp_backup: Backup = serde_json::from_slice(&get_resp.clone().job.unwrap().data.unwrap().value).unwrap(); println!("job retrieved: {get_resp_backup:?}"); - let _delete_resp = client.delete_job_alpha1("prod-db-backup").await?; + let _delete_resp = client.delete_job("prod-db-backup").await?; println!("job deleted"); @@ -129,7 +130,7 @@ async fn main() -> Result<(), Box> { .with_schedule("@every 1s") .with_repeats(5) .build(); - let _schedule_resp = client.schedule_job_alpha1(ping_pong_job, None).await?; + let _schedule_resp = client.schedule_job(ping_pong_job, None).await?; sleep(Duration::from_secs(10)).await; diff --git a/examples/src/workflow-history-propagation/main.rs b/examples/src/workflow-history-propagation/main.rs index bbdb65e..3483317 100644 --- a/examples/src/workflow-history-propagation/main.rs +++ b/examples/src/workflow-history-propagation/main.rs @@ -101,7 +101,7 @@ async fn process_payment(ctx: WorkflowContext) -> dapr::workflow::Result; + +struct WorkflowRunSummary { + latencies: Vec, + failed: usize, +} + +impl WorkflowRunSummary { + fn succeeded(&self) -> usize { + self.latencies.len() + } +} + async fn sustained_workflow(ctx: WorkflowContext) -> Result> { let input: i32 = ctx.get_input_typed()?; let output: i32 = ctx.call_activity_typed(ACTIVITY_NAME, input).await?; @@ -52,12 +65,13 @@ async fn main() -> Result<()> { println!("Worker initialized"); let worker = worker_client.start_worker().await?; + let scheduling_client = worker_client.scheduling_client(); let started_at = Instant::now(); - let latencies = run_workflows(workflow_count, concurrency).await; + let run_summary = run_workflows(scheduling_client, workflow_count, concurrency).await; let elapsed = started_at.elapsed(); - print_summary(workflow_count, &latencies, elapsed); + print_summary(workflow_count, &run_summary, elapsed); worker.shutdown().await?; @@ -80,7 +94,11 @@ fn read_concurrency(workflow_count: usize) -> usize { .clamp(1, workflow_count) } -async fn run_workflows(workflow_count: usize, concurrency: usize) -> Vec { +async fn run_workflows( + scheduling_client: WorkflowSchedulingClient, + workflow_count: usize, + concurrency: usize, +) -> WorkflowRunSummary { let mut tasks = JoinSet::new(); let mut next_input = 0; let mut completed = 0; @@ -89,7 +107,7 @@ async fn run_workflows(workflow_count: usize, concurrency: usize) -> Vec Vec>, input: i32) { +fn spawn_workflow( + tasks: &mut JoinSet, + mut client: WorkflowSchedulingClient, + input: i32, +) { tasks.spawn(async move { let started_at = Instant::now(); - let mut client = WorkflowClient::new() - .await - .map_err(|error| format!("client initialization failed: {error}"))?; let instance_id = client .schedule_workflow(WORKFLOW_NAME, ScheduleOptions::new().with_input(input)) .await @@ -155,9 +174,10 @@ fn spawn_workflow(tasks: &mut JoinSet>, in }); } -fn print_summary(workflow_count: usize, latencies: &[Duration], elapsed: Duration) { - let succeeded = latencies.len(); - let failed = workflow_count - succeeded; +fn print_summary(workflow_count: usize, summary: &WorkflowRunSummary, elapsed: Duration) { + let latencies = &summary.latencies; + let succeeded = summary.succeeded(); + let failed = summary.failed; let throughput = succeeded as f64 / elapsed.as_secs_f64(); let mut sorted_latencies = latencies.to_vec(); diff --git a/proto/dapr/proto/runtime/v1/appcallback.proto b/proto/dapr/proto/runtime/v1/appcallback.proto index 3c9a7a2..3874642 100644 --- a/proto/dapr/proto/runtime/v1/appcallback.proto +++ b/proto/dapr/proto/runtime/v1/appcallback.proto @@ -49,6 +49,9 @@ service AppCallback { // Subscribes bulk events from Pubsub rpc OnBulkTopicEvent(TopicEventBulkRequest) returns (TopicEventBulkResponse) {} + + // Sends job back to the app's endpoint at trigger time. + rpc OnJobEvent (JobEventRequest) returns (JobEventResponse); } // AppCallbackHealthCheck V1 is an optional extension to AppCallback V1 to implement @@ -66,8 +69,10 @@ service AppCallbackAlpha { option deprecated = true; } - // Sends job back to the app's endpoint at trigger time. - rpc OnJobEventAlpha1 (JobEventRequest) returns (JobEventResponse); + // Deprecated: Sends job back to the app's endpoint at trigger time. + rpc OnJobEventAlpha1 (JobEventRequest) returns (JobEventResponse) { + option deprecated = true; + } } message JobEventRequest { diff --git a/proto/dapr/proto/runtime/v1/dapr.proto b/proto/dapr/proto/runtime/v1/dapr.proto index f8a0602..bf0453e 100644 --- a/proto/dapr/proto/runtime/v1/dapr.proto +++ b/proto/dapr/proto/runtime/v1/dapr.proto @@ -232,18 +232,45 @@ service Dapr { // Shutdown the sidecar rpc Shutdown (ShutdownRequest) returns (google.protobuf.Empty) {} + // Deprecated: Create and schedule a job + rpc ScheduleJobAlpha1(ScheduleJobRequest) returns (ScheduleJobResponse) { + option deprecated = true; + } + // Create and schedule a job - rpc ScheduleJobAlpha1(ScheduleJobRequest) returns (ScheduleJobResponse) {} + rpc ScheduleJob(ScheduleJobRequest) returns (ScheduleJobResponse) {} + + // Deprecated: Gets a scheduled job + rpc GetJobAlpha1(GetJobRequest) returns (GetJobResponse) { + option deprecated = true; + } // Gets a scheduled job - rpc GetJobAlpha1(GetJobRequest) returns (GetJobResponse) {} + rpc GetJob(GetJobRequest) returns (GetJobResponse) {} + + // Deprecated: Delete a job + rpc DeleteJobAlpha1(DeleteJobRequest) returns (DeleteJobResponse) { + option deprecated = true; + } // Delete a job - rpc DeleteJobAlpha1(DeleteJobRequest) returns (DeleteJobResponse) {} + rpc DeleteJob(DeleteJobRequest) returns (DeleteJobResponse) {} + + // Deprecated: Delete jobs by name prefix + rpc DeleteJobsByPrefixAlpha1(DeleteJobsByPrefixRequestAlpha1) returns (DeleteJobsByPrefixResponseAlpha1) { + option deprecated = true; + } - rpc DeleteJobsByPrefixAlpha1(DeleteJobsByPrefixRequestAlpha1) returns (DeleteJobsByPrefixResponseAlpha1) {} + // Delete jobs by name prefix + rpc DeleteJobsByPrefix(DeleteJobsByPrefixRequest) returns (DeleteJobsByPrefixResponse) {} + + // Deprecated: List all jobs + rpc ListJobsAlpha1(ListJobsRequestAlpha1) returns (ListJobsResponseAlpha1) { + option deprecated = true; + } - rpc ListJobsAlpha1(ListJobsRequestAlpha1) returns (ListJobsResponseAlpha1) {} + // List all jobs + rpc ListJobs(ListJobsRequest) returns (ListJobsResponse) {} // Converse with a LLM service rpc ConverseAlpha1(ConversationRequest) returns (ConversationResponse) {} diff --git a/proto/dapr/proto/runtime/v1/jobs.proto b/proto/dapr/proto/runtime/v1/jobs.proto index f181068..34163d0 100644 --- a/proto/dapr/proto/runtime/v1/jobs.proto +++ b/proto/dapr/proto/runtime/v1/jobs.proto @@ -122,6 +122,17 @@ message DeleteJobsByPrefixResponseAlpha1 { // Empty } +// DeleteJobsByPrefixRequest is the stable message to delete jobs by name prefix. +message DeleteJobsByPrefixRequest { + // name_prefix is the prefix of the job names to delete. If not provided, all + // jobs associated with this app ID will be deleted. + optional string name_prefix = 1; +} + +message DeleteJobsByPrefixResponse { + // Empty +} + message ListJobsRequestAlpha1 { // Empty } @@ -131,3 +142,13 @@ message ListJobsResponseAlpha1 { // The list of jobs. repeated Job jobs = 1; } + +message ListJobsRequest { + // Empty +} + +// ListJobsResponse is the stable message response containing the list of jobs. +message ListJobsResponse { + // The list of jobs. + repeated Job jobs = 1; +}