The official Microsoft docs on consuming streams are a bit misleading in places. Based on a real Orleans deployment that uses implicit subscriptions, a grain should be implemented as follows:
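using Microsoft.Extensions.Logging;
using Orleans.Streams;
using Orleans.Streams.Core;

[ImplicitStreamSubscription("MyStreamProvider")] // note: the argument is the stream namespace
public class MyGrain : Grain, IGrainWithStringKey, IStreamSubscriptionObserver, IAsyncObserver<string>
{
    private readonly ILogger<MyGrain> _logger;
    public MyGrain(ILogger<MyGrain> logger) => _logger = logger;

    public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
    {
        // Called by the Orleans runtime after OnActivateAsync() has completed.
        var handle = handleFactory.Create<string>();
        await handle.ResumeAsync(this);
    }

    public Task OnNextAsync(string item, StreamSequenceToken? token = null)
    {
        _logger.LogInformation("Received an item from the stream: {Item}", item);
        return Task.CompletedTask;
    }

    // Nothing to clean up when the stream completes.
    public Task OnCompletedAsync() => Task.CompletedTask;
    public Task OnErrorAsync(Exception ex)
    {
        _logger.LogError(ex, "Stream error");
        return Task.CompletedTask;
    }
}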
What is missing is the call to IStreamSubscriptionObserver.OnSubscribed (which receives an IStreamSubscriptionHandleFactory). It needs to be made during TestKitSilo.CreateGrainAsync for grains that have the ImplicitStreamSubscription attribute applied and implement IStreamSubscriptionObserver.
I've been able to work around this locally for now, but as I was in a rush I had to create a wrapper around CreateGrainAsync.
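For illustration, a wrapper of this kind could look roughly like the sketch below. It is not the actual wrapper mentioned above. It assumes Moq is available for the test doubles and that the grain uses a string key; the class name TestKitSiloStreamExtensions and the helper name CreateSubscribedGrainAsync are hypothetical.

```csharp
using System.Threading.Tasks;
using Moq;
using Orleans;
using Orleans.Streams;
using Orleans.Streams.Core;
using Orleans.TestKit;

public static class TestKitSiloStreamExtensions
{
    // Hypothetical helper: creates the grain through the TestKit, then plays the
    // part of the runtime by calling OnSubscribed with a mocked handle factory.
    public static async Task<TGrain> CreateSubscribedGrainAsync<TGrain, TItem>(
        this TestKitSilo silo, string id)
        where TGrain : Grain, IGrainWithStringKey, IStreamSubscriptionObserver
    {
        var grain = await silo.CreateGrainAsync<TGrain>(id);

        // Loose Moq mocks return completed tasks by default, so awaiting
        // handle.ResumeAsync(...) inside the grain completes immediately.
        var handle = new Mock<StreamSubscriptionHandle<TItem>>();
        var factory = new Mock<IStreamSubscriptionHandleFactory>();
        factory.Setup(f => f.Create<TItem>()).Returns(handle.Object);

        await grain.OnSubscribed(factory.Object);
        return grain;
    }
}
```

A test would then call something like `await silo.CreateSubscribedGrainAsync<MyGrain, string>("grain-1")` and deliver items by invoking `grain.OnNextAsync(...)` directly, since the mocked handle is not connected to a real stream.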
It would be great if OrleansTestKit could incorporate this as standard.