Skip to content

Commit 415fdce

Browse files
committed
Add some concepts
1 parent 9a3b627 commit 415fdce

11 files changed

Lines changed: 220 additions & 21 deletions

File tree

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
using Talkie.Common;
2+
using Talkie.Controllers.MessageControllers;
3+
using Talkie.Disposables;
4+
using Talkie.Flows;
5+
using Talkie.Handlers;
6+
using Talkie.Models.Messages;
7+
using Talkie.Pipelines.Handling;
8+
using Talkie.Pipelines.Intercepting;
9+
using Talkie.Signals;
10+
using Talkie.Subscribers;
11+
12+
namespace Talkie.Examples;
13+
14+
public sealed class ExBehaviour : IBehaviorsSubscriber
15+
{
16+
public void Subscribe(ISignalFlow flow, IRegisterOnlyDisposableScope disposables, CancellationToken cancellationToken)
17+
{
18+
var s = ConcurrentExclusiveSchedulerPair
19+
20+
flow.Subscribe(signals => signals
21+
.OfType<UserAcceptInvoice>()
22+
.SkipSelfPublished()
23+
.Where(signal => signal.Message.GetText().IsNullOrEmpty() is false)
24+
.HandleAsync(async context =>
25+
{
26+
Console.WriteLine("Incoming message with text");
27+
28+
await context.ToMessageController().PublishMessageAsync("Hello");
29+
}))
30+
.UnsubscribeWith(disposables);
31+
}
32+
}

