Skip to content

Commit 7634d5c

Browse files
committed
Add statuses action for batch handle queries
The existing status action only accepts a single handle, requiring callers to make separate RPC calls for each running job. Attempting to get this to work at scale is challenging, since when using MCollective's shared NATS connector, concurrent RPC calls from separate threads race on the singleton connection and @@request_sequence counter, causing lost responses. This new statuses action accepts an array of handles and returns status, stdout, stderr, and exitcode for each completed job in a single response. This lets callers like the upcoming OpenBolt's Choria transport poll and fetch output in one batched RPC call per round instead of N sequential per-target calls.
1 parent 92444ae commit 7634d5c

4 files changed

Lines changed: 127 additions & 0 deletions

File tree

files/mcollective/agent/shell.ddl

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,20 @@ action "list", :description => "Get a list of all running commands" do
127127

128128
end
129129

130+
action "statuses", :description => "Get status and output of multiple managed commands" do
131+
display :always
132+
133+
input :handles,
134+
:prompt => "Handles",
135+
:description => "Array of command handles to query",
136+
:type => :array,
137+
:optional => false
138+
139+
output :statuses,
140+
:description => "status and output keyed by handle",
141+
:display_as => "statuses"
142+
end
143+
130144
action "kill", :description => "Kill a command by handle" do
131145
display :always
132146

files/mcollective/agent/shell.json

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,27 @@
9696
"display": "always",
9797
"description": "Run a command"
9898
},
99+
{
100+
"action": "statuses",
101+
"input": {
102+
"handles": {
103+
"prompt": "Handles",
104+
"description": "Array of command handles to query",
105+
"type": "array",
106+
"default": null,
107+
"optional": false
108+
}
109+
},
110+
"output": {
111+
"statuses": {
112+
"description": "status and output keyed by handle",
113+
"display_as": "statuses",
114+
"default": null
115+
}
116+
},
117+
"display": "always",
118+
"description": "Get status and output of multiple managed commands"
119+
},
99120
{
100121
"action": "start",
101122
"input": {

files/mcollective/agent/shell.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,26 @@ class Shell<RPC::Agent
2525
end
2626
end
2727

28+
action 'statuses' do
29+
handles = request[:handles]
30+
results = {}
31+
handles.each do |handle|
32+
begin
33+
job = Job.new(handle)
34+
entry = {
35+
:status => job.status,
36+
:stdout => job.stdout,
37+
:stderr => job.stderr,
38+
}
39+
entry[:exitcode] = job.exitcode if job.status == :stopped
40+
results[handle] = entry
41+
rescue StandardError => error
42+
results[handle] = { :status => :error, :error => error.message }
43+
end
44+
end
45+
reply[:statuses] = results
46+
end
47+
2848
action 'kill' do
2949
handle = request[:handle]
3050
job = Job.new(handle)

spec/unit/agent/shell_spec.rb

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,78 @@ module Agent
1818
end
1919
end
2020

21+
describe '#statuses' do
22+
let(:reply) { {} }
23+
24+
before :each do
25+
agent.stubs(:reply).returns(reply)
26+
@tmpdir = Dir.mktmpdir
27+
Shell::Job.stubs(:state_path).returns(@tmpdir)
28+
end
29+
30+
after :each do
31+
FileUtils.remove_entry_secure @tmpdir
32+
end
33+
34+
it 'should return stdout, stderr, and exitcode for stopped jobs' do
35+
job = Shell::Job.new
36+
job.start_command('echo foo')
37+
job.wait_for_process
38+
39+
agent.call(:statuses, :handles => [job.handle])
40+
statuses = reply[:statuses]
41+
statuses.should have_key(job.handle)
42+
statuses[job.handle][:status].should == :stopped
43+
statuses[job.handle][:stdout].should == "foo\n"
44+
statuses[job.handle][:stderr].should == ''
45+
statuses[job.handle][:exitcode].should == 0
46+
end
47+
48+
it 'should return stdout and stderr for running jobs' do
49+
job = Shell::Job.new
50+
job.start_command(%{ruby -e 'STDOUT.sync = true; puts "partial"; sleep 60'})
51+
sleep 0.5
52+
53+
agent.call(:statuses, :handles => [job.handle])
54+
statuses = reply[:statuses]
55+
statuses[job.handle][:status].should == :running
56+
statuses[job.handle][:stdout].should == "partial\n"
57+
statuses[job.handle][:stderr].should == ''
58+
statuses[job.handle].should_not have_key(:exitcode)
59+
60+
job.kill
61+
end
62+
63+
it 'should return error for invalid handle without affecting valid handles' do
64+
job = Shell::Job.new
65+
job.start_command('echo good')
66+
job.wait_for_process
67+
68+
agent.call(:statuses, :handles => [job.handle, 'nonexistent-handle'])
69+
statuses = reply[:statuses]
70+
statuses[job.handle][:status].should == :stopped
71+
statuses[job.handle][:stdout].should == "good\n"
72+
statuses['nonexistent-handle'][:status].should == :error
73+
statuses['nonexistent-handle'].should have_key(:error)
74+
end
75+
76+
it 'should handle multiple handles in one call' do
77+
job_one = Shell::Job.new
78+
job_one.start_command('echo one')
79+
job_one.wait_for_process
80+
81+
job_two = Shell::Job.new
82+
job_two.start_command('echo two')
83+
job_two.wait_for_process
84+
85+
agent.call(:statuses, :handles => [job_one.handle, job_two.handle])
86+
statuses = reply[:statuses]
87+
statuses.keys.size.should == 2
88+
statuses[job_one.handle][:stdout].should == "one\n"
89+
statuses[job_two.handle][:stdout].should == "two\n"
90+
end
91+
end
92+
2193
describe '#run_command' do
2294
let(:reply) { {} }
2395

0 commit comments

Comments
 (0)