diff --git a/chasm/lib/nexusoperation/frontend.go b/chasm/lib/nexusoperation/frontend.go new file mode 100644 index 0000000000..35e41a558c --- /dev/null +++ b/chasm/lib/nexusoperation/frontend.go @@ -0,0 +1,72 @@ +package nexusoperation + +import ( + "context" + + "go.temporal.io/api/serviceerror" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/common/log" + "go.temporal.io/server/common/namespace" +) + +// FrontendHandler provides the frontend-facing API for standalone Nexus operations. +type FrontendHandler interface { + StartNexusOperationExecution(context.Context, *workflowservice.StartNexusOperationExecutionRequest) (*workflowservice.StartNexusOperationExecutionResponse, error) + DescribeNexusOperationExecution(context.Context, *workflowservice.DescribeNexusOperationExecutionRequest) (*workflowservice.DescribeNexusOperationExecutionResponse, error) + PollNexusOperationExecution(context.Context, *workflowservice.PollNexusOperationExecutionRequest) (*workflowservice.PollNexusOperationExecutionResponse, error) + ListNexusOperationExecutions(context.Context, *workflowservice.ListNexusOperationExecutionsRequest) (*workflowservice.ListNexusOperationExecutionsResponse, error) + CountNexusOperationExecutions(context.Context, *workflowservice.CountNexusOperationExecutionsRequest) (*workflowservice.CountNexusOperationExecutionsResponse, error) + RequestCancelNexusOperationExecution(context.Context, *workflowservice.RequestCancelNexusOperationExecutionRequest) (*workflowservice.RequestCancelNexusOperationExecutionResponse, error) + TerminateNexusOperationExecution(context.Context, *workflowservice.TerminateNexusOperationExecutionRequest) (*workflowservice.TerminateNexusOperationExecutionResponse, error) + DeleteNexusOperationExecution(context.Context, *workflowservice.DeleteNexusOperationExecutionRequest) (*workflowservice.DeleteNexusOperationExecutionResponse, error) +} + +type frontendHandler struct { + config *Config + logger log.Logger + namespaceRegistry namespace.Registry +} + +func NewFrontendHandler( + config *Config, + logger log.Logger, + namespaceRegistry namespace.Registry, +) FrontendHandler { + return &frontendHandler{ + config: config, + logger: logger, + namespaceRegistry: namespaceRegistry, + } +} + +func (h *frontendHandler) StartNexusOperationExecution(context.Context, *workflowservice.StartNexusOperationExecutionRequest) (*workflowservice.StartNexusOperationExecutionResponse, error) { + return nil, serviceerror.NewUnimplemented("StartNexusOperationExecution not implemented") +} + +func (h *frontendHandler) DescribeNexusOperationExecution(context.Context, *workflowservice.DescribeNexusOperationExecutionRequest) (*workflowservice.DescribeNexusOperationExecutionResponse, error) { + return nil, serviceerror.NewUnimplemented("DescribeNexusOperationExecution not implemented") +} + +func (h *frontendHandler) PollNexusOperationExecution(context.Context, *workflowservice.PollNexusOperationExecutionRequest) (*workflowservice.PollNexusOperationExecutionResponse, error) { + return nil, serviceerror.NewUnimplemented("PollNexusOperationExecution not implemented") +} + +func (h *frontendHandler) ListNexusOperationExecutions(context.Context, *workflowservice.ListNexusOperationExecutionsRequest) (*workflowservice.ListNexusOperationExecutionsResponse, error) { + return nil, serviceerror.NewUnimplemented("ListNexusOperationExecutions not implemented") +} + +func (h *frontendHandler) CountNexusOperationExecutions(context.Context, *workflowservice.CountNexusOperationExecutionsRequest) (*workflowservice.CountNexusOperationExecutionsResponse, error) { + return nil, serviceerror.NewUnimplemented("CountNexusOperationExecutions not implemented") +} + +func (h *frontendHandler) RequestCancelNexusOperationExecution(context.Context, *workflowservice.RequestCancelNexusOperationExecutionRequest) (*workflowservice.RequestCancelNexusOperationExecutionResponse, error) { + return nil, serviceerror.NewUnimplemented("RequestCancelNexusOperationExecution not implemented") +} + +func (h *frontendHandler) TerminateNexusOperationExecution(context.Context, *workflowservice.TerminateNexusOperationExecutionRequest) (*workflowservice.TerminateNexusOperationExecutionResponse, error) { + return nil, serviceerror.NewUnimplemented("TerminateNexusOperationExecution not implemented") +} + +func (h *frontendHandler) DeleteNexusOperationExecution(context.Context, *workflowservice.DeleteNexusOperationExecutionRequest) (*workflowservice.DeleteNexusOperationExecutionResponse, error) { + return nil, serviceerror.NewUnimplemented("DeleteNexusOperationExecution not implemented") +} diff --git a/chasm/lib/nexusoperation/fx.go b/chasm/lib/nexusoperation/fx.go index 8d46a261dc..0b30b0794e 100644 --- a/chasm/lib/nexusoperation/fx.go +++ b/chasm/lib/nexusoperation/fx.go @@ -6,7 +6,7 @@ import ( ) var Module = fx.Module( - "chasm.lib.nexusoperations", + "chasm.lib.nexusoperation", fx.Provide(configProvider), fx.Provide(NewOperationInvocationTaskExecutor), fx.Provide(NewOperationBackoffTaskExecutor), @@ -19,6 +19,11 @@ var Module = fx.Module( fx.Invoke(register), ) +var FrontendModule = fx.Module( + "chasm.lib.nexusoperation.frontend", + fx.Provide(NewFrontendHandler), +) + func register( registry *chasm.Registry, library *Library, diff --git a/client/frontend/client_gen.go b/client/frontend/client_gen.go index 7b3f3e3aa5..e003fcdcb4 100644 --- a/client/frontend/client_gen.go +++ b/client/frontend/client_gen.go @@ -19,6 +19,16 @@ func (c *clientImpl) CountActivityExecutions( return c.client.CountActivityExecutions(ctx, request, opts...) } +func (c *clientImpl) CountNexusOperationExecutions( + ctx context.Context, + request *workflowservice.CountNexusOperationExecutionsRequest, + opts ...grpc.CallOption, +) (*workflowservice.CountNexusOperationExecutionsResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.CountNexusOperationExecutions(ctx, request, opts...) +} + func (c *clientImpl) CountSchedules( ctx context.Context, request *workflowservice.CountSchedulesRequest, @@ -69,6 +79,16 @@ func (c *clientImpl) DeleteActivityExecution( return c.client.DeleteActivityExecution(ctx, request, opts...) } +func (c *clientImpl) DeleteNexusOperationExecution( + ctx context.Context, + request *workflowservice.DeleteNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.DeleteNexusOperationExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.DeleteNexusOperationExecution(ctx, request, opts...) +} + func (c *clientImpl) DeleteSchedule( ctx context.Context, request *workflowservice.DeleteScheduleRequest, @@ -169,6 +189,16 @@ func (c *clientImpl) DescribeNamespace( return c.client.DescribeNamespace(ctx, request, opts...) } +func (c *clientImpl) DescribeNexusOperationExecution( + ctx context.Context, + request *workflowservice.DescribeNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.DescribeNexusOperationExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.DescribeNexusOperationExecution(ctx, request, opts...) +} + func (c *clientImpl) DescribeSchedule( ctx context.Context, request *workflowservice.DescribeScheduleRequest, @@ -419,6 +449,16 @@ func (c *clientImpl) ListNamespaces( return c.client.ListNamespaces(ctx, request, opts...) } +func (c *clientImpl) ListNexusOperationExecutions( + ctx context.Context, + request *workflowservice.ListNexusOperationExecutionsRequest, + opts ...grpc.CallOption, +) (*workflowservice.ListNexusOperationExecutionsResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.ListNexusOperationExecutions(ctx, request, opts...) +} + func (c *clientImpl) ListOpenWorkflowExecutions( ctx context.Context, request *workflowservice.ListOpenWorkflowExecutionsRequest, @@ -549,6 +589,16 @@ func (c *clientImpl) PollActivityTaskQueue( return c.client.PollActivityTaskQueue(ctx, request, opts...) } +func (c *clientImpl) PollNexusOperationExecution( + ctx context.Context, + request *workflowservice.PollNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.PollNexusOperationExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.PollNexusOperationExecution(ctx, request, opts...) +} + func (c *clientImpl) PollNexusTaskQueue( ctx context.Context, request *workflowservice.PollNexusTaskQueueRequest, @@ -639,6 +689,16 @@ func (c *clientImpl) RequestCancelActivityExecution( return c.client.RequestCancelActivityExecution(ctx, request, opts...) } +func (c *clientImpl) RequestCancelNexusOperationExecution( + ctx context.Context, + request *workflowservice.RequestCancelNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.RequestCancelNexusOperationExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.RequestCancelNexusOperationExecution(ctx, request, opts...) +} + func (c *clientImpl) RequestCancelWorkflowExecution( ctx context.Context, request *workflowservice.RequestCancelWorkflowExecutionRequest, @@ -889,6 +949,16 @@ func (c *clientImpl) StartBatchOperation( return c.client.StartBatchOperation(ctx, request, opts...) } +func (c *clientImpl) StartNexusOperationExecution( + ctx context.Context, + request *workflowservice.StartNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.StartNexusOperationExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.StartNexusOperationExecution(ctx, request, opts...) +} + func (c *clientImpl) StartWorkflowExecution( ctx context.Context, request *workflowservice.StartWorkflowExecutionRequest, @@ -919,6 +989,16 @@ func (c *clientImpl) TerminateActivityExecution( return c.client.TerminateActivityExecution(ctx, request, opts...) } +func (c *clientImpl) TerminateNexusOperationExecution( + ctx context.Context, + request *workflowservice.TerminateNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.TerminateNexusOperationExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.TerminateNexusOperationExecution(ctx, request, opts...) +} + func (c *clientImpl) TerminateWorkflowExecution( ctx context.Context, request *workflowservice.TerminateWorkflowExecutionRequest, diff --git a/client/frontend/metric_client_gen.go b/client/frontend/metric_client_gen.go index ad15ce5dc4..4366d8de9e 100644 --- a/client/frontend/metric_client_gen.go +++ b/client/frontend/metric_client_gen.go @@ -23,6 +23,20 @@ func (c *metricClient) CountActivityExecutions( return c.client.CountActivityExecutions(ctx, request, opts...) } +func (c *metricClient) CountNexusOperationExecutions( + ctx context.Context, + request *workflowservice.CountNexusOperationExecutionsRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.CountNexusOperationExecutionsResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientCountNexusOperationExecutions") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.CountNexusOperationExecutions(ctx, request, opts...) +} + func (c *metricClient) CountSchedules( ctx context.Context, request *workflowservice.CountSchedulesRequest, @@ -93,6 +107,20 @@ func (c *metricClient) DeleteActivityExecution( return c.client.DeleteActivityExecution(ctx, request, opts...) } +func (c *metricClient) DeleteNexusOperationExecution( + ctx context.Context, + request *workflowservice.DeleteNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.DeleteNexusOperationExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientDeleteNexusOperationExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.DeleteNexusOperationExecution(ctx, request, opts...) +} + func (c *metricClient) DeleteSchedule( ctx context.Context, request *workflowservice.DeleteScheduleRequest, @@ -233,6 +261,20 @@ func (c *metricClient) DescribeNamespace( return c.client.DescribeNamespace(ctx, request, opts...) } +func (c *metricClient) DescribeNexusOperationExecution( + ctx context.Context, + request *workflowservice.DescribeNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.DescribeNexusOperationExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientDescribeNexusOperationExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.DescribeNexusOperationExecution(ctx, request, opts...) +} + func (c *metricClient) DescribeSchedule( ctx context.Context, request *workflowservice.DescribeScheduleRequest, @@ -583,6 +625,20 @@ func (c *metricClient) ListNamespaces( return c.client.ListNamespaces(ctx, request, opts...) } +func (c *metricClient) ListNexusOperationExecutions( + ctx context.Context, + request *workflowservice.ListNexusOperationExecutionsRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.ListNexusOperationExecutionsResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientListNexusOperationExecutions") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.ListNexusOperationExecutions(ctx, request, opts...) +} + func (c *metricClient) ListOpenWorkflowExecutions( ctx context.Context, request *workflowservice.ListOpenWorkflowExecutionsRequest, @@ -765,6 +821,20 @@ func (c *metricClient) PollActivityTaskQueue( return c.client.PollActivityTaskQueue(ctx, request, opts...) } +func (c *metricClient) PollNexusOperationExecution( + ctx context.Context, + request *workflowservice.PollNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.PollNexusOperationExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientPollNexusOperationExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.PollNexusOperationExecution(ctx, request, opts...) +} + func (c *metricClient) PollNexusTaskQueue( ctx context.Context, request *workflowservice.PollNexusTaskQueueRequest, @@ -891,6 +961,20 @@ func (c *metricClient) RequestCancelActivityExecution( return c.client.RequestCancelActivityExecution(ctx, request, opts...) } +func (c *metricClient) RequestCancelNexusOperationExecution( + ctx context.Context, + request *workflowservice.RequestCancelNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.RequestCancelNexusOperationExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientRequestCancelNexusOperationExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.RequestCancelNexusOperationExecution(ctx, request, opts...) +} + func (c *metricClient) RequestCancelWorkflowExecution( ctx context.Context, request *workflowservice.RequestCancelWorkflowExecutionRequest, @@ -1241,6 +1325,20 @@ func (c *metricClient) StartBatchOperation( return c.client.StartBatchOperation(ctx, request, opts...) } +func (c *metricClient) StartNexusOperationExecution( + ctx context.Context, + request *workflowservice.StartNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.StartNexusOperationExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientStartNexusOperationExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.StartNexusOperationExecution(ctx, request, opts...) +} + func (c *metricClient) StartWorkflowExecution( ctx context.Context, request *workflowservice.StartWorkflowExecutionRequest, @@ -1283,6 +1381,20 @@ func (c *metricClient) TerminateActivityExecution( return c.client.TerminateActivityExecution(ctx, request, opts...) } +func (c *metricClient) TerminateNexusOperationExecution( + ctx context.Context, + request *workflowservice.TerminateNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.TerminateNexusOperationExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientTerminateNexusOperationExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.TerminateNexusOperationExecution(ctx, request, opts...) +} + func (c *metricClient) TerminateWorkflowExecution( ctx context.Context, request *workflowservice.TerminateWorkflowExecutionRequest, diff --git a/client/frontend/retryable_client_gen.go b/client/frontend/retryable_client_gen.go index a46a3bbfd2..2efe857cd4 100644 --- a/client/frontend/retryable_client_gen.go +++ b/client/frontend/retryable_client_gen.go @@ -26,6 +26,21 @@ func (c *retryableClient) CountActivityExecutions( return resp, err } +func (c *retryableClient) CountNexusOperationExecutions( + ctx context.Context, + request *workflowservice.CountNexusOperationExecutionsRequest, + opts ...grpc.CallOption, +) (*workflowservice.CountNexusOperationExecutionsResponse, error) { + var resp *workflowservice.CountNexusOperationExecutionsResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.CountNexusOperationExecutions(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) CountSchedules( ctx context.Context, request *workflowservice.CountSchedulesRequest, @@ -101,6 +116,21 @@ func (c *retryableClient) DeleteActivityExecution( return resp, err } +func (c *retryableClient) DeleteNexusOperationExecution( + ctx context.Context, + request *workflowservice.DeleteNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.DeleteNexusOperationExecutionResponse, error) { + var resp *workflowservice.DeleteNexusOperationExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.DeleteNexusOperationExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) DeleteSchedule( ctx context.Context, request *workflowservice.DeleteScheduleRequest, @@ -251,6 +281,21 @@ func (c *retryableClient) DescribeNamespace( return resp, err } +func (c *retryableClient) DescribeNexusOperationExecution( + ctx context.Context, + request *workflowservice.DescribeNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.DescribeNexusOperationExecutionResponse, error) { + var resp *workflowservice.DescribeNexusOperationExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.DescribeNexusOperationExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) DescribeSchedule( ctx context.Context, request *workflowservice.DescribeScheduleRequest, @@ -626,6 +671,21 @@ func (c *retryableClient) ListNamespaces( return resp, err } +func (c *retryableClient) ListNexusOperationExecutions( + ctx context.Context, + request *workflowservice.ListNexusOperationExecutionsRequest, + opts ...grpc.CallOption, +) (*workflowservice.ListNexusOperationExecutionsResponse, error) { + var resp *workflowservice.ListNexusOperationExecutionsResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.ListNexusOperationExecutions(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) ListOpenWorkflowExecutions( ctx context.Context, request *workflowservice.ListOpenWorkflowExecutionsRequest, @@ -821,6 +881,21 @@ func (c *retryableClient) PollActivityTaskQueue( return resp, err } +func (c *retryableClient) PollNexusOperationExecution( + ctx context.Context, + request *workflowservice.PollNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.PollNexusOperationExecutionResponse, error) { + var resp *workflowservice.PollNexusOperationExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.PollNexusOperationExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) PollNexusTaskQueue( ctx context.Context, request *workflowservice.PollNexusTaskQueueRequest, @@ -956,6 +1031,21 @@ func (c *retryableClient) RequestCancelActivityExecution( return resp, err } +func (c *retryableClient) RequestCancelNexusOperationExecution( + ctx context.Context, + request *workflowservice.RequestCancelNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.RequestCancelNexusOperationExecutionResponse, error) { + var resp *workflowservice.RequestCancelNexusOperationExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.RequestCancelNexusOperationExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) RequestCancelWorkflowExecution( ctx context.Context, request *workflowservice.RequestCancelWorkflowExecutionRequest, @@ -1331,6 +1421,21 @@ func (c *retryableClient) StartBatchOperation( return resp, err } +func (c *retryableClient) StartNexusOperationExecution( + ctx context.Context, + request *workflowservice.StartNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.StartNexusOperationExecutionResponse, error) { + var resp *workflowservice.StartNexusOperationExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.StartNexusOperationExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) StartWorkflowExecution( ctx context.Context, request *workflowservice.StartWorkflowExecutionRequest, @@ -1376,6 +1481,21 @@ func (c *retryableClient) TerminateActivityExecution( return resp, err } +func (c *retryableClient) TerminateNexusOperationExecution( + ctx context.Context, + request *workflowservice.TerminateNexusOperationExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.TerminateNexusOperationExecutionResponse, error) { + var resp *workflowservice.TerminateNexusOperationExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.TerminateNexusOperationExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) TerminateWorkflowExecution( ctx context.Context, request *workflowservice.TerminateWorkflowExecutionRequest, diff --git a/common/api/metadata.go b/common/api/metadata.go index 669f558b29..cc41125bf1 100644 --- a/common/api/metadata.go +++ b/common/api/metadata.go @@ -173,6 +173,14 @@ var ( "UpdateWorkerConfig": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, "PauseWorkflowExecution": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, "UnpauseWorkflowExecution": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, + "DescribeNexusOperationExecution": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingCapable}, + "ListNexusOperationExecutions": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone}, + "StartNexusOperationExecution": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, + "PollNexusOperationExecution": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingAlways}, + "CountNexusOperationExecutions": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone}, + "RequestCancelNexusOperationExecution": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, + "TerminateNexusOperationExecution": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, + "DeleteNexusOperationExecution": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, } operatorServiceMetadata = map[string]MethodMetadata{ "AddSearchAttributes": {Scope: ScopeNamespace, Access: AccessAdmin, Polling: PollingNone}, diff --git a/common/rpc/interceptor/logtags/workflow_service_server_gen.go b/common/rpc/interceptor/logtags/workflow_service_server_gen.go index 9857e7341b..5ce0c824d1 100644 --- a/common/rpc/interceptor/logtags/workflow_service_server_gen.go +++ b/common/rpc/interceptor/logtags/workflow_service_server_gen.go @@ -13,6 +13,10 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t return nil case *workflowservice.CountActivityExecutionsResponse: return nil + case *workflowservice.CountNexusOperationExecutionsRequest: + return nil + case *workflowservice.CountNexusOperationExecutionsResponse: + return nil case *workflowservice.CountSchedulesRequest: return nil case *workflowservice.CountSchedulesResponse: @@ -35,6 +39,12 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t } case *workflowservice.DeleteActivityExecutionResponse: return nil + case *workflowservice.DeleteNexusOperationExecutionRequest: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } + case *workflowservice.DeleteNexusOperationExecutionResponse: + return nil case *workflowservice.DeleteScheduleRequest: return nil case *workflowservice.DeleteScheduleResponse: @@ -82,6 +92,14 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t return nil case *workflowservice.DescribeNamespaceResponse: return nil + case *workflowservice.DescribeNexusOperationExecutionRequest: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } + case *workflowservice.DescribeNexusOperationExecutionResponse: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } case *workflowservice.DescribeScheduleRequest: return nil case *workflowservice.DescribeScheduleResponse: @@ -191,6 +209,10 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t return nil case *workflowservice.ListNamespacesResponse: return nil + case *workflowservice.ListNexusOperationExecutionsRequest: + return nil + case *workflowservice.ListNexusOperationExecutionsResponse: + return nil case *workflowservice.ListOpenWorkflowExecutionsRequest: return nil case *workflowservice.ListOpenWorkflowExecutionsResponse: @@ -256,6 +278,14 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t tag.WorkflowID(r.GetWorkflowExecution().GetWorkflowId()), tag.WorkflowRunID(r.GetWorkflowExecution().GetRunId()), } + case *workflowservice.PollNexusOperationExecutionRequest: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } + case *workflowservice.PollNexusOperationExecutionResponse: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } case *workflowservice.PollNexusTaskQueueRequest: return nil case *workflowservice.PollNexusTaskQueueResponse: @@ -309,6 +339,12 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t } case *workflowservice.RequestCancelActivityExecutionResponse: return nil + case *workflowservice.RequestCancelNexusOperationExecutionRequest: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } + case *workflowservice.RequestCancelNexusOperationExecutionResponse: + return nil case *workflowservice.RequestCancelWorkflowExecutionRequest: return []tag.Tag{ tag.WorkflowID(r.GetWorkflowExecution().GetWorkflowId()), @@ -441,6 +477,12 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t return nil case *workflowservice.StartBatchOperationResponse: return nil + case *workflowservice.StartNexusOperationExecutionRequest: + return nil + case *workflowservice.StartNexusOperationExecutionResponse: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } case *workflowservice.StartWorkflowExecutionRequest: return []tag.Tag{ tag.WorkflowID(r.GetWorkflowId()), @@ -459,6 +501,12 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t } case *workflowservice.TerminateActivityExecutionResponse: return nil + case *workflowservice.TerminateNexusOperationExecutionRequest: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } + case *workflowservice.TerminateNexusOperationExecutionResponse: + return nil case *workflowservice.TerminateWorkflowExecutionRequest: return []tag.Tag{ tag.WorkflowID(r.GetWorkflowExecution().GetWorkflowId()), diff --git a/common/rpc/interceptor/redirection.go b/common/rpc/interceptor/redirection.go index b88f1eff43..f83663c6fb 100644 --- a/common/rpc/interceptor/redirection.go +++ b/common/rpc/interceptor/redirection.go @@ -147,6 +147,15 @@ var ( "RequestCancelActivityExecution": func() any { return &workflowservice.RequestCancelActivityExecutionResponse{} }, "TerminateActivityExecution": func() any { return &workflowservice.TerminateActivityExecutionResponse{} }, "DeleteActivityExecution": func() any { return &workflowservice.DeleteActivityExecutionResponse{} }, + + "DescribeNexusOperationExecution": func() any { return &workflowservice.DescribeNexusOperationExecutionResponse{} }, + "ListNexusOperationExecutions": func() any { return &workflowservice.ListNexusOperationExecutionsResponse{} }, + "StartNexusOperationExecution": func() any { return &workflowservice.StartNexusOperationExecutionResponse{} }, + "PollNexusOperationExecution": func() any { return &workflowservice.PollNexusOperationExecutionResponse{} }, + "CountNexusOperationExecutions": func() any { return &workflowservice.CountNexusOperationExecutionsResponse{} }, + "RequestCancelNexusOperationExecution": func() any { return &workflowservice.RequestCancelNexusOperationExecutionResponse{} }, + "TerminateNexusOperationExecution": func() any { return &workflowservice.TerminateNexusOperationExecutionResponse{} }, + "DeleteNexusOperationExecution": func() any { return &workflowservice.DeleteNexusOperationExecutionResponse{} }, } ) diff --git a/common/rpc/interceptor/redirection_test.go b/common/rpc/interceptor/redirection_test.go index 9624754b00..a2b5ec5bf3 100644 --- a/common/rpc/interceptor/redirection_test.go +++ b/common/rpc/interceptor/redirection_test.go @@ -204,6 +204,15 @@ func (s *redirectionInterceptorSuite) TestGlobalAPI() { "RequestCancelActivityExecution": {}, "TerminateActivityExecution": {}, "DeleteActivityExecution": {}, + + "CountNexusOperationExecutions": {}, + "DeleteNexusOperationExecution": {}, + "DescribeNexusOperationExecution": {}, + "ListNexusOperationExecutions": {}, + "PollNexusOperationExecution": {}, + "RequestCancelNexusOperationExecution": {}, + "StartNexusOperationExecution": {}, + "TerminateNexusOperationExecution": {}, }, apis) } diff --git a/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go b/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go index 0ff2f78a3a..ebcefd5de2 100644 --- a/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go +++ b/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go @@ -62,6 +62,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) CountActivityExecutions(ctx, in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountActivityExecutions", reflect.TypeOf((*MockWorkflowServiceClient)(nil).CountActivityExecutions), varargs...) } +// CountNexusOperationExecutions mocks base method. +func (m *MockWorkflowServiceClient) CountNexusOperationExecutions(ctx context.Context, in *workflowservice.CountNexusOperationExecutionsRequest, opts ...grpc.CallOption) (*workflowservice.CountNexusOperationExecutionsResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CountNexusOperationExecutions", varargs...) + ret0, _ := ret[0].(*workflowservice.CountNexusOperationExecutionsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CountNexusOperationExecutions indicates an expected call of CountNexusOperationExecutions. +func (mr *MockWorkflowServiceClientMockRecorder) CountNexusOperationExecutions(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountNexusOperationExecutions", reflect.TypeOf((*MockWorkflowServiceClient)(nil).CountNexusOperationExecutions), varargs...) +} + // CountSchedules mocks base method. func (m *MockWorkflowServiceClient) CountSchedules(ctx context.Context, in *workflowservice.CountSchedulesRequest, opts ...grpc.CallOption) (*workflowservice.CountSchedulesResponse, error) { m.ctrl.T.Helper() @@ -162,6 +182,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) DeleteActivityExecution(ctx, in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteActivityExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).DeleteActivityExecution), varargs...) } +// DeleteNexusOperationExecution mocks base method. +func (m *MockWorkflowServiceClient) DeleteNexusOperationExecution(ctx context.Context, in *workflowservice.DeleteNexusOperationExecutionRequest, opts ...grpc.CallOption) (*workflowservice.DeleteNexusOperationExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DeleteNexusOperationExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.DeleteNexusOperationExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteNexusOperationExecution indicates an expected call of DeleteNexusOperationExecution. +func (mr *MockWorkflowServiceClientMockRecorder) DeleteNexusOperationExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteNexusOperationExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).DeleteNexusOperationExecution), varargs...) +} + // DeleteSchedule mocks base method. func (m *MockWorkflowServiceClient) DeleteSchedule(ctx context.Context, in *workflowservice.DeleteScheduleRequest, opts ...grpc.CallOption) (*workflowservice.DeleteScheduleResponse, error) { m.ctrl.T.Helper() @@ -362,6 +402,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) DescribeNamespace(ctx, in any, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeNamespace", reflect.TypeOf((*MockWorkflowServiceClient)(nil).DescribeNamespace), varargs...) } +// DescribeNexusOperationExecution mocks base method. +func (m *MockWorkflowServiceClient) DescribeNexusOperationExecution(ctx context.Context, in *workflowservice.DescribeNexusOperationExecutionRequest, opts ...grpc.CallOption) (*workflowservice.DescribeNexusOperationExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DescribeNexusOperationExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.DescribeNexusOperationExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DescribeNexusOperationExecution indicates an expected call of DescribeNexusOperationExecution. +func (mr *MockWorkflowServiceClientMockRecorder) DescribeNexusOperationExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeNexusOperationExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).DescribeNexusOperationExecution), varargs...) +} + // DescribeSchedule mocks base method. func (m *MockWorkflowServiceClient) DescribeSchedule(ctx context.Context, in *workflowservice.DescribeScheduleRequest, opts ...grpc.CallOption) (*workflowservice.DescribeScheduleResponse, error) { m.ctrl.T.Helper() @@ -862,6 +922,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) ListNamespaces(ctx, in any, opt return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListNamespaces", reflect.TypeOf((*MockWorkflowServiceClient)(nil).ListNamespaces), varargs...) } +// ListNexusOperationExecutions mocks base method. +func (m *MockWorkflowServiceClient) ListNexusOperationExecutions(ctx context.Context, in *workflowservice.ListNexusOperationExecutionsRequest, opts ...grpc.CallOption) (*workflowservice.ListNexusOperationExecutionsResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListNexusOperationExecutions", varargs...) + ret0, _ := ret[0].(*workflowservice.ListNexusOperationExecutionsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListNexusOperationExecutions indicates an expected call of ListNexusOperationExecutions. +func (mr *MockWorkflowServiceClientMockRecorder) ListNexusOperationExecutions(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListNexusOperationExecutions", reflect.TypeOf((*MockWorkflowServiceClient)(nil).ListNexusOperationExecutions), varargs...) +} + // ListOpenWorkflowExecutions mocks base method. func (m *MockWorkflowServiceClient) ListOpenWorkflowExecutions(ctx context.Context, in *workflowservice.ListOpenWorkflowExecutionsRequest, opts ...grpc.CallOption) (*workflowservice.ListOpenWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() @@ -1122,6 +1202,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) PollActivityTaskQueue(ctx, in a return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PollActivityTaskQueue", reflect.TypeOf((*MockWorkflowServiceClient)(nil).PollActivityTaskQueue), varargs...) } +// PollNexusOperationExecution mocks base method. +func (m *MockWorkflowServiceClient) PollNexusOperationExecution(ctx context.Context, in *workflowservice.PollNexusOperationExecutionRequest, opts ...grpc.CallOption) (*workflowservice.PollNexusOperationExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PollNexusOperationExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.PollNexusOperationExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PollNexusOperationExecution indicates an expected call of PollNexusOperationExecution. +func (mr *MockWorkflowServiceClientMockRecorder) PollNexusOperationExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PollNexusOperationExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).PollNexusOperationExecution), varargs...) +} + // PollNexusTaskQueue mocks base method. func (m *MockWorkflowServiceClient) PollNexusTaskQueue(ctx context.Context, in *workflowservice.PollNexusTaskQueueRequest, opts ...grpc.CallOption) (*workflowservice.PollNexusTaskQueueResponse, error) { m.ctrl.T.Helper() @@ -1302,6 +1402,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) RequestCancelActivityExecution( return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestCancelActivityExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).RequestCancelActivityExecution), varargs...) } +// RequestCancelNexusOperationExecution mocks base method. +func (m *MockWorkflowServiceClient) RequestCancelNexusOperationExecution(ctx context.Context, in *workflowservice.RequestCancelNexusOperationExecutionRequest, opts ...grpc.CallOption) (*workflowservice.RequestCancelNexusOperationExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "RequestCancelNexusOperationExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.RequestCancelNexusOperationExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RequestCancelNexusOperationExecution indicates an expected call of RequestCancelNexusOperationExecution. +func (mr *MockWorkflowServiceClientMockRecorder) RequestCancelNexusOperationExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestCancelNexusOperationExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).RequestCancelNexusOperationExecution), varargs...) +} + // RequestCancelWorkflowExecution mocks base method. func (m *MockWorkflowServiceClient) RequestCancelWorkflowExecution(ctx context.Context, in *workflowservice.RequestCancelWorkflowExecutionRequest, opts ...grpc.CallOption) (*workflowservice.RequestCancelWorkflowExecutionResponse, error) { m.ctrl.T.Helper() @@ -1802,6 +1922,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) StartBatchOperation(ctx, in any return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartBatchOperation", reflect.TypeOf((*MockWorkflowServiceClient)(nil).StartBatchOperation), varargs...) } +// StartNexusOperationExecution mocks base method. +func (m *MockWorkflowServiceClient) StartNexusOperationExecution(ctx context.Context, in *workflowservice.StartNexusOperationExecutionRequest, opts ...grpc.CallOption) (*workflowservice.StartNexusOperationExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "StartNexusOperationExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.StartNexusOperationExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StartNexusOperationExecution indicates an expected call of StartNexusOperationExecution. +func (mr *MockWorkflowServiceClientMockRecorder) StartNexusOperationExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartNexusOperationExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).StartNexusOperationExecution), varargs...) +} + // StartWorkflowExecution mocks base method. func (m *MockWorkflowServiceClient) StartWorkflowExecution(ctx context.Context, in *workflowservice.StartWorkflowExecutionRequest, opts ...grpc.CallOption) (*workflowservice.StartWorkflowExecutionResponse, error) { m.ctrl.T.Helper() @@ -1862,6 +2002,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) TerminateActivityExecution(ctx, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TerminateActivityExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).TerminateActivityExecution), varargs...) } +// TerminateNexusOperationExecution mocks base method. +func (m *MockWorkflowServiceClient) TerminateNexusOperationExecution(ctx context.Context, in *workflowservice.TerminateNexusOperationExecutionRequest, opts ...grpc.CallOption) (*workflowservice.TerminateNexusOperationExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "TerminateNexusOperationExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.TerminateNexusOperationExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TerminateNexusOperationExecution indicates an expected call of TerminateNexusOperationExecution. +func (mr *MockWorkflowServiceClientMockRecorder) TerminateNexusOperationExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TerminateNexusOperationExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).TerminateNexusOperationExecution), varargs...) +} + // TerminateWorkflowExecution mocks base method. func (m *MockWorkflowServiceClient) TerminateWorkflowExecution(ctx context.Context, in *workflowservice.TerminateWorkflowExecutionRequest, opts ...grpc.CallOption) (*workflowservice.TerminateWorkflowExecutionResponse, error) { m.ctrl.T.Helper() diff --git a/go.mod b/go.mod index 60ad1aa45a..e5564423e9 100644 --- a/go.mod +++ b/go.mod @@ -60,7 +60,7 @@ require ( go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 - go.temporal.io/api v1.62.2 + go.temporal.io/api v1.62.3-0.20260311231426-d5018d14a239 go.temporal.io/sdk v1.38.0 go.uber.org/fx v1.24.0 go.uber.org/mock v0.6.0 diff --git a/go.sum b/go.sum index 2cd1258aaf..2cd4be36cf 100644 --- a/go.sum +++ b/go.sum @@ -375,8 +375,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.62.2 h1:jFhIzlqNyJsJZTiCRQmTIMv6OTQ5BZ57z8gbgLGMaoo= -go.temporal.io/api v1.62.2/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.62.3-0.20260311231426-d5018d14a239 h1:YagBLnjeFGX2I41A5uELGneG1UFv+B+wd0Rcg9o1qzM= +go.temporal.io/api v1.62.3-0.20260311231426-d5018d14a239/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.38.0 h1:4Bok5LEdED7YKpsSjIa3dDqram5VOq+ydBf4pyx0Wo4= go.temporal.io/sdk v1.38.0/go.mod h1:a+R2Ej28ObvHoILbHaxMyind7M6D+W0L7edt5UJF4SE= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/service/frontend/configs/quotas.go b/service/frontend/configs/quotas.go index af667883e4..046b042c0a 100644 --- a/service/frontend/configs/quotas.go +++ b/service/frontend/configs/quotas.go @@ -35,26 +35,26 @@ var ( // from their corresponding quota, which is determined by // dynamicconfig.FrontendMaxConcurrentLongRunningRequestsPerInstance. If the value is not set, // then the method is not considered a long-running request and the number of concurrent - // requests will not be throttled. The Poll* methods here are long-running because they block - // until there is a task available. GetWorkflowExecutionHistory and DescribeActivityExecution - // methods are blocking only if WaitNewEvent/LongPollToken are set, otherwise they are not - // long-running. The QueryWorkflow and UpdateWorkflowExecution methods are long-running because - // they both block until a background WFT is complete. + // requests will not be throttled. ExecutionAPICountLimitOverride = map[string]int{ + // These methods here are long-running because they block until there is a task available. "/temporal.api.workflowservice.v1.WorkflowService/PollActivityTaskQueue": 1, "/temporal.api.workflowservice.v1.WorkflowService/PollWorkflowTaskQueue": 1, "/temporal.api.workflowservice.v1.WorkflowService/PollWorkflowExecutionUpdate": 1, - "/temporal.api.workflowservice.v1.WorkflowService/QueryWorkflow": 1, - "/temporal.api.workflowservice.v1.WorkflowService/UpdateWorkflowExecution": 1, "/temporal.api.workflowservice.v1.WorkflowService/PollNexusTaskQueue": 1, + "/temporal.api.workflowservice.v1.WorkflowService/PollActivityExecution": 1, + "/temporal.api.workflowservice.v1.WorkflowService/PollNexusOperationExecution": 1, - // Long-running if activity outcome is not already available - "/temporal.api.workflowservice.v1.WorkflowService/PollActivityExecution": 1, - // Long-running if certain request parameters are set - "/temporal.api.workflowservice.v1.WorkflowService/GetWorkflowExecutionHistory": 1, - "/temporal.api.workflowservice.v1.WorkflowService/DescribeActivityExecution": 1, + // These methods are long-running because they block until a background WFT is complete. + "/temporal.api.workflowservice.v1.WorkflowService/QueryWorkflow": 1, + "/temporal.api.workflowservice.v1.WorkflowService/UpdateWorkflowExecution": 1, - // potentially long-running, depending on the operations + // These methods are blocking only if WaitNewEvent/LongPollToken are set, otherwise they are not. + "/temporal.api.workflowservice.v1.WorkflowService/DescribeNexusOperationExecution": 1, + "/temporal.api.workflowservice.v1.WorkflowService/GetWorkflowExecutionHistory": 1, + "/temporal.api.workflowservice.v1.WorkflowService/DescribeActivityExecution": 1, + + // Potentially long-running, depending on the operations. "/temporal.api.workflowservice.v1.WorkflowService/ExecuteMultiOperation": 1, // Dispatching a Nexus task is a potentially long running RPC, it's classified in the same bucket as QueryWorkflow. @@ -83,6 +83,7 @@ var ( "/temporal.api.workflowservice.v1.WorkflowService/CreateSchedule": 1, "/temporal.api.workflowservice.v1.WorkflowService/StartBatchOperation": 1, "/temporal.api.workflowservice.v1.WorkflowService/StartActivityExecution": 1, + "/temporal.api.workflowservice.v1.WorkflowService/StartNexusOperationExecution": 1, DispatchNexusTaskByNamespaceAndTaskQueueAPIName: 1, DispatchNexusTaskByEndpointAPIName: 1, @@ -129,6 +130,9 @@ var ( "/temporal.api.workflowservice.v1.WorkflowService/RequestCancelActivityExecution": 2, "/temporal.api.workflowservice.v1.WorkflowService/TerminateActivityExecution": 2, "/temporal.api.workflowservice.v1.WorkflowService/DeleteActivityExecution": 2, + "/temporal.api.workflowservice.v1.WorkflowService/RequestCancelNexusOperationExecution": 2, + "/temporal.api.workflowservice.v1.WorkflowService/TerminateNexusOperationExecution": 2, + "/temporal.api.workflowservice.v1.WorkflowService/DeleteNexusOperationExecution": 2, "/temporal.api.workflowservice.v1.WorkflowService/PauseWorkflowExecution": 2, "/temporal.api.workflowservice.v1.WorkflowService/UnpauseWorkflowExecution": 2, @@ -147,6 +151,7 @@ var ( "/temporal.api.workflowservice.v1.WorkflowService/GetCurrentDeployment": 3, // [cleanup-wv-pre-release] "/temporal.api.workflowservice.v1.WorkflowService/DescribeWorkerDeploymentVersion": 3, "/temporal.api.workflowservice.v1.WorkflowService/DescribeWorkerDeployment": 3, + "/temporal.api.workflowservice.v1.WorkflowService/DescribeNexusOperationExecution": 3, // P3: Progress APIs for reporting cancellations and failures. // They are relatively low priority as the tasks need to be retried anyway. @@ -158,6 +163,7 @@ var ( "/temporal.api.workflowservice.v1.WorkflowService/RespondNexusTaskFailed": 3, // P4: Poll APIs and other low priority APIs + "/temporal.api.workflowservice.v1.WorkflowService/PollNexusOperationExecution": 4, "/temporal.api.workflowservice.v1.WorkflowService/PollActivityExecution": 4, // TODO(saa-preview): should it be 4 or 3? "/temporal.api.workflowservice.v1.WorkflowService/PollWorkflowTaskQueue": 4, "/temporal.api.workflowservice.v1.WorkflowService/PollActivityTaskQueue": 4, @@ -194,6 +200,8 @@ var ( "/temporal.api.workflowservice.v1.WorkflowService/DescribeWorker": 1, "/temporal.api.workflowservice.v1.WorkflowService/CountActivityExecutions": 1, "/temporal.api.workflowservice.v1.WorkflowService/ListActivityExecutions": 1, + "/temporal.api.workflowservice.v1.WorkflowService/CountNexusOperationExecutions": 1, + "/temporal.api.workflowservice.v1.WorkflowService/ListNexusOperationExecutions": 1, // APIs that rely on visibility "/temporal.api.workflowservice.v1.WorkflowService/GetWorkerTaskReachability": 1, diff --git a/service/frontend/configs/quotas_test.go b/service/frontend/configs/quotas_test.go index d86ceac364..0da60158fc 100644 --- a/service/frontend/configs/quotas_test.go +++ b/service/frontend/configs/quotas_test.go @@ -106,6 +106,9 @@ func (s *quotasSuite) TestVisibilityAPIs() { "/temporal.api.workflowservice.v1.WorkflowService/CountActivityExecutions": {}, "/temporal.api.workflowservice.v1.WorkflowService/ListActivityExecutions": {}, + + "/temporal.api.workflowservice.v1.WorkflowService/CountNexusOperationExecutions": {}, + "/temporal.api.workflowservice.v1.WorkflowService/ListNexusOperationExecutions": {}, } var service workflowservice.WorkflowServiceServer diff --git a/service/frontend/fx.go b/service/frontend/fx.go index 41887e3571..dcc8e7ca36 100644 --- a/service/frontend/fx.go +++ b/service/frontend/fx.go @@ -8,6 +8,7 @@ import ( "go.temporal.io/server/api/adminservice/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/activity" + "go.temporal.io/server/chasm/lib/nexusoperation" "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1" "go.temporal.io/server/client" "go.temporal.io/server/common" @@ -115,6 +116,7 @@ var Module = fx.Options( fx.Provide(schedulerpb.NewSchedulerServiceLayeredClient), nexusfrontend.Module, activity.FrontendModule, + nexusoperation.FrontendModule, fx.Provide(visibility.ChasmVisibilityManagerProvider), fx.Provide(chasm.ChasmVisibilityInterceptorProvider), ) @@ -787,6 +789,7 @@ func HandlerProvider( healthInterceptor *interceptor.HealthInterceptor, scheduleSpecBuilder *scheduler.SpecBuilder, activityHandler activity.FrontendHandler, + nexusOperationHandler nexusoperation.FrontendHandler, registry *chasm.Registry, frontendServiceResolver membership.ServiceResolver, ) Handler { @@ -823,6 +826,7 @@ func HandlerProvider( scheduleSpecBuilder, httpEnabled(cfg, serviceName), activityHandler, + nexusOperationHandler, registry, workerDeploymentReadRateLimiter, ) diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index c6afbbcafa..c53b9a6ed9 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -36,6 +36,7 @@ import ( taskqueuespb "go.temporal.io/server/api/taskqueue/v1" "go.temporal.io/server/chasm" "go.temporal.io/server/chasm/lib/activity" + "go.temporal.io/server/chasm/lib/nexusoperation" chasmscheduler "go.temporal.io/server/chasm/lib/scheduler" "go.temporal.io/server/chasm/lib/scheduler/gen/schedulerpb/v1" "go.temporal.io/server/client/frontend" @@ -109,10 +110,16 @@ const ( ) type ( + // ActivityHandler is the activity frontend handler, aliased to avoid embedding name collision. + ActivityHandler = activity.FrontendHandler + // NexusOperationHandler is the nexus operation frontend handler, aliased to avoid embedding name collision. + NexusOperationHandler = nexusoperation.FrontendHandler + // WorkflowHandler - gRPC handler interface for workflowservice WorkflowHandler struct { workflowservice.UnsafeWorkflowServiceServer - activity.FrontendHandler + ActivityHandler + NexusOperationHandler status int32 @@ -178,15 +185,17 @@ func NewWorkflowHandler( scheduleSpecBuilder *scheduler.SpecBuilder, httpEnabled bool, activityHandler activity.FrontendHandler, + nexusOperationHandler nexusoperation.FrontendHandler, registry *chasm.Registry, workerDeploymentReadRateLimiter quotas.RequestRateLimiter, ) *WorkflowHandler { handler := &WorkflowHandler{ - FrontendHandler: activityHandler, - status: common.DaemonStatusInitialized, - config: config, - tokenSerializer: tasktoken.NewSerializer(), - versionChecker: headers.NewDefaultVersionChecker(), + ActivityHandler: activityHandler, + NexusOperationHandler: nexusOperationHandler, + status: common.DaemonStatusInitialized, + config: config, + tokenSerializer: tasktoken.NewSerializer(), + versionChecker: headers.NewDefaultVersionChecker(), namespaceHandler: newNamespaceHandler( logger, persistenceMetadataManager, diff --git a/service/frontend/workflow_handler_test.go b/service/frontend/workflow_handler_test.go index c202ddbddc..7d7e614ec2 100644 --- a/service/frontend/workflow_handler_test.go +++ b/service/frontend/workflow_handler_test.go @@ -194,6 +194,7 @@ func (s *WorkflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandl scheduler.NewSpecBuilder(), true, nil, // Not testing activity handler here + nil, // Not testing nexus operation handler here nil, quotas.NoopRequestRateLimiter, )