Examples/Falko.Talkie.Examples.Microsoft.Hosting/HelloSubscriber.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ public void Subscribe(ISignalFlow flow, IRegisterOnlyDisposableScope disposables
1717
{
1818
flow.Subscribe<MessagePublishedSignal>(static signals => signals
1919
.SkipSelfPublished()
20-
.SkipOlderThan(TimeSpan
21-
.FromMinutes(1))
20+
.SkipOlderThan(TimeSpan.FromMinutes(1))
21+
.InterceptOn(Schedulers.Intercepting.Random())
22+
.Do(_ => Thread.Sleep(1000))
23+
.InterceptOn(Schedulers.Intercepting.Current())
2224
.Where(signal => signal
2325
.Message
2426
.GetText()
Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,15 @@
1-
using Microsoft.Extensions.Configuration;
2-
using Microsoft.Extensions.Hosting;
3-
using Serilog;
4-
using Talkie.Examples;
5-
using Talkie.Hosting;
1+
// IWith и в чем отличие от обычных интерфейсов
2+
// Sequence/FrozenSequence и отличие от List
3+
// IDisposable и IAsyncDisposable, и IDisposableScope и IRegisterOnlyDisposableScope
4+
// foreach
5+
// Parallelize().ForEach
6+
// async/await
67

7-
// Please set 'Telegram:Token' environment variable before running 💚💚💚
8+
using Talkie.Bridges.Telegram.Clients;
9+
using Talkie.Bridges.Telegram.Configurations;
810

9-
await new HostBuilder()
10-
.ConfigureAppConfiguration(configuration => configuration
11-
.AddEnvironmentVariables())
12-
.UseSerilog((_, configuration) => configuration
13-
.MinimumLevel.Verbose()
14-
.WriteTo.Console())
15-
.UseTalkie(configuration => configuration
16-
.SetSignalsLogging())
17-
.ConfigureServices(services => services
18-
.AddIntegrations<TelegramSubscriber>()
19-
.AddBehaviors<HelloSubscriber>()
20-
.AddBehaviors<StartSubscriber>())
21-
.RunConsoleAsync();
11+
var configuration = new TelegramConfiguration("TOKEN");
12+
13+
var client = new TelegramClient(configuration);
14+
15+
var self = await client.GetMeAsync();
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using Talkie.Pipelines.Intercepting;
2+
using Talkie.Signals;
3+
4+
namespace Talkie.Pipelines.Handling;
5+
6+
public static partial class SignalHandlingPipelineBuilderExtensions
7+
{
8+
public static ISignalHandlingPipelineBuilder HandleOn
9+
(
10+
this ISignalInterceptingPipelineBuilder builder,
11+
ISignalHandlingPipelineProcessorFactory processorFactory
12+
)
13+
{
14+
var pipeline = builder.Build();
15+
16+
return pipeline is not EmptySignalInterceptingPipeline
17+
? new SignalHandlingPipelineBuilder(pipeline)
18+
: SignalHandlingPipelineBuilder.Empty;
19+
}
20+
21+
public static ISignalHandlingPipelineBuilder<T> HandleOn<T>
22+
(
23+
this ISignalInterceptingPipelineBuilder<T> builder,
24+
ISignalHandlingPipelineProcessorFactory processorFactory
25+
) where T : Signal
26+
{
27+
var pipeline = builder.Build();
28+
29+
return pipeline is not EmptySignalInterceptingPipeline
30+
? new SignalHandlingPipelineBuilder<T>(pipeline)
31+
: SignalHandlingPipelineBuilder<T>.Empty;
32+
}
33+
34+
public static ISignalInterceptingPipelineBuilder<T> InterceptOn<T>
35+
(
36+
this ISignalInterceptingPipelineBuilder<T> builder,
37+
ISignalInterceptingPipelineProcessorFactory processorFactory
38+
) where T : Signal
39+
{
40+
var pipeline = builder.Build();
41+
42+
return builder;
43+
}
44+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
using Talkie.Pipelines.Handling;
2+
using Talkie.Pipelines.Intercepting;
3+
4+
namespace Talkie.Concurrent;
5+
6+
public static partial class Schedulers
7+
{
8+
public static readonly HandlingSchedulers Handling = default;
9+
10+
public static readonly InterceptingSchedulers Intercepting = default;
11+
}
12+
13+
public readonly struct HandlingSchedulers;
14+
15+
public readonly struct InterceptingSchedulers;
16+
17+
public static partial class HandlingSchedulersExtensions
18+
{
19+
public static ISignalHandlingPipelineProcessorFactory Current(this HandlingSchedulers _)
20+
{
21+
throw new NotImplementedException();
22+
}
23+
24+
public static ISignalHandlingPipelineProcessorFactory Random(this HandlingSchedulers _)
25+
{
26+
throw new NotImplementedException();
27+
}
28+
29+
public static ISignalHandlingPipelineProcessorFactory Parallel(this HandlingSchedulers _)
30+
{
31+
return ParallelSignalHandlingPipelineSchedulerFactory.Instance;
32+
}
33+
}
34+
35+
public static partial class InterceptingSchedulersExtensions
36+
{
37+
public static ISignalInterceptingPipelineProcessorFactory Current(this InterceptingSchedulers _)
38+
{
39+
throw new NotImplementedException();
40+
}
41+
42+
public static ISignalInterceptingPipelineProcessorFactory Random(this InterceptingSchedulers _)
43+
{
44+
throw new NotImplementedException();
45+
}
46+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
using Talkie.Handlers;
2+
3+
namespace Talkie.Pipelines.Handling;
4+
5+
public interface ISignalHandlingPipelineProcessor
6+
{
7+
ValueTask ProcessAsync
8+
(
9+
SignalContext context,
10+
CancellationToken cancellationToken
11+
);
12+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using Talkie.Handlers;
2+
using Talkie.Sequences;
3+
4+
namespace Talkie.Pipelines.Handling;
5+
6+
public interface ISignalHandlingPipelineProcessorFactory
7+
{
8+
ISignalHandlingPipelineProcessor Create(FrozenSequence<ISignalHandler> handlers);
9+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using Talkie.Concurrent;
2+
using Talkie.Handlers;
3+
using Talkie.Sequences;
4+
5+
namespace Talkie.Pipelines.Handling;
6+
7+
public sealed class ParallelSignalHandlingPipelineProcessor(FrozenSequence<ISignalHandler> handlers) : ISignalHandlingPipelineProcessor
8+
{
9+
private readonly ParallelismMeter _handlersParallelismMeter = new();
10+
11+
public ValueTask ProcessAsync(SignalContext context, CancellationToken cancellationToken)
12+
{
13+
return handlers.Parallelize(_handlersParallelismMeter)
14+
.ForEachAsync((handler, scopedCancellationToken) => handler.HandleAsync(context, scopedCancellationToken),
15+
cancellationToken: cancellationToken)
16+
.AsValueTask();
17+
}
18+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using Talkie.Handlers;
2+
using Talkie.Sequences;
3+
4+
namespace Talkie.Pipelines.Handling;
5+
6+
public sealed class ParallelSignalHandlingPipelineSchedulerFactory : ISignalHandlingPipelineProcessorFactory
7+
{
8+
public static readonly ParallelSignalHandlingPipelineSchedulerFactory Instance = new();
9+
10+
private ParallelSignalHandlingPipelineSchedulerFactory() { }
11+
12+
public ISignalHandlingPipelineProcessor Create(FrozenSequence<ISignalHandler> handlers)
13+
{
14+
return new ParallelSignalHandlingPipelineProcessor(handlers);
15+
}
16+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using Talkie.Handlers;
2+
using Talkie.Sequences;
3+
4+
namespace Talkie.Pipelines.Handling;
5+
6+
public sealed class SequencingHandlingPipelineProcessor(FrozenSequence<ISignalHandler> handlers) : ISignalHandlingPipelineProcessor
7+
{
8+
public async ValueTask ProcessAsync(SignalContext context, CancellationToken cancellationToken)
9+
{
10+
foreach (var handler in handlers.AsEnumerable())
11+
{
12+
await handler.HandleAsync(context, cancellationToken);
13+
}
14+
}
15+
}

0 commit comments

Comments
 (0)