Skip to content

Commit 3dfe3dc

Browse files
feat(channels): Implement Task 3.1 - ChannelManager & Hosting Infrastructure
- Add ChannelStatus class in ClawSharp.Core.Channels - Implement ChannelManager class implementing IHostedService - Manages channel lifecycle (start/stop) - Routes incoming messages to message bus - Tracks status for each channel - Add unit tests following TDD approach Closes #46
1 parent 626dd1f commit 3dfe3dc

File tree

5 files changed

+341
-5
lines changed

5 files changed

+341
-5
lines changed
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
using ClawSharp.Core.Channels;
2+
using Microsoft.Extensions.Hosting;
3+
using Microsoft.Extensions.Logging;
4+
5+
namespace ClawSharp.Channels;
6+
7+
/// <summary>
8+
/// Manages the lifecycle of communication channels and routes messages to the message bus.
9+
/// </summary>
10+
public sealed class ChannelManager : IHostedService
11+
{
12+
private readonly IReadOnlyList<IChannel> _channels;
13+
private readonly IMessageBus _messageBus;
14+
private readonly ILogger<ChannelManager> _logger;
15+
private readonly List<ChannelStatus> _statuses = new();
16+
private readonly object _lock = new();
17+
private bool _isRunning;
18+
19+
public IReadOnlyList<ChannelStatus> Statuses
20+
{
21+
get
22+
{
23+
lock (_lock)
24+
{
25+
return _statuses.ToList();
26+
}
27+
}
28+
}
29+
30+
public ChannelManager(
31+
IEnumerable<IChannel> channels,
32+
IMessageBus messageBus,
33+
ILogger<ChannelManager> logger)
34+
{
35+
_channels = channels?.ToList() ?? throw new ArgumentNullException(nameof(channels));
36+
_messageBus = messageBus ?? throw new ArgumentNullException(nameof(messageBus));
37+
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
38+
39+
// Initialize statuses for each channel
40+
foreach (var channel in _channels)
41+
{
42+
_statuses.Add(new ChannelStatus
43+
{
44+
Name = channel.Name,
45+
IsRunning = false
46+
});
47+
}
48+
}
49+
50+
public async Task StartAsync(CancellationToken cancellationToken)
51+
{
52+
_logger.LogInformation("Starting ChannelManager with {ChannelCount} channels", _channels.Count);
53+
_isRunning = true;
54+
55+
foreach (var channel in _channels)
56+
{
57+
var status = GetStatus(channel.Name);
58+
try
59+
{
60+
_logger.LogInformation("Starting channel {ChannelName}", channel.Name);
61+
62+
// Subscribe to incoming messages
63+
channel.OnMessage += HandleChannelMessage;
64+
65+
await channel.StartAsync(cancellationToken);
66+
67+
status.IsRunning = true;
68+
status.LastStarted = DateTimeOffset.UtcNow;
69+
status.ErrorMessage = null;
70+
71+
_logger.LogInformation("Channel {ChannelName} started successfully", channel.Name);
72+
}
73+
catch (Exception ex)
74+
{
75+
_logger.LogError(ex, "Failed to start channel {ChannelName}", channel.Name);
76+
status.IsRunning = false;
77+
status.ErrorMessage = ex.Message;
78+
}
79+
}
80+
}
81+
82+
public async Task StopAsync(CancellationToken cancellationToken)
83+
{
84+
_logger.LogInformation("Stopping ChannelManager");
85+
_isRunning = false;
86+
87+
foreach (var channel in _channels)
88+
{
89+
var status = GetStatus(channel.Name);
90+
try
91+
{
92+
_logger.LogInformation("Stopping channel {ChannelName}", channel.Name);
93+
94+
await channel.StopAsync(cancellationToken);
95+
96+
// Unsubscribe from messages
97+
channel.OnMessage -= HandleChannelMessage;
98+
99+
status.IsRunning = false;
100+
101+
_logger.LogInformation("Channel {ChannelName} stopped successfully", channel.Name);
102+
}
103+
catch (Exception ex)
104+
{
105+
_logger.LogError(ex, "Error stopping channel {ChannelName}", channel.Name);
106+
status.ErrorMessage = ex.Message;
107+
}
108+
}
109+
}
110+
111+
private Task HandleChannelMessage(ChannelMessage message)
112+
{
113+
if (!_isRunning)
114+
{
115+
_logger.LogWarning("Received message but ChannelManager is not running. Message: {MessageId}", message.Id);
116+
return Task.CompletedTask;
117+
}
118+
119+
_logger.LogDebug("Received message {MessageId} from channel {ChannelName}", message.Id, message.Channel);
120+
121+
// Publish to message bus for the agent loop to consume
122+
return _messageBus.PublishAsync(message);
123+
}
124+
125+
private ChannelStatus GetStatus(string channelName)
126+
{
127+
lock (_lock)
128+
{
129+
return _statuses.First(s => s.Name == channelName);
130+
}
131+
}
132+
}
Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
2-
3-
<ItemGroup>
4-
<ProjectReference Include="..\ClawSharp.Core\ClawSharp.Core.csproj" />
5-
</ItemGroup>
1+
<Project Sdk="Microsoft.NET.Sdk">
62

73
<PropertyGroup>
84
<ImplicitUsings>enable</ImplicitUsings>
95
<Nullable>enable</Nullable>
106
</PropertyGroup>
117

8+
<ItemGroup>
9+
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="10.0.3" />
10+
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.3" />
11+
<PackageReference Include="Telegram.Bot" Version="22.0.0" />
12+
</ItemGroup>
13+
14+
<ItemGroup>
15+
<ProjectReference Include="..\ClawSharp.Core\ClawSharp.Core.csproj" />
16+
<ProjectReference Include="..\ClawSharp.Infrastructure\ClawSharp.Infrastructure.csproj" />
17+
</ItemGroup>
18+
1219
</Project>
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
namespace ClawSharp.Core.Channels;
2+
3+
/// <summary>
4+
/// Represents the status of a channel.
5+
/// </summary>
6+
public sealed class ChannelStatus
7+
{
8+
public required string Name { get; init; }
9+
public required bool IsRunning { get; set; }
10+
public string? ErrorMessage { get; set; }
11+
public DateTimeOffset? LastStarted { get; set; }
12+
}
Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
using ClawSharp.Channels;
2+
using ClawSharp.Core.Channels;
3+
using ClawSharp.Infrastructure.Messaging;
4+
using FluentAssertions;
5+
using Microsoft.Extensions.Logging.Abstractions;
6+
using NSubstitute;
7+
8+
namespace ClawSharp.Core.Tests.Channels;
9+
10+
public class ChannelManagerTests
11+
{
12+
private sealed class TestChannel : IChannel
13+
{
14+
public string Name { get; init; } = "test";
15+
public event Func<ChannelMessage, Task>? OnMessage;
16+
17+
public Task StartAsync(CancellationToken ct) => Task.CompletedTask;
18+
public Task StopAsync(CancellationToken ct) => Task.CompletedTask;
19+
public Task SendAsync(OutboundMessage message, CancellationToken ct = default) => Task.CompletedTask;
20+
21+
public Task SimulateMessage(ChannelMessage message) => OnMessage?.Invoke(message) ?? Task.CompletedTask;
22+
}
23+
24+
[Fact]
25+
public async Task StartAsync_StartsAllChannels()
26+
{
27+
// Arrange
28+
var ch1 = Substitute.For<IChannel>();
29+
ch1.Name.Returns("telegram");
30+
var ch2 = Substitute.For<IChannel>();
31+
ch2.Name.Returns("discord");
32+
33+
var manager = new ChannelManager(
34+
new[] { ch1, ch2 },
35+
new InProcessMessageBus(),
36+
NullLogger<ChannelManager>.Instance);
37+
38+
// Act
39+
await manager.StartAsync(CancellationToken.None);
40+
41+
// Assert
42+
await ch1.Received(1).StartAsync(Arg.Any<CancellationToken>());
43+
await ch2.Received(1).StartAsync(Arg.Any<CancellationToken>());
44+
}
45+
46+
[Fact]
47+
public async Task StartAsync_FailedChannel_ContinuesOthers()
48+
{
49+
// Arrange
50+
var ch1 = Substitute.For<IChannel>();
51+
ch1.Name.Returns("bad");
52+
ch1.StartAsync(Arg.Any<CancellationToken>()).Returns(x => throw new Exception("fail"));
53+
54+
var ch2 = Substitute.For<IChannel>();
55+
ch2.Name.Returns("good");
56+
57+
var manager = new ChannelManager(
58+
new[] { ch1, ch2 },
59+
new InProcessMessageBus(),
60+
NullLogger<ChannelManager>.Instance);
61+
62+
// Act
63+
await manager.StartAsync(CancellationToken.None);
64+
65+
// Assert
66+
await ch2.Received(1).StartAsync(Arg.Any<CancellationToken>());
67+
manager.Statuses.Should().Contain(s => s.Name == "bad" && !s.IsRunning);
68+
manager.Statuses.Should().Contain(s => s.Name == "good" && s.IsRunning);
69+
}
70+
71+
[Fact]
72+
public async Task StopAsync_StopsAllChannels()
73+
{
74+
// Arrange
75+
var ch1 = Substitute.For<IChannel>();
76+
ch1.Name.Returns("telegram");
77+
78+
var manager = new ChannelManager(
79+
new[] { ch1 },
80+
new InProcessMessageBus(),
81+
NullLogger<ChannelManager>.Instance);
82+
83+
await manager.StartAsync(CancellationToken.None);
84+
85+
// Act
86+
await manager.StopAsync(CancellationToken.None);
87+
88+
// Assert
89+
await ch1.Received(1).StopAsync(Arg.Any<CancellationToken>());
90+
}
91+
92+
[Fact]
93+
public async Task OnMessage_PublishesToMessageBus()
94+
{
95+
// Arrange
96+
var ch = new TestChannel { Name = "test" };
97+
98+
var bus = new InProcessMessageBus();
99+
ChannelMessage? received = null;
100+
bus.Subscribe<ChannelMessage>(msg =>
101+
{
102+
received = msg;
103+
return Task.CompletedTask;
104+
});
105+
106+
var manager = new ChannelManager(
107+
new[] { ch },
108+
bus,
109+
NullLogger<ChannelManager>.Instance);
110+
111+
await manager.StartAsync(CancellationToken.None);
112+
113+
// Act
114+
var testMessage = new ChannelMessage(
115+
"1", "user", "hi", "test", "chat", DateTimeOffset.UtcNow);
116+
117+
// Simulate channel receiving a message
118+
await ch.SimulateMessage(testMessage);
119+
120+
// Wait a bit for the message to propagate
121+
await Task.Delay(100);
122+
123+
// Assert
124+
received.Should().NotBeNull();
125+
received!.Id.Should().Be("1");
126+
received.Content.Should().Be("hi");
127+
}
128+
129+
[Fact]
130+
public void Statuses_InitiallyEmpty()
131+
{
132+
// Arrange & Act
133+
var manager = new ChannelManager(
134+
Array.Empty<IChannel>(),
135+
new InProcessMessageBus(),
136+
NullLogger<ChannelManager>.Instance);
137+
138+
// Assert
139+
manager.Statuses.Should().BeEmpty();
140+
}
141+
142+
[Fact]
143+
public async Task StartAsync_SetsRunningStatus()
144+
{
145+
// Arrange
146+
var ch = Substitute.For<IChannel>();
147+
ch.Name.Returns("telegram");
148+
149+
var manager = new ChannelManager(
150+
new[] { ch },
151+
new InProcessMessageBus(),
152+
NullLogger<ChannelManager>.Instance);
153+
154+
// Act
155+
await manager.StartAsync(CancellationToken.None);
156+
157+
// Assert
158+
manager.Statuses.Should().Contain(s => s.Name == "telegram" && s.IsRunning);
159+
}
160+
161+
[Fact]
162+
public async Task StopAsync_ClearsRunningStatus()
163+
{
164+
// Arrange
165+
var ch = Substitute.For<IChannel>();
166+
ch.Name.Returns("telegram");
167+
168+
var manager = new ChannelManager(
169+
new[] { ch },
170+
new InProcessMessageBus(),
171+
NullLogger<ChannelManager>.Instance);
172+
173+
await manager.StartAsync(CancellationToken.None);
174+
175+
// Act
176+
await manager.StopAsync(CancellationToken.None);
177+
178+
// Assert
179+
manager.Statuses.Should().Contain(s => s.Name == "telegram" && !s.IsRunning);
180+
}
181+
}

tests/ClawSharp.Core.Tests/ClawSharp.Core.Tests.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@
99
<ItemGroup>
1010
<PackageReference Include="coverlet.collector" Version="6.0.4" />
1111
<PackageReference Include="FluentAssertions" Version="8.8.0" />
12+
<PackageReference Include="Microsoft.Extensions.Logging" Version="10.0.3" />
13+
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="10.0.3" />
1214
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.14.1" />
15+
<PackageReference Include="NSubstitute" Version="5.3.0" />
1316
<PackageReference Include="xunit" Version="2.9.3" />
1417
<PackageReference Include="xunit.runner.visualstudio" Version="3.1.4" />
1518
</ItemGroup>
@@ -19,6 +22,7 @@
1922
</ItemGroup>
2023

2124
<ItemGroup>
25+
<ProjectReference Include="..\..\src\ClawSharp.Channels\ClawSharp.Channels.csproj" />
2226
<ProjectReference Include="..\..\src\ClawSharp.Core\ClawSharp.Core.csproj" />
2327
<ProjectReference Include="..\..\src\ClawSharp.Infrastructure\ClawSharp.Infrastructure.csproj" />
2428
<ProjectReference Include="..\ClawSharp.TestHelpers\ClawSharp.TestHelpers.csproj" />

0 commit comments

Comments
 (0)