
Commit 543e7c6

Ensure in progress requests are processed before resetting the gRPC connection with the management plane (#1391)
* Ensure in progress requests are processed before resetting the gRPC connection with the management plane
* Add more debug logging
* Update NGINX process parser to handle worker processes that look like master processes
* Add more unit tests
* Fix tests
1 parent e951ca1 commit 543e7c6

File tree: 12 files changed (+202 −41 lines)


internal/command/command_plugin.go

Lines changed: 12 additions & 8 deletions
```diff
@@ -120,7 +120,9 @@ func (cp *CommandPlugin) Process(ctx context.Context, msg *bus.Message) {
 	if logger.ServerType(ctxWithMetadata) == cp.commandServerType.String() {
 		switch msg.Topic {
 		case bus.ConnectionResetTopic:
-			cp.processConnectionReset(ctxWithMetadata, msg)
+			// Running as a separate go routine so that the command plugin can continue to process data plane responses
+			// while the connection reset is in progress
+			go cp.processConnectionReset(ctxWithMetadata, msg)
 		case bus.ResourceUpdateTopic:
 			cp.processResourceUpdate(ctxWithMetadata, msg)
 		case bus.InstanceHealthTopic:
@@ -254,11 +256,19 @@ func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Me
 	slog.DebugContext(ctx, "Command plugin received connection reset message")
 
 	if newConnection, ok := msg.Data.(grpc.GrpcConnectionInterface); ok {
-		slog.DebugContext(ctx, "Canceling Subscribe after connection reset")
 		ctxWithMetadata := cp.config.NewContextWithLabels(ctx)
 		cp.subscribeMutex.Lock()
 		defer cp.subscribeMutex.Unlock()
 
+		// Update the command service with the new client first
+		err := cp.commandService.UpdateClient(ctxWithMetadata, newConnection.CommandServiceClient())
+		if err != nil {
+			slog.ErrorContext(ctx, "Failed to reset connection", "error", err)
+			return
+		}
+
+		// Once the command service is updated, we close the old connection
+		slog.DebugContext(ctx, "Canceling Subscribe after connection reset")
 		if cp.subscribeCancel != nil {
 			cp.subscribeCancel()
 			slog.DebugContext(ctxWithMetadata, "Successfully canceled subscribe after connection reset")
@@ -270,12 +280,6 @@ func (cp *CommandPlugin) processConnectionReset(ctx context.Context, msg *bus.Me
 		}
 
 		cp.conn = newConnection
-		err := cp.commandService.UpdateClient(ctx, cp.conn.CommandServiceClient())
-		if err != nil {
-			slog.ErrorContext(ctx, "Failed to reset connection", "error", err)
-			return
-		}
-
 		slog.DebugContext(ctxWithMetadata, "Starting new subscribe after connection reset")
 		subscribeCtx, cp.subscribeCancel = context.WithCancel(ctxWithMetadata)
 		go cp.commandService.Subscribe(subscribeCtx)
```
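The plugin change above moves the reset off the bus-processing path so data plane responses keep flowing while `UpdateClient` waits. A minimal standalone sketch of that dispatch pattern, with hypothetical simplified types (the real plugin serializes on `subscribeMutex` and also restarts its Subscribe loop):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// plugin is a hypothetical stand-in for CommandPlugin; only the
// mutex-serialized reset is modeled here.
type plugin struct {
	subscribeMu sync.Mutex
}

func (p *plugin) processConnectionReset() {
	// Serialize resets, mirroring subscribeMutex in the real plugin.
	p.subscribeMu.Lock()
	defer p.subscribeMu.Unlock()

	fmt.Println("resetting connection (may wait on in-progress requests)")
	time.Sleep(100 * time.Millisecond) // stand-in for the real reset work
	fmt.Println("reset complete")
}

func main() {
	p := &plugin{}
	done := make(chan struct{})

	// Dispatching in a goroutine keeps the message loop free to handle
	// data plane responses while the reset runs.
	go func() {
		p.processConnectionReset()
		close(done)
	}()

	fmt.Println("message loop still processing responses")
	<-done // the real plugin fires and forgets; we wait so the demo exits cleanly
}
```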

internal/command/command_plugin_test.go

Lines changed: 8 additions & 1 deletion
```diff
@@ -164,7 +164,14 @@ func TestCommandPlugin_Process(t *testing.T) {
 		Topic: bus.ConnectionResetTopic,
 		Data:  commandPlugin.conn,
 	})
-	require.Equal(t, 1, fakeCommandService.UpdateClientCallCount())
+
+	// Separate goroutine is executed so need to wait for it to complete
+	assert.Eventually(
+		t,
+		func() bool { return fakeCommandService.UpdateClientCallCount() == 1 },
+		2*time.Second,
+		10*time.Millisecond,
+	)
 }
 
 func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) {
```
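Because the reset now runs in its own goroutine, the test cannot assert the call count synchronously; testify's `assert.Eventually` polls until the condition holds or the window expires. A self-contained sketch of the same pattern (the atomic counter stands in for the fake's call count):

```go
package main

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestAsynchronousCall(t *testing.T) {
	var calls atomic.Int32

	// Stand-in for the goroutine the plugin now starts on ConnectionResetTopic.
	go func() { calls.Add(1) }()

	// Poll every 10ms for up to 2s, the same values the real test uses.
	assert.Eventually(
		t,
		func() bool { return calls.Load() == 1 },
		2*time.Second,
		10*time.Millisecond,
	)
}
```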

internal/command/command_service.go

Lines changed: 57 additions & 17 deletions
```diff
@@ -12,6 +12,7 @@ import (
 	"log/slog"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -31,18 +32,20 @@ import (
 
 var _ commandService = (*CommandService)(nil)
 
-const (
-	createConnectionMaxElapsedTime = 0
-)
+const createConnectionMaxElapsedTime = 0
+
+var timeToWaitBetweenChecks = 5 * time.Second
 
 type (
 	CommandService struct {
 		commandServiceClient mpi.CommandServiceClient
 		subscribeClient      mpi.CommandService_SubscribeClient
 		agentConfig          *config.Config
 		isConnected          *atomic.Bool
+		connectionResetInProgress *atomic.Bool
 		subscribeChannel     chan *mpi.ManagementPlaneRequest
 		configApplyRequestQueue map[string][]*mpi.ManagementPlaneRequest // key is the instance ID
+		requestsInProgress   map[string]*mpi.ManagementPlaneRequest // key is the correlation ID
 		resource             *mpi.Resource
 		subscribeClientMutex sync.Mutex
 		configApplyRequestQueueMutex sync.Mutex
@@ -56,19 +59,16 @@ func NewCommandService(
 	agentConfig *config.Config,
 	subscribeChannel chan *mpi.ManagementPlaneRequest,
 ) *CommandService {
-	isConnected := &atomic.Bool{}
-	isConnected.Store(false)
-
-	commandService := &CommandService{
-		commandServiceClient:    commandServiceClient,
-		agentConfig:             agentConfig,
-		isConnected:             isConnected,
-		subscribeChannel:        subscribeChannel,
-		configApplyRequestQueue: make(map[string][]*mpi.ManagementPlaneRequest),
-		resource:                &mpi.Resource{},
+	return &CommandService{
+		commandServiceClient:      commandServiceClient,
+		agentConfig:               agentConfig,
+		isConnected:               &atomic.Bool{},
+		connectionResetInProgress: &atomic.Bool{},
+		subscribeChannel:          subscribeChannel,
+		configApplyRequestQueue:   make(map[string][]*mpi.ManagementPlaneRequest),
+		resource:                  &mpi.Resource{},
+		requestsInProgress:        make(map[string]*mpi.ManagementPlaneRequest),
 	}
-
-	return commandService
 }
 
 func (cs *CommandService) IsConnected() bool {
@@ -181,6 +181,11 @@ func (cs *CommandService) SendDataPlaneResponse(ctx context.Context, response *m
 		return err
 	}
 
+	if response.GetCommandResponse().GetStatus() == mpi.CommandResponse_COMMAND_STATUS_OK ||
+		response.GetCommandResponse().GetStatus() == mpi.CommandResponse_COMMAND_STATUS_FAILURE {
+		delete(cs.requestsInProgress, response.GetMessageMeta().GetCorrelationId())
+	}
+
 	return backoff.Retry(
 		cs.sendDataPlaneResponseCallback(ctx, response),
 		backoffHelpers.Context(backOffCtx, cs.agentConfig.Client.Backoff),
@@ -272,6 +277,33 @@ func (cs *CommandService) CreateConnection(
 }
 
 func (cs *CommandService) UpdateClient(ctx context.Context, client mpi.CommandServiceClient) error {
+	cs.connectionResetInProgress.Store(true)
+	defer cs.connectionResetInProgress.Store(false)
+
+	// Wait for any in-progress requests to complete before updating the client
+	start := time.Now()
+
+	for len(cs.requestsInProgress) > 0 {
+		if time.Since(start) >= cs.agentConfig.Client.Grpc.ConnectionResetTimeout {
+			slog.WarnContext(
+				ctx,
+				"Timeout reached while waiting for in-progress requests to complete",
+				"number_of_requests_in_progress", len(cs.requestsInProgress),
+			)
+
+			break
+		}
+
+		slog.InfoContext(
+			ctx,
+			"Waiting for in-progress requests to complete before updating command service gRPC client",
+			"max_wait_time", cs.agentConfig.Client.Grpc.ConnectionResetTimeout,
+			"number_of_requests_in_progress", len(cs.requestsInProgress),
+		)
+
+		time.Sleep(timeToWaitBetweenChecks)
+	}
+
 	cs.subscribeClientMutex.Lock()
 	cs.commandServiceClient = client
 	cs.subscribeClientMutex.Unlock()
@@ -379,7 +411,7 @@ func (cs *CommandService) sendResponseForQueuedConfigApplyRequests(
 	cs.configApplyRequestQueue[instanceID] = cs.configApplyRequestQueue[instanceID][indexOfConfigApplyRequest+1:]
 	slog.DebugContext(ctx, "Removed config apply requests from queue", "queue", cs.configApplyRequestQueue[instanceID])
 
-	if len(cs.configApplyRequestQueue[instanceID]) > 0 {
+	if len(cs.configApplyRequestQueue[instanceID]) > 0 && !cs.connectionResetInProgress.Load() {
 		cs.subscribeChannel <- cs.configApplyRequestQueue[instanceID][len(cs.configApplyRequestQueue[instanceID])-1]
 	}
 
@@ -423,6 +455,12 @@ func (cs *CommandService) dataPlaneHealthCallback(
 //nolint:revive // cognitive complexity is 18
 func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
 	return func() error {
+		if cs.connectionResetInProgress.Load() {
+			slog.DebugContext(ctx, "Connection reset in progress, skipping receive from subscribe stream")
+
+			return nil
+		}
+
 		cs.subscribeClientMutex.Lock()
 
 		if cs.subscribeClient == nil {
@@ -463,6 +501,8 @@ func (cs *CommandService) receiveCallback(ctx context.Context) func() error {
 		default:
 			cs.subscribeChannel <- request
 		}
+
+		cs.requestsInProgress[request.GetMessageMeta().GetCorrelationId()] = request
 	}
 
 	return nil
@@ -495,7 +535,7 @@ func (cs *CommandService) queueConfigApplyRequests(ctx context.Context, request
 
 	instanceID := request.GetConfigApplyRequest().GetOverview().GetConfigVersion().GetInstanceId()
 	cs.configApplyRequestQueue[instanceID] = append(cs.configApplyRequestQueue[instanceID], request)
-	if len(cs.configApplyRequestQueue[instanceID]) == 1 {
+	if len(cs.configApplyRequestQueue[instanceID]) == 1 && !cs.connectionResetInProgress.Load() {
 		cs.subscribeChannel <- request
 	} else {
 		slog.DebugContext(
```
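Taken together, these hunks form a drain-then-swap protocol: `receiveCallback` records each incoming request under its correlation ID, `SendDataPlaneResponse` deletes the entry once a terminal (OK or FAILURE) response is sent, and `UpdateClient` polls the map until it empties or `ConnectionResetTimeout` elapses, gating new queue dispatches via `connectionResetInProgress` in the meantime. A simplified, self-contained model of that wait loop (the names and the mutex around the map are illustrative, not the agent's exact API):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// service models just the drain-then-swap behavior of CommandService.
type service struct {
	resetInProgress atomic.Bool
	mu              sync.Mutex
	inProgress      map[string]struct{} // key is the correlation ID
}

func (s *service) pending() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.inProgress)
}

func (s *service) complete(correlationID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.inProgress, correlationID)
}

// updateClient waits for in-flight requests to drain, then swaps the client.
func (s *service) updateClient(timeout, checkInterval time.Duration) {
	s.resetInProgress.Store(true)
	defer s.resetInProgress.Store(false)

	start := time.Now()
	for s.pending() > 0 {
		if time.Since(start) >= timeout {
			fmt.Printf("timeout: %d request(s) still in flight, proceeding\n", s.pending())
			break
		}
		fmt.Printf("waiting for %d in-progress request(s)\n", s.pending())
		time.Sleep(checkInterval)
	}
	// ...swap the gRPC client under the subscribe mutex here...
	fmt.Println("client updated")
}

func main() {
	s := &service{inProgress: map[string]struct{}{"corr-1": {}}}

	// Simulate a terminal data plane response arriving while we wait.
	go func() {
		time.Sleep(120 * time.Millisecond)
		s.complete("corr-1")
	}()

	s.updateClient(2*time.Second, 50*time.Millisecond)
}
```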

internal/command/command_service_test.go

Lines changed: 32 additions & 0 deletions
```diff
@@ -18,6 +18,7 @@ import (
 	"google.golang.org/protobuf/types/known/timestamppb"
 
 	"github.com/nginx/agent/v3/internal/logger"
+	"github.com/nginx/agent/v3/pkg/id"
 	"github.com/nginx/agent/v3/test/helpers"
 	"github.com/nginx/agent/v3/test/stub"
 
@@ -211,6 +212,37 @@ func TestCommandService_UpdateClient(t *testing.T) {
 	assert.NotNil(t, commandService.commandServiceClient)
 }
 
+func TestCommandService_UpdateClient_requestInProgress(t *testing.T) {
+	commandServiceClient := &v1fakes.FakeCommandServiceClient{}
+	ctx := context.Background()
+
+	commandService := NewCommandService(
+		commandServiceClient,
+		types.AgentConfig(),
+		make(chan *mpi.ManagementPlaneRequest),
+	)
+
+	instanceID := id.GenerateMessageID()
+
+	commandService.requestsInProgress[instanceID] = &mpi.ManagementPlaneRequest{}
+	timeToWaitBetweenChecks = 100 * time.Millisecond
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+
+	var updateClientErr error
+
+	go func() {
+		updateClientErr = commandService.UpdateClient(ctx, commandServiceClient)
+		wg.Done()
+	}()
+
+	wg.Wait()
+
+	require.NoError(t, updateClientErr)
+	assert.NotNil(t, commandService.commandServiceClient)
+}
+
 func TestCommandService_UpdateDataPlaneHealth(t *testing.T) {
 	ctx := context.Background()
 	commandServiceClient := &v1fakes.FakeCommandServiceClient{}
```

internal/config/config.go

Lines changed: 7 additions & 0 deletions
```diff
@@ -623,6 +623,12 @@ func registerClientFlags(fs *flag.FlagSet) {
 		"File chunk size in bytes.",
 	)
 
+	fs.Duration(
+		ClientGRPCConnectionResetTimeoutKey,
+		DefGRPCConnectionResetTimeout,
+		"Duration to wait for in-progress management plane requests to complete before resetting the gRPC connection.",
+	)
+
 	fs.Uint32(
 		ClientGRPCMaxFileSizeKey,
 		DefMaxFileSize,
@@ -1119,6 +1125,7 @@ func resolveClient() *Client {
 			FileChunkSize:             viperInstance.GetUint32(ClientGRPCFileChunkSizeKey),
 			ResponseTimeout:           viperInstance.GetDuration(ClientGRPCResponseTimeoutKey),
 			MaxParallelFileOperations: viperInstance.GetInt(ClientGRPCMaxParallelFileOperationsKey),
+			ConnectionResetTimeout:    viperInstance.GetDuration(ClientGRPCConnectionResetTimeoutKey),
 		},
 		Backoff: &BackOff{
 			InitialInterval: viperInstance.GetDuration(ClientBackoffInitialIntervalKey),
```
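For context, this is the usual pflag/viper wiring the config package relies on: register a duration flag, bind the flag set, and read it back with `GetDuration`. The flattened key below is an assumption about what `pre(ClientRootKey) + "grpc_connection_reset_timeout"` expands to:

```go
package main

import (
	"fmt"
	"time"

	"github.com/spf13/pflag"
	"github.com/spf13/viper"
)

func main() {
	fs := pflag.NewFlagSet("agent", pflag.ContinueOnError)

	// Assumed flattened form of ClientGRPCConnectionResetTimeoutKey.
	const key = "client_grpc_connection_reset_timeout"

	fs.Duration(
		key,
		3*time.Minute, // DefGRPCConnectionResetTimeout
		"Duration to wait for in-progress requests before resetting the gRPC connection.",
	)

	v := viper.New()
	if err := v.BindPFlags(fs); err != nil {
		panic(err)
	}

	fmt.Println(v.GetDuration(key)) // 3m0s unless overridden by flag, env, or config file
}
```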

internal/config/defaults.go

Lines changed: 8 additions & 7 deletions
```diff
@@ -60,13 +60,14 @@ const (
 	DefAuxiliaryCommandTLServerNameKey = ""
 
 	// Client GRPC Settings
-	DefMaxMessageSize            = 0       // 0 = unset
-	DefMaxMessageRecieveSize     = 4194304 // default 4 MB
-	DefMaxMessageSendSize        = 4194304 // default 4 MB
-	DefMaxFileSize        uint32 = 1048576 // 1MB
-	DefFileChunkSize      uint32 = 524288  // 0.5MB
-	DefMaxParallelFileOperations = 5
-	DefResponseTimeout           = 10 * time.Second
+	DefMaxMessageSize             = 0       // 0 = unset
+	DefMaxMessageRecieveSize      = 4194304 // default 4 MB
+	DefMaxMessageSendSize         = 4194304 // default 4 MB
+	DefMaxFileSize         uint32 = 1048576 // 1MB
+	DefFileChunkSize       uint32 = 524288  // 0.5MB
+	DefMaxParallelFileOperations  = 5
+	DefResponseTimeout            = 10 * time.Second
+	DefGRPCConnectionResetTimeout = 3 * time.Minute
 
 	// Client HTTP Settings
 	DefHTTPTimeout = 10 * time.Second
```

internal/config/flags.go

Lines changed: 1 addition & 0 deletions
```diff
@@ -41,6 +41,7 @@ var (
 	ClientGRPCMaxFileSizeKey               = pre(ClientRootKey) + "grpc_max_file_size"
 	ClientGRPCFileChunkSizeKey             = pre(ClientRootKey) + "grpc_file_chunk_size"
 	ClientGRPCMaxParallelFileOperationsKey = pre(ClientRootKey) + "grpc_max_parallel_file_operations"
+	ClientGRPCConnectionResetTimeoutKey    = pre(ClientRootKey) + "grpc_connection_reset_timeout"
 	ClientGRPCResponseTimeoutKey           = pre(ClientRootKey) + "grpc_response_timeout"
 
 	ClientBackoffInitialIntervalKey = pre(ClientRootKey) + "backoff_initial_interval"
```

internal/config/types.go

Lines changed: 7 additions & 6 deletions
```diff
@@ -97,12 +97,13 @@ type (
 		ResponseTimeout time.Duration `yaml:"response_timeout" mapstructure:"response_timeout"`
 		// if MaxMessageSize is size set then we use that value,
 		// otherwise MaxMessageRecieveSize and MaxMessageSendSize for individual settings
-		MaxMessageSize            int    `yaml:"max_message_size" mapstructure:"max_message_size"`
-		MaxMessageReceiveSize     int    `yaml:"max_message_receive_size" mapstructure:"max_message_receive_size"`
-		MaxMessageSendSize        int    `yaml:"max_message_send_size" mapstructure:"max_message_send_size"`
-		MaxFileSize               uint32 `yaml:"max_file_size" mapstructure:"max_file_size"`
-		FileChunkSize             uint32 `yaml:"file_chunk_size" mapstructure:"file_chunk_size"`
-		MaxParallelFileOperations int    `yaml:"max_parallel_file_operations" mapstructure:"max_parallel_file_operations"`
+		MaxMessageSize            int           `yaml:"max_message_size" mapstructure:"max_message_size"`
+		MaxMessageReceiveSize     int           `yaml:"max_message_receive_size" mapstructure:"max_message_receive_size"`
+		MaxMessageSendSize        int           `yaml:"max_message_send_size" mapstructure:"max_message_send_size"`
+		MaxFileSize               uint32        `yaml:"max_file_size" mapstructure:"max_file_size"`
+		FileChunkSize             uint32        `yaml:"file_chunk_size" mapstructure:"file_chunk_size"`
+		MaxParallelFileOperations int           `yaml:"max_parallel_file_operations" mapstructure:"max_parallel_file_operations"`
+		ConnectionResetTimeout    time.Duration `yaml:"connection_reset_timeout" mapstructure:"connection_reset_timeout"`
 	}
 
 	KeepAlive struct {
```
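Given the yaml/mapstructure tags above and the `Client.Grpc` path used in command_service.go, the new setting should be expressible in the agent config file roughly as follows (the exact nesting is an assumption; the outer structs are not shown in this diff):

```yaml
client:
  grpc:
    # Wait up to 3 minutes for in-flight requests before resetting (the default).
    connection_reset_timeout: 3m
```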

internal/resource/nginx_instance_process_operator.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -74,7 +74,7 @@ func (p *NginxInstanceProcessOperator) FindParentProcessID(ctx context.Context,
 		}
 		processInstanceID := id.Generate("%s_%s_%s", info.ExePath, info.ConfPath, info.Prefix)
 		if instanceID == processInstanceID {
-			slog.DebugContext(ctx, "Found NGINX process ID", "process_id", processInstanceID)
+			slog.DebugContext(ctx, "Found NGINX process ID", "instance_id", processInstanceID)
 			return proc.PID, nil
 		}
 	}
```

internal/watcher/instance/nginx_process_parser.go

Lines changed: 22 additions & 1 deletion
```diff
@@ -46,14 +46,26 @@ func NewNginxProcessParser() *NginxProcessParser {
 // cognitive complexity of 16 because of the if statements in the for loop
 // don't think can be avoided due to the need for continue
 //
-//nolint:revive // cognitive complexity of 20 because of the if statements in the for loop
+//nolint:revive,gocognit // cognitive complexity of 20 because of the if statements in the for loop
 func (npp *NginxProcessParser) Parse(ctx context.Context, processes []*nginxprocess.Process) map[string]*mpi.Instance {
+	slog.DebugContext(ctx, "Parsing NGINX processes", "number_of_processes", len(processes))
+
 	instanceMap := make(map[string]*mpi.Instance)   // key is instanceID
 	workers := make(map[int32][]*mpi.InstanceChild) // key is ppid of process
 
 	processesByPID := convertToMap(processes)
 
 	for _, proc := range processesByPID {
+		slog.DebugContext(ctx, "NGINX process details",
+			"ppid", proc.PPID,
+			"pid", proc.PID,
+			"name", proc.Name,
+			"created", proc.Created,
+			"status", proc.Status,
+			"cmd", proc.Cmd,
+			"exe", proc.Exe,
+		)
+
 		if proc.IsWorker() {
 			// Here we are determining if the worker process has a master
 			if masterProcess, ok := processesByPID[proc.PPID]; ok {
@@ -90,6 +102,15 @@ func (npp *NginxProcessParser) Parse(ctx context.Context, processes []*nginxproc
 
 		// check if proc is a master process, process is not a worker but could be cache manager etc
 		if proc.IsMaster() {
+			// sometimes a master process can have another master as parent
+			// which means that it is actually a worker process and not a master process
+			if masterProcess, ok := processesByPID[proc.PPID]; ok {
+				workers[masterProcess.PID] = append(workers[masterProcess.PID],
+					&mpi.InstanceChild{ProcessId: proc.PID})
+
+				continue
+			}
+
 			nginxInfo, err := npp.info(ctx, proc)
 			if err != nil {
 				slog.DebugContext(ctx, "Unable to get NGINX info", "pid", proc.PID, "error", err)
```
