diff --git a/.github/workflows/BuildAndTestOnEveryPush.yml b/.github/workflows/BuildAndTestOnEveryPush.yml new file mode 100644 index 0000000..959ac9e --- /dev/null +++ b/.github/workflows/BuildAndTestOnEveryPush.yml @@ -0,0 +1,28 @@ +name: BuildAndTestOnEveryPush.yml + +on: + push: + branches-ignore: + - master + +jobs: + build: + name: Build on windows-latest + runs-on: windows-latest + + steps: + - uses: actions/checkout@v1 + + - name: Build + run: dotnet build + + - name: Test + env: + EXAMPLE_ENVIROMENT_VARIABLE: ${{ secrets.EXAMPLE_ENVIROMENT_VARIABLE }} + run: dotnet test + + - name: Pack release version of task + run: dotnet pack --configuration Release --include-source + + - name: Push NuGet package to the testfeed + run: dotnet nuget push Frends.Community.RabbitMQ\bin\Release\Frends.Community.RabbitMQ.*.nupkg --api-key ${{ secrets.COMMUNITY_FEED_API_KEY }} --source https://www.myget.org/F/frends-community-test/api/v2/package --symbol-source https://www.myget.org/F/frends-community-test/symbols/api/v2/package diff --git a/.github/workflows/BuildAndTestonEveryPushLinux.yml b/.github/workflows/BuildAndTestonEveryPushLinux.yml new file mode 100644 index 0000000..9a98fb1 --- /dev/null +++ b/.github/workflows/BuildAndTestonEveryPushLinux.yml @@ -0,0 +1,30 @@ +name: BuildAndTestonEveryPushLinux.yml + +on: + push: + branches-ignore: + - master + +jobs: + build: + name: Build on ubuntu-latest + runs-on: ubuntu-latest + + + + steps: + - uses: actions/checkout@v1 + + - name: Build + run: dotnet build + + - name: Test + env: + EXAMPLE_ENVIROMENT_VARIABLE: ${{ secrets.EXAMPLE_ENVIROMENT_VARIABLE }} + run: dotnet test + + - name: Pack release version of task + run: dotnet pack --configuration Release --include-source + + - name: Push NuGet package to the testfeed + run: dotnet nuget push Frends.Community.RabbitMQ\bin\Release\Frends.Community.RabbitMQ.*.nupkg --api-key ${{ secrets.COMMUNITY_FEED_API_KEY }} --source https://www.myget.org/F/frends-community-test/api/v2/package --symbol-source https://www.myget.org/F/frends-community-test/symbols/api/v2/package diff --git a/.github/workflows/PackAndPushAfterMerge.yml b/.github/workflows/PackAndPushAfterMerge.yml new file mode 100644 index 0000000..4e7bd68 --- /dev/null +++ b/.github/workflows/PackAndPushAfterMerge.yml @@ -0,0 +1,40 @@ +name: PackAndPushAfterMerge + +on: + push: + branches: + - master + +jobs: + build: + name: ReleaseTheTask + runs-on: windows-latest + + steps: + - uses: actions/checkout@v1 + + - name: Get version number + id: vars + shell: pwsh + run: | + $version = ([xml]($tmp = Get-Content -Path 'Frends.Community.RabbitMQ/Frends.Community.RabbitMQ.csproj')).SelectSingleNode("Project/PropertyGroup/Version").InnerText + echo "::set-output name=version_number::$version" + + - name: Pack release version of task + run: dotnet pack --configuration Release --include-source + + - name: Push NuGet package to the (prod) feed + run: dotnet nuget push Frends.Community.RabbitMQ\bin\Release\Frends.Community.RabbitMQ.*.nupkg --api-key ${{ secrets.COMMUNITY_FEED_API_KEY }} --source https://www.myget.org/F/frends-community/api/v2/package --symbol-source https://www.myget.org/F/frends-community/symbols/api/v2/package + + - name: Create Release + id: create_release + uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + with: + tag_name: ${{ steps.vars.outputs.version_number }} + release_name: Release ${{ steps.vars.outputs.version_number }} + body: | + You can install the task via frends UI Task View or you can find the NuGet package from https://www.myget.org/F/frends-community/api/v2/package/Frends.Community.RabbitMQ/${{ steps.vars.outputs.version_number }} + draft: false + prerelease: false diff --git a/Frends.Community.RabbitMQ.Tests/Frends.Community.RabbitMQ.Tests.csproj b/Frends.Community.RabbitMQ.Tests/Frends.Community.RabbitMQ.Tests.csproj new file mode 100644 index 0000000..07198ea --- /dev/null +++ b/Frends.Community.RabbitMQ.Tests/Frends.Community.RabbitMQ.Tests.csproj @@ -0,0 +1,22 @@ + + + + net472 + + false + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + \ No newline at end of file diff --git a/Frends.Community.RabbitMQ.Tests/Properties/AssemblyInfo.cs b/Frends.Community.RabbitMQ.Tests/Properties/AssemblyInfo.cs deleted file mode 100644 index 9296caa..0000000 --- a/Frends.Community.RabbitMQ.Tests/Properties/AssemblyInfo.cs +++ /dev/null @@ -1,20 +0,0 @@ -using System.Reflection; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; - -[assembly: AssemblyTitle("Frends.Community.RabbitMQ")] -[assembly: AssemblyDescription("")] -[assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("")] -[assembly: AssemblyProduct("Frends.Community.RabbitMQ")] -[assembly: AssemblyCopyright("Copyright © 2019")] -[assembly: AssemblyTrademark("")] -[assembly: AssemblyCulture("")] - -[assembly: ComVisible(false)] - -[assembly: Guid("e7570bbf-f363-43fb-9c36-61bda1349b74")] - -// [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Frends.Community.RabbitMQ.Tests/Tests.csproj b/Frends.Community.RabbitMQ.Tests/Tests.csproj index 7ca6c55..a0e9652 100644 --- a/Frends.Community.RabbitMQ.Tests/Tests.csproj +++ b/Frends.Community.RabbitMQ.Tests/Tests.csproj @@ -1,5 +1,7 @@  + + @@ -10,7 +12,7 @@ Properties Frends.Community.RabbitMQ.Tests Frends.Community.RabbitMQ.Tests - v4.5.2 + v4.6 512 {3AC096D0-A1C2-E12C-1390-A8335801FDAB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC} 15.0 @@ -20,6 +22,7 @@ UnitTest + true @@ -48,6 +51,9 @@ ..\packages\MSTest.TestFramework.1.3.2\lib\net45\Microsoft.VisualStudio.TestPlatform.TestFramework.Extensions.dll + + ..\packages\NUnit.3.12.0\lib\net45\nunit.framework.dll + ..\packages\RabbitMQ.Client.5.1.0\lib\net451\RabbitMQ.Client.dll @@ -75,6 +81,8 @@ + + \ No newline at end of file diff --git a/Frends.Community.RabbitMQ.Tests/UnitTests.cs b/Frends.Community.RabbitMQ.Tests/UnitTests.cs index f4a1b3f..6a07770 100644 --- a/Frends.Community.RabbitMQ.Tests/UnitTests.cs +++ b/Frends.Community.RabbitMQ.Tests/UnitTests.cs @@ -1,32 +1,48 @@ -using System; -using System.Linq; -using Microsoft.VisualStudio.TestTools.UnitTesting; +using NUnit.Framework; using RabbitMQ.Client; +using System; +using System.Linq; namespace Frends.Community.RabbitMQ.Tests { - [TestClass] + + /// + /// You will need access to RabbitMQ queue, you can create it e.g. by running + /// + /// docker run -d --hostname my-rabbit -p 9080:15672 -p 5772:5672 -e RABBITMQ_DEFAULT_USER=agent -e RABBITMQ_DEFAULT_PASS=agent123 rabbitmq:3.7-management + /// + /// In that case URI would be amqp://agent:agent123@localhost:5772 + /// + /// + + [TestFixture] + // [Ignore("RabbitMQ is not installed on build server.")] public class UnitTests { - //public const string TestURI = "amqp://user:password@hostname:port/vhost"; - public const string TestURI = "localhost"; - [TestInitialize] - public void TestInit() - { + public const string TestUri = "amqp://agent:agent123@localhost:5772"; + //public static string TestUri = Environment.GetEnvironmentVariable("HIQ_RABBITMQ_CONNECTIONSTRING"); + public static string TestHost = "localhost"; - //var retVal = Frends.Community.RabbitMQ.RabbitMQTask.ReadMessage(new ReadInputParams { HostName = "localhost", QueueName = "queue", AutoAck = ReadAckType.AutoAck, ReadMessageCount = 1000 }); - //var retVal2 = Frends.Community.RabbitMQ.RabbitMQTask.ReadMessage(new ReadInputParams { HostName = TestURI, QueueName = "queue", AutoAck = ReadAckType.AutoAck, ReadMessageCount = 1000, ConnectWithURI = true }); + private WriteInputParams _inputParameters = new WriteInputParams(); + private WriteInputParamsString _inputParametersString = new WriteInputParamsString(); + + private WriteBatchInputParams _inputBatchParameters = new WriteBatchInputParams(); + private WriteBatchInputParamsString _inputBatchParametersString = new WriteBatchInputParamsString(); + + private ReadInputParams _outputReadParams; - } /// /// Deletes test exchange and queue if it exists /// - private void DeleteExchangeAndQueue() + [TearDown] + public void DeleteExchangeAndQueue() { - var factory = new ConnectionFactory(); - factory.HostName = "localhost"; + var factory = new ConnectionFactory + { + Uri = new Uri(TestUri) + }; using (var connection = factory.CreateConnection()) { @@ -41,10 +57,13 @@ private void DeleteExchangeAndQueue() /// /// Creates test exchange and queue /// - private void CreateExchangeAndQueue() + [SetUp] + public void CreateExchangeAndQueue() { - var factory = new ConnectionFactory(); - factory.HostName = "localhost"; + var factory = new ConnectionFactory + { + Uri = new Uri(TestUri) + }; using (var connection = factory.CreateConnection()) { @@ -55,102 +74,189 @@ private void CreateExchangeAndQueue() channel.QueueBind("queue", "exchange", routingKey: ""); } } + + _inputParameters = new WriteInputParams + { + Data = new byte[] { 0, 1, 2 }, + HostName = TestHost, + RoutingKey = "queue", + QueueName = "queue", + ConnectWithURI = false, + Create = false, + Durable = false + }; + + + + _inputParametersString = new WriteInputParamsString + { + Data = "test message", + HostName = TestHost, + RoutingKey = "queue", + QueueName = "queue", + ConnectWithURI = false, + Create = false, + Durable = false + }; + + _outputReadParams = new ReadInputParams + { + HostName = TestHost, + QueueName = "queue", + AutoAck = ReadAckType.AutoAck, + ReadMessageCount = 1, + ConnectWithURI = false + }; + + _inputBatchParameters = new WriteBatchInputParams + { + Data = new byte[] { 0, 1, 2 }, + HostName = TestHost, + RoutingKey = "queue", + QueueName = "queue", + ProcessExecutionId = Guid.NewGuid().ToString(), + ConnectWithURI = false, + Create = false, + Durable = false + }; + + + + _inputBatchParametersString = new WriteBatchInputParamsString + { + Data = "test message", + HostName = TestHost, + RoutingKey = "queue", + QueueName = "queue", + ProcessExecutionId = Guid.NewGuid().ToString(), + ConnectWithURI = false, + Create = false, + Durable = false + }; } - [TestMethod] - [Ignore("RabbitMQ is not installed on build server.")] + [Test] public void TestWriteRead() { - DeleteExchangeAndQueue(); - CreateExchangeAndQueue(); - Frends.Community.RabbitMQ.RabbitMQTask.WriteMessage(new WriteInputParams { Data = new byte[] { 0, 1, 2 }, HostName = "localhost", RoutingKey = "queue", QueueName = "queue" }); - var retVal = Frends.Community.RabbitMQ.RabbitMQTask.ReadMessage(new ReadInputParams { HostName = "localhost", QueueName = "queue", AutoAck = ReadAckType.AutoAck, ReadMessageCount = 1 }); + RabbitMQTask.WriteMessage(_inputParameters); + var retVal = RabbitMQTask.ReadMessage(_outputReadParams); Assert.IsTrue(retVal != null && retVal.Messages.Count() == 1); } - [TestMethod] - [Ignore("Rabbit/*M*/Q is not installed on build server.")] + [Test] public void TestReadWithAck10() { - DeleteExchangeAndQueue(); - CreateExchangeAndQueue(); + _outputReadParams.ReadMessageCount = 1000; + for (int i = 0; i < 10; i++) - Frends.Community.RabbitMQ.RabbitMQTask.WriteMessage(new WriteInputParams { Data = new byte[] { 0, (byte)(i * i), (byte)i }, HostName = "localhost", RoutingKey = "queue", QueueName = "queue" }); - var retVal = Frends.Community.RabbitMQ.RabbitMQTask.ReadMessage(new ReadInputParams { HostName = "localhost", QueueName = "queue", AutoAck = ReadAckType.AutoAck, ReadMessageCount = 1000 }); + { + _inputParameters.Data = new byte[] { 0, (byte)(i * i) }; + RabbitMQTask.WriteMessage(_inputParameters); + } + var retVal = RabbitMQTask.ReadMessage(_outputReadParams); Assert.IsTrue(retVal != null && retVal.Messages.Count() == 10); } - [TestMethod] - [Ignore("RabbitMQ is not installed on build server.")] + [Test] public void TestReadNoAck10() { - DeleteExchangeAndQueue(); - CreateExchangeAndQueue(); + _inputParameters.ConnectWithURI = true; + _inputParameters.HostName = TestUri; + + _outputReadParams.ReadMessageCount = 10; + _outputReadParams.AutoAck = ReadAckType.AutoNackAndRequeue; + + for (int i = 0; i < 10; i++) - Frends.Community.RabbitMQ.RabbitMQTask.WriteMessage(new WriteInputParams { Data = new byte[] { 0, (byte)(i * i), (byte)i }, HostName = "localhost", RoutingKey = "queue", QueueName = "queue" }); + { + _inputParameters.Data = new byte[] { 0, (byte)(i * i), (byte)i }; - var retVal = Frends.Community.RabbitMQ.RabbitMQTask.ReadMessage(new ReadInputParams { HostName = "localhost", QueueName = "queue", AutoAck = ReadAckType.AutoNackAndRequeue, ReadMessageCount = 10 }); + RabbitMQTask.WriteMessage(_inputParameters); + } + var retVal = RabbitMQTask.ReadMessage(_outputReadParams); Assert.IsTrue(retVal != null && retVal.Messages.Count() == 10); } - [TestMethod] - [Ignore("RabbitMQ is not installed on build server.")] - public void TestWriteReadWithURI() + + [Test] + public void TestWriteReadWithUri() { - DeleteExchangeAndQueue(); - CreateExchangeAndQueue(); + _inputParameters.HostName = TestUri; + _inputParameters.ConnectWithURI = true; + + _outputReadParams.ConnectWithURI = true; + _outputReadParams.HostName = TestUri; - Frends.Community.RabbitMQ.RabbitMQTask.WriteMessage(new WriteInputParams { Data = new byte[] { 0, 1, 2 }, HostName = TestURI, RoutingKey = "queue", QueueName = "queue", ConnectWithURI = true }); - var retVal = Frends.Community.RabbitMQ.RabbitMQTask.ReadMessage(new ReadInputParams { HostName = TestURI, QueueName = "queue", AutoAck = ReadAckType.AutoAck, ReadMessageCount = 1, ConnectWithURI = true }); + + RabbitMQTask.WriteMessage(_inputParameters); + var retVal = RabbitMQTask.ReadMessage(_outputReadParams); Assert.IsTrue(retVal != null && retVal.Messages.Count() == 1); } - [TestMethod] - [Ignore("RabbitMQ is not installed on build server.")] - public void TestReadWithAck10WithURI() + [Test] + public void TestReadWithAck10WithUri() { - DeleteExchangeAndQueue(); - CreateExchangeAndQueue(); + _inputParameters.HostName = TestUri; + _inputParameters.ConnectWithURI = true; + + + _outputReadParams.ConnectWithURI = true; + _outputReadParams.HostName = TestUri; + _outputReadParams.ReadMessageCount = 10; + + for (int i = 0; i < 10; i++) { - Frends.Community.RabbitMQ.RabbitMQTask.WriteMessage(new WriteInputParams { Data = new byte[] { 0, (byte)(i * i), (byte)i }, HostName = TestURI, RoutingKey = "queue", QueueName = "queue", ConnectWithURI = false }); - } + _inputParameters.Data = new byte[] { 0, (byte)(i * i), (byte)i }; + RabbitMQTask.WriteMessage(_inputParameters); + } - var retVal = Frends.Community.RabbitMQ.RabbitMQTask.ReadMessage(new ReadInputParams { HostName = TestURI, QueueName = "queue", AutoAck = ReadAckType.AutoAck, ReadMessageCount = 10, ConnectWithURI = true }); + var retVal = RabbitMQTask.ReadMessage(_outputReadParams); Assert.IsTrue(retVal != null && retVal.Messages.Count() == 10); } - [TestMethod] - [Ignore("RabbitMQ is not installed on build server.")] - public void TestReadNoAck10WithURI() + [Test] + public void TestReadNoAck10WithUri() { - DeleteExchangeAndQueue(); - CreateExchangeAndQueue(); + _inputParameters.HostName = TestUri; + _inputParameters.ConnectWithURI = true; + + + _outputReadParams.ConnectWithURI = true; + _outputReadParams.AutoAck = ReadAckType.AutoNackAndRequeue; + _outputReadParams.HostName = TestUri; + _outputReadParams.ReadMessageCount = 10; + for (int i = 0; i < 10; i++) { - Frends.Community.RabbitMQ.RabbitMQTask.WriteMessage(new WriteInputParams { Data = new byte[] { 0, (byte)(i * i), (byte)i }, HostName = TestURI, RoutingKey = "queue", QueueName = "queue", ConnectWithURI = false }); + _inputParameters.Data = new byte[] { 0, (byte)(i * i), (byte)i }; + + RabbitMQTask.WriteMessage(_inputParameters); } - var retVal = Frends.Community.RabbitMQ.RabbitMQTask.ReadMessage(new ReadInputParams { HostName = TestURI, QueueName = "queue", AutoAck = ReadAckType.AutoNackAndRequeue, ReadMessageCount = 10, ConnectWithURI = true }); + var retVal = RabbitMQTask.ReadMessage(_outputReadParams); Assert.IsTrue(retVal != null && retVal.Messages.Count() == 10); } - [TestMethod] - [Ignore("RabbitMQ is not installed on build server.")] + [Test] public void TestWriteToNonExistingQueue() { - DeleteExchangeAndQueue(); Exception xx = null; + + _inputParameters.QueueName = "queue2"; // Queue won't exist, but don't create it + + _outputReadParams.QueueName = "queue2"; + try { - Frends.Community.RabbitMQ.RabbitMQTask.WriteMessage(new WriteInputParams { Data = new byte[] { 0 }, HostName = TestURI, RoutingKey = "queue", QueueName = "queue", ConnectWithURI = false, Create = false }); + RabbitMQTask.WriteMessage(_inputParameters); - var retVal = Frends.Community.RabbitMQ.RabbitMQTask.ReadMessage(new ReadInputParams { HostName = TestURI, QueueName = "queue", AutoAck = ReadAckType.AutoAck, ReadMessageCount = 1000, ConnectWithURI = false }); + var _ = RabbitMQTask.ReadMessage(_outputReadParams); } catch (Exception x) @@ -160,48 +266,38 @@ public void TestWriteToNonExistingQueue() Assert.IsTrue(xx != null); } - [TestMethod] - [Ignore("RabbitMQ is not installed on build server.")] - public void TestWriteToExistingQueue() - { - DeleteExchangeAndQueue(); - CreateExchangeAndQueue(); - Frends.Community.RabbitMQ.RabbitMQTask.WriteMessage(new WriteInputParams { Data = new byte[] { 0 }, HostName = TestURI, RoutingKey = "queue", QueueName = "queue", ConnectWithURI = false, Create = false, Durable = false }); - var retVal = Frends.Community.RabbitMQ.RabbitMQTask.ReadMessage(new ReadInputParams { HostName = TestURI, QueueName = "queue", AutoAck = ReadAckType.AutoAck, ReadMessageCount = 1000, ConnectWithURI = false }); - Assert.IsTrue(retVal != null && retVal.Messages.Count() == 1); - } - - [TestMethod] - [Ignore("RabbitMQ is not installed on build server.")] + [Test] public void TestWriteToExistingExchange() { - DeleteExchangeAndQueue(); - CreateExchangeAndQueue(); - Frends.Community.RabbitMQ.RabbitMQTask.WriteMessage(new WriteInputParams { Data = new byte[] { 0 }, HostName = TestURI, ExchangeName = "exchange", ConnectWithURI = false, Create = false, Durable = false }); - var retVal = Frends.Community.RabbitMQ.RabbitMQTask.ReadMessage(new ReadInputParams { HostName = TestURI, QueueName = "queue", AutoAck = ReadAckType.AutoAck, ReadMessageCount = 1000, ConnectWithURI = false }); + + _inputParameters.QueueName = null; + _inputParameters.ExchangeName = "exchange"; + + RabbitMQTask.WriteMessage(_inputParameters); + var retVal = RabbitMQTask.ReadMessage(_outputReadParams); Assert.IsTrue(retVal != null && retVal.Messages.Count() == 1); } - [TestMethod] - [Ignore("RabbitMQ is not installed on build server.")] + [Test] public void TestWriteReadStringToQueue() { - DeleteExchangeAndQueue(); - CreateExchangeAndQueue(); - Frends.Community.RabbitMQ.RabbitMQTask.WriteMessageString(new WriteInputParamsString { Data = "test message", HostName = TestURI, RoutingKey = "queue", QueueName = "queue", ConnectWithURI = false, Create = false, Durable = false }); - var retVal = Frends.Community.RabbitMQ.RabbitMQTask.ReadMessageString(new ReadInputParams { HostName = TestURI, QueueName = "queue", AutoAck = ReadAckType.AutoAck, ReadMessageCount = 1000, ConnectWithURI = false }); + + RabbitMQTask.WriteMessageString(_inputParametersString); + var retVal = RabbitMQTask.ReadMessageString(_outputReadParams); + Assert.IsTrue(retVal != null && retVal.Messages.Count() == 1 && retVal.Messages[0].Data == "test message"); } - [TestMethod] - [Ignore("RabbitMQ is not installed on build server.")] + [Test] public void TestWriteReadStringToExchange() { - DeleteExchangeAndQueue(); - CreateExchangeAndQueue(); - Frends.Community.RabbitMQ.RabbitMQTask.WriteMessageString(new WriteInputParamsString { Data = "test message", HostName = TestURI, ExchangeName = "exchange", RoutingKey = "queue", ConnectWithURI = false, Create = false, Durable = false }); - var retVal = Frends.Community.RabbitMQ.RabbitMQTask.ReadMessageString(new ReadInputParams { HostName = TestURI, QueueName = "queue", AutoAck = ReadAckType.AutoAck, ReadMessageCount = 1000, ConnectWithURI = false }); + _inputParametersString.QueueName = null; + _inputParametersString.ExchangeName = "exchange"; + + RabbitMQTask.WriteMessageString(_inputParametersString); + + var retVal = RabbitMQTask.ReadMessageString(new ReadInputParams { HostName = TestUri, QueueName = "queue", AutoAck = ReadAckType.AutoAck, ReadMessageCount = 1000, ConnectWithURI = true }); Assert.IsTrue(retVal != null && retVal.Messages.Count() == 1 && retVal.Messages[0].Data == "test message"); } @@ -209,18 +305,40 @@ public void TestWriteReadStringToExchange() /// /// Used for debugging, if connection is closed and opened for new hostname /// - [TestMethod] - [Ignore("RabbitMQ is not installed on build server.")] + [Test] + [Ignore("This test is actually used for debugging while developing task.")] public void TestChangingHostName() { - DeleteExchangeAndQueue(); - CreateExchangeAndQueue(); - Frends.Community.RabbitMQ.RabbitMQTask.WriteMessageString(new WriteInputParamsString { Data = "test message", HostName = "amqp://localhost", ExchangeName = "exchange", RoutingKey = "queue", ConnectWithURI = true, Create = false, Durable = false }); - Frends.Community.RabbitMQ.RabbitMQTask.WriteMessageString(new WriteInputParamsString { Data = "test message", HostName = "localhost2", ExchangeName = "exchange", RoutingKey = "queue", ConnectWithURI = false, Create = false, Durable = false }); + RabbitMQTask.WriteMessageString(new WriteInputParamsString { Data = "test message", HostName = "amqp://localhost", ExchangeName = "exchange", RoutingKey = "queue", ConnectWithURI = true, Create = false, Durable = false }); + RabbitMQTask.WriteMessageString(new WriteInputParamsString { Data = "test message", HostName = "localhost2", ExchangeName = "exchange", RoutingKey = "queue", ConnectWithURI = false, Create = false, Durable = false }); Assert.IsTrue(true); } + [Test] + public void TestReadWithAck10WithUriWithBatchWrite() + { + _inputBatchParameters.HostName = TestUri; + _inputBatchParameters.ConnectWithURI = true; + _inputBatchParameters.WriteMessageCount = 10; + _inputBatchParameters.WaitForAcknowledgement = 5; + + _outputReadParams.ConnectWithURI = true; + _outputReadParams.HostName = TestUri; + _outputReadParams.ReadMessageCount = 10; + + for (int i = 0; i < 10; i++) + { + _inputParameters.Data = new byte[] { 0, (byte)(i * i), (byte)i }; + _inputBatchParameters.ProcessExecutionId = Guid.NewGuid().ToString(); + + RabbitMQTask.WriteBatchMessage(_inputBatchParameters); + } + + var retVal = RabbitMQTask.ReadMessage(_outputReadParams); + + Assert.IsTrue(retVal != null && retVal.Messages.Count() < 10); + } } } diff --git a/Frends.Community.RabbitMQ.Tests/packages.config b/Frends.Community.RabbitMQ.Tests/packages.config deleted file mode 100644 index 5d42412..0000000 --- a/Frends.Community.RabbitMQ.Tests/packages.config +++ /dev/null @@ -1,7 +0,0 @@ - - - - - - - \ No newline at end of file diff --git a/Frends.Community.RabbitMQ.sln b/Frends.Community.RabbitMQ.sln index 50c203e..9cdde28 100644 --- a/Frends.Community.RabbitMQ.sln +++ b/Frends.Community.RabbitMQ.sln @@ -1,11 +1,11 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio 15 -VisualStudioVersion = 15.0.28307.271 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.30503.244 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Frends.Community.RabbitMQ", "Frends.Community.RabbitMQ\Frends.Community.RabbitMQ.csproj", "{DBB2AD78-CF28-44AB-BCAC-D4B7D4035DDF}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Frends.Community.RabbitMQ", "Frends.Community.RabbitMQ\Frends.Community.RabbitMQ.csproj", "{DBB2AD78-CF28-44AB-BCAC-D4B7D4035DDF}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Tests", "Frends.Community.RabbitMQ.Tests\Tests.csproj", "{E7570BBF-F363-43FB-9C36-61BDA1349B74}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Frends.Community.RabbitMQ.Tests", "Frends.Community.RabbitMQ.Tests\Frends.Community.RabbitMQ.Tests.csproj", "{FC0A38D4-6C86-4362-B65F-EDDE7CCA5D39}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -17,10 +17,10 @@ Global {DBB2AD78-CF28-44AB-BCAC-D4B7D4035DDF}.Debug|Any CPU.Build.0 = Debug|Any CPU {DBB2AD78-CF28-44AB-BCAC-D4B7D4035DDF}.Release|Any CPU.ActiveCfg = Release|Any CPU {DBB2AD78-CF28-44AB-BCAC-D4B7D4035DDF}.Release|Any CPU.Build.0 = Release|Any CPU - {E7570BBF-F363-43FB-9C36-61BDA1349B74}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {E7570BBF-F363-43FB-9C36-61BDA1349B74}.Debug|Any CPU.Build.0 = Debug|Any CPU - {E7570BBF-F363-43FB-9C36-61BDA1349B74}.Release|Any CPU.ActiveCfg = Release|Any CPU - {E7570BBF-F363-43FB-9C36-61BDA1349B74}.Release|Any CPU.Build.0 = Release|Any CPU + {FC0A38D4-6C86-4362-B65F-EDDE7CCA5D39}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {FC0A38D4-6C86-4362-B65F-EDDE7CCA5D39}.Debug|Any CPU.Build.0 = Debug|Any CPU + {FC0A38D4-6C86-4362-B65F-EDDE7CCA5D39}.Release|Any CPU.ActiveCfg = Release|Any CPU + {FC0A38D4-6C86-4362-B65F-EDDE7CCA5D39}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Frends.Community.RabbitMQ/Definitions.cs b/Frends.Community.RabbitMQ/Definitions.cs index 49cf7ed..defb941 100644 --- a/Frends.Community.RabbitMQ/Definitions.cs +++ b/Frends.Community.RabbitMQ/Definitions.cs @@ -2,9 +2,6 @@ using System.Collections.Generic; using System.ComponentModel; using System.ComponentModel.DataAnnotations; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace Frends.Community.RabbitMQ { @@ -135,6 +132,96 @@ public WriteInputParams() [DefaultValue(true)] [DisplayName(@"Set durable option when creating queue")] public bool Durable { get; set; } + + } + + + /// + /// Collection of write message parameters + /// + public class WriteBatchInputParams + { + + public WriteBatchInputParams() + { + ExchangeName = String.Empty; + RoutingKey = String.Empty; + } + + /// + /// Data payload + /// + [DisplayName(@"Data")] + [DisplayFormat(DataFormatString = "Expression")] + public byte[] Data { get; set; } + /// + /// Name of the exchange e.g. sampleExchange + /// + [DefaultValue("")] + [DisplayName(@"Exchange name")] + [DisplayFormat(DataFormatString = "Text")] + public string ExchangeName { get; set; } + /// + /// Name of the queue + /// + [DefaultValue("sampleQueue")] + [DisplayName(@"Queue name")] + [DisplayFormat(DataFormatString = "Text")] + public string QueueName { get; set; } + /// + /// Routing key name + /// + [DefaultValue("sampleQueue")] + [DisplayName(@"Routing key")] + [DisplayFormat(DataFormatString = "Text")] + public string RoutingKey { get; set; } + /// + /// RabbitMQ host name + /// + [DisplayName(@"Host name")] + [DisplayFormat(DataFormatString = "Text")] + public string HostName { get; set; } + /// + /// Amount of messages in the buffer under transaction which will be sent over the messaging channel as a chunk. + /// + [DefaultValue(1)] + [DisplayFormat(DataFormatString = "Text")] + [DisplayName(@"Write message count")] + public int WriteMessageCount { get; set; } + /// + /// Process execution id from the system + /// + [DefaultValue("#process.executionid")] + [DisplayFormat(DataFormatString = "Expression")] + [DisplayName(@"Process execution id")] + public string ProcessExecutionId { get; set; } + /// + /// Use URI instead of a hostname + /// + [DefaultValue(false)] + [DisplayName(@"Use URI for connection")] + public bool ConnectWithURI { get; set; } + /// + /// True to declare queue when writing + /// + [DefaultValue(false)] + [DisplayName(@"True to declare queue before writing. False to not declare it")] + public bool Create { get; set; } + /// + /// Durable option when creating queue + /// + [DefaultValue(true)] + [DisplayName(@"Set durable option when creating queue")] + public bool Durable { get; set; } + + /// + /// Amount of seconds waiting for confirmation messages. + /// + [DefaultValue(1)] + [DisplayFormat(DataFormatString = "Text")] + [DisplayName(@"Wait for acknowledgement in seconds")] + public int WaitForAcknowledgement { get; set; } + } public class WriteInputParamsString @@ -151,6 +238,7 @@ public WriteInputParamsString() [DisplayName(@"Data")] [DisplayFormat(DataFormatString = "Text")] public string Data { get; set; } + /// /// Name of the exchange /// @@ -198,6 +286,83 @@ public WriteInputParamsString() public bool Durable { get; set; } } + public class WriteBatchInputParamsString + { + public WriteBatchInputParamsString() + { + ExchangeName = String.Empty; + RoutingKey = String.Empty; + } + + /// + /// Data payload in string. Will be internally converted to byte array using UTF8.Convert method + /// + [DisplayName(@"Data")] + [DisplayFormat(DataFormatString = "Text")] + public string Data { get; set; } + + /// + /// Name of the exchange + /// + [DefaultValue("sampleExchange")] + [DisplayName(@"Exchange name")] + [DisplayFormat(DataFormatString = "Text")] + public string ExchangeName { get; set; } + /// + /// Name of the queue + /// + [DefaultValue("sampleQueue")] + [DisplayName(@"Queue name")] + [DisplayFormat(DataFormatString = "Text")] + public string QueueName { get; set; } + /// + /// Routing key name + /// + [DefaultValue("sampleQueue")] + [DisplayName(@"Routing key")] + [DisplayFormat(DataFormatString = "Text")] + public string RoutingKey { get; set; } + /// + /// RabbitMQ host name + /// + [DisplayName(@"Host name")] + [DisplayFormat(DataFormatString = "Text")] + public string HostName { get; set; } + /// + /// Amount of messages in the buffer under acknoledgement which will be sent over the messaging channel as a chunk. + /// + [DefaultValue(1)] + [DisplayFormat(DataFormatString = "Text")] + [DisplayName(@"Write message count")] + public int WriteMessageCount { get; set; } + + /// + /// Process execution id from the system + /// + [DefaultValue("#process.executionid")] + [DisplayFormat(DataFormatString = "Expression")] + [DisplayName(@"Process execution id")] + public string ProcessExecutionId { get; set; } + /// + /// Use URI instead of a hostname + /// + [DefaultValue(false)] + [DisplayName(@"Use URI for connection")] + public bool ConnectWithURI { get; set; } + /// + /// True to declare queue when writing + /// + [DefaultValue(false)] + [DisplayName(@"True to declare queue before writing. False to not declare it")] + public bool Create { get; set; } + /// + /// Durable option when creating queue + /// + [DefaultValue(true)] + [DisplayName(@"Set durable option when creating queue")] + public bool Durable { get; set; } + } + public class Output { public List Messages { get; set; } = new List(); diff --git a/Frends.Community.RabbitMQ/Frends.Community.RabbitMQ.csproj b/Frends.Community.RabbitMQ/Frends.Community.RabbitMQ.csproj index 2847eb5..8b88166 100644 --- a/Frends.Community.RabbitMQ/Frends.Community.RabbitMQ.csproj +++ b/Frends.Community.RabbitMQ/Frends.Community.RabbitMQ.csproj @@ -1,62 +1,23 @@ - - - + + - Debug - AnyCPU - {DBB2AD78-CF28-44AB-BCAC-D4B7D4035DDF} - Library - Properties - Frends.Community.RabbitMQ - Frends.Community.RabbitMQ - v4.5.2 - 512 - true + netstandard2.0;net471 + HiQ Finland + HiQ Finland + MIT + https://github.com/CommunityHiQ/Frends.Community.RabbitMQ + true + Frends + true + 1.5.2 + false - - true - full - false - bin\Debug\ - DEBUG;TRACE - prompt - 4 - - - pdbonly - true - bin\Release\ - TRACE - prompt - 4 - bin\Release\Frends.Community.RabbitMQ.xml - - - - ..\packages\Microsoft.Diagnostics.Tracing.EventSource.Redist.1.1.28\lib\net40\Microsoft.Diagnostics.Tracing.EventSource.dll - - - ..\packages\RabbitMQ.Client.5.1.0\lib\net451\RabbitMQ.Client.dll - - - - - - - - - - - - - - - - + - - - + + + - - \ No newline at end of file + + + diff --git a/Frends.Community.RabbitMQ/Frends.Community.RabbitMQ.nuspec b/Frends.Community.RabbitMQ/Frends.Community.RabbitMQ.nuspec deleted file mode 100644 index abc5e97..0000000 --- a/Frends.Community.RabbitMQ/Frends.Community.RabbitMQ.nuspec +++ /dev/null @@ -1,28 +0,0 @@ - - - - Frends.Community.RabbitMQ - 1.4.0 - FRENDS RabbitMQ Task - HiQ Poland - - false - FRENDS RabbitMQ Task - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/Frends.Community.RabbitMQ/FrendsTaskMetadata.json b/Frends.Community.RabbitMQ/FrendsTaskMetadata.json new file mode 100644 index 0000000..5821b4a --- /dev/null +++ b/Frends.Community.RabbitMQ/FrendsTaskMetadata.json @@ -0,0 +1,7 @@ +{ + "Tasks": [ + { + "TaskMethod": "Frends.Community.RabbitMQ.Echo.ExecuteEcho" + } + ] +} \ No newline at end of file diff --git a/Frends.Community.RabbitMQ/Properties/AssemblyInfo.cs b/Frends.Community.RabbitMQ/Properties/AssemblyInfo.cs deleted file mode 100644 index 2981024..0000000 --- a/Frends.Community.RabbitMQ/Properties/AssemblyInfo.cs +++ /dev/null @@ -1,36 +0,0 @@ -using System.Reflection; -using System.Runtime.CompilerServices; -using System.Runtime.InteropServices; - -// General Information about an assembly is controlled through the following -// set of attributes. Change these attribute values to modify the information -// associated with an assembly. -[assembly: AssemblyTitle("Frends.RabbitMQ")] -[assembly: AssemblyDescription("")] -[assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("")] -[assembly: AssemblyProduct("Frends.RabbitMQ")] -[assembly: AssemblyCopyright("Copyright © 2019")] -[assembly: AssemblyTrademark("")] -[assembly: AssemblyCulture("")] - -// Setting ComVisible to false makes the types in this assembly not visible -// to COM components. If you need to access a type in this assembly from -// COM, set the ComVisible attribute to true on that type. -[assembly: ComVisible(false)] - -// The following GUID is for the ID of the typelib if this project is exposed to COM -[assembly: Guid("dbb2ad78-cf28-44ab-bcac-d4b7d4035ddf")] - -// Version information for an assembly consists of the following four values: -// -// Major Version -// Minor Version -// Build Number -// Revision -// -// You can specify all the values or you can default the Build and Revision Numbers -// by using the '*' as shown below: -// [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.6.0")] -[assembly: AssemblyFileVersion("1.0.6.0")] diff --git a/Frends.Community.RabbitMQ/README.md b/Frends.Community.RabbitMQ/README.md new file mode 100644 index 0000000..fb9d346 --- /dev/null +++ b/Frends.Community.RabbitMQ/README.md @@ -0,0 +1,182 @@ +# Frends.Community.RabbitMQ +Frends task for operating on RabbitMQ queues using AMQP 0-9-1. Supports reading and writing from queue. + +- [Installing](#installing) +- [Tasks](#tasks) + - [Write Message](#writemessage) + - [Write Message String](#writemessagestring) + - [Read Message](#readmessage) + - [Read Message String](#readmessagestring) +- [License](#license) +- [Building](#building) +- [Contributing](#contributing) +- [Change Log](#change-log) + +# Installing +You can install the task via FRENDS UI Task View or you can find the nuget package from the following nuget feed +'Nuget feed coming at later date' + +Tasks +===== + +## WriteMessage + +### Task Parameters + +| Property | Type | Description | Example | +| ---------------------| ---------------------| ------------------------------------ | ----- | +| Data | byte[] | Data to be put in message body| new byte[]{1,2,3}| +| QueueName | string | Name of the queue | sampleQueue | +| ExchangeName | string | Name of the exchange | sampleExchange | +| RoutingKey | string | Routing key (as in RabbitMQ specification) | sampleQueue | +| HostName | string | Address of the server hosting RabbitMQ | localhost or amqp://user:password@hostname:port/vhost | +| ConnectWithURI | bool | If true, hostname should be an URI | If false, use hostname only | +| Create | bool | True to declare queue before writing | False to not declare it| +| Durable | bool | Set durable option when creating queue | + +## WriteMessageString +| Property | Type | Description | Example | +| ---------------------| ---------------------| ------------------------------------ | ----- | +| Data | string | Data to be put in message body| "abc"| +| QueueName | string | Name of the queue | sampleQueue | +| ExchangeName | string | Name of the exchange | sampleQueue | +| RoutingKey | string | Routing key (as in RabbitMQ specification) | sampleQueue | +| HostName | string | Address of the server hosting RabbitMQ | localhost or amqp://user:password@hostname:port/vhost | +| Create | bool | True to declare queue before writing | False to not declare it| +| Durable | bool | Set durable option when creating queue | + + +## ReadMessage + +### Task Parameters + +| Property | Type | Description | Example | +| ---------------------| ---------------------| ------------------------------------ | ----- | +| QueueName | string | Name of the queue | sampleQueue | +| HostName | string | Address of the server hosting RabbitMQ | localhost or amqp://user:password@hostname:port/vhost | +| ReadMessageCount | int | Maximum number of messages to be read from queue. It can exceed number of available messages. | 1 | +| AutoAck | enum | Set acknowledgement type. AutoAck,AutoNack, AutoNackAndRequeue, AutoReject, AutoRejectAndRequeue, ManualAck| ReadAckType.AutoAck | +| ConnectWithURI | bool | If true, hostname should be an URI | If false, use hostname only | + +### Output + +| Property | Type | Description | Example | +| ---------------------| ---------------------| ------------------------------------ | ----- | +| Messages | List | A list of message-objects | | + +### Message-object + +| Property | Type | Description | Example | +| ---------------------| ---------------------| ------------------------------------ | ----- | +| Data | string (base64 encoded byte[]) | | | +| MessageCount | uint | | | +| DeliveryTag | ulong | | | + +### Read message sample JSON + +{"Messages":[{"Data":"AAEC","MessagesCount":0,"DeliveryTag":1}]} + +## ReadMessageString + +### Task Parameters + +| Property | Type | Description | Example | +| ---------------------| ---------------------| ------------------------------------ | ----- | +| QueueName | string | Name of the queue | sampleQueue | +| HostName | string | Address of the server hosting RabbitMQ | localhost or amqp://user:password@hostname:port/vhost | +| ReadMessageCount | int | Maximum number of messages to be read from queue. It can exceed number of available messages. | 1 | +| AutoAck | enum | Set acknowledgement type. AutoAck,AutoNack, AutoNackAndRequeue, AutoReject,AutoRejectAndRequeue, ManualAck| ReadAckType.AutoAck | +| ConnectWithURI | bool | If true, hostname should be an URI | If false, use hostname only | + +### OutputString + +| Property | Type | Description | Example | +| ---------------------| ---------------------| ------------------------------------ | ----- | +| Messages | List | A list of MessageString-objects | | + + +### MessageString-object + +| Property | Type | Description | Example | +| ---------------------| ---------------------| ------------------------------------ | ----- | +| Data | string (UTF8 converted byte[]) | | | +| MessageCount | uint | | | +| DeliveryTag | ulong | | | + + +## WriteBatchMessage + +### Task Parameters + +| Property | Type | Description | Example | +| ---------------------| ---------------------| ------------------------------------ | ----- | +| Data | byte[] | Data to be put in message body| new byte[]{1,2,3}| +| QueueName | string | Name of the queue | sampleQueue | +| ExchangeName | string | Name of the exchange | sampleExchange | +| RoutingKey | string | Routing key (as in RabbitMQ specification) | sampleQueue | +| HostName | string | Address of the server hosting RabbitMQ | localhost or amqp://user:password@hostname:port/vhost | +| WriteMessageCount | string | Amount of messages in the buffer which will be sent over messaging queue as a batch. | 20 | +| ProcessExecutionId | string | Unique id of the process execution. | igbmdajlskdhlfjaeirjwkdjfasdflht | +| ConnectWithURI | bool | If true, hostname should be an URI | If false, use hostname only | +| Create | bool | True to declare queue before writing | False to not declare it| +| Durable | bool | Set durable option when creating queue | + + +## WriteBatchMessageString +| Property | Type | Description | Example | +| ---------------------| ---------------------| ------------------------------------ | ----- | +| Data | string | Data to be put in message body| "abc"| +| QueueName | string | Name of the queue | sampleQueue | +| ExchangeName | string | Name of the exchange | sampleQueue | +| RoutingKey | string | Routing key (as in RabbitMQ specification) | sampleQueue | +| HostName | string | Address of the server hosting RabbitMQ | localhost or amqp://user:password@hostname:port/vhost | +| ConnectWithURI | bool | If true, hostname should be an URI | If false, use hostname only | +| ProcessExecutionId | string | Unique id of the process execution. | igbmdajlskdhlfjaeirjwkdjfasdflht | +| Create | bool | True to declare queue before writing | False to not declare it| +| Durable | bool | Set durable option when creating queue | + +# License + +This project is licensed under the MIT License - see the LICENSE file for details + +# Building + +Clone a copy of the repo + +`git clone https://github.com/CommunityHiQ/Frends.Community.RabbitMQ.git` + +Restore dependencies + +`nuget restore frends.community.rabbitmq` + +Rebuild the project + +Run Tests with nunit3. Tests can be found under + +`Frends.Community.Email.Tests\bin\Release\Frends.Community.RabbitMQ.Tests.dll` + +Create a nuget package + +`nuget pack nuspec/Frends.Community.RabbitMQ.nuspec` + +# Contributing +When contributing to this repository, please first discuss the change you wish to make via issue, email, or any other method with the owners of this repository before making a change. + +1. Fork the repo on GitHub +2. Clone the project to your own machine +3. Commit changes to your own branch +4. Push your work back up to your fork +5. Submit a Pull request so that we can review your changes + +NOTE: Be sure to merge the latest from "upstream" before making a pull request! + +# Change Log + +| Version | Changes | +| ---------------------| ---------------------| +| 1.0.2 | Initial version of RabbitMQ | +| 1.0.7 | Connect with URI added | +| 1.0.8 | Add Create and Durable options in WriteMessage. Remove declaring queue in ReadMessage operation | +| 1.1.0 | Fix nacking while reading multiple messages before it read same message multiple times, because of immediately nacking | +| 1.2.0 | Write to exchange, but does not implement creating exchange on fly. | +| 1.3.0 | Message persistence is set to true if durable parameter is true. | diff --git a/Frends.Community.RabbitMQ/RabbitMQTask.cs b/Frends.Community.RabbitMQ/RabbitMQTask.cs index d15a8f7..ebd2059 100644 --- a/Frends.Community.RabbitMQ/RabbitMQTask.cs +++ b/Frends.Community.RabbitMQ/RabbitMQTask.cs @@ -1,6 +1,6 @@ using RabbitMQ.Client; using System; -using System.Collections.Generic; +using System.Collections.Concurrent; using System.ComponentModel; using System.Text; using System.Linq; @@ -12,96 +12,204 @@ namespace Frends.Community.RabbitMQ /// public class RabbitMQTask { - private static IConnection _connection = null; - private static IModel _channel = null; + private static readonly ConnectionFactory Factory = new ConnectionFactory(); - private static void OpenConnectionIfClosed(string hostName, bool connectWithURI) - { - //close connection if hostname has changed - if(_connection!=null && _connection.Endpoint.HostName != hostName) - { - CloseConnection(); - } + private static ConcurrentDictionary ConcurrentDictionary = new ConcurrentDictionary(); + private static ConcurrentDictionary BatchChannels = new ConcurrentDictionary(); - if (_connection == null || _connection.IsOpen == false) + private static IConnection CreateConnection(string hostName, bool connectWithUri) + { + lock (Factory) { - var factory = new ConnectionFactory(); - - if (connectWithURI) - { - factory.Uri = new Uri(hostName); - } - else + IConnection connection; { - factory.HostName = hostName; + if (connectWithUri) + { + Factory.Uri = new Uri(hostName); + } + else + { + Factory.HostName = hostName; + } + connection = Factory.CreateConnection(); } - _connection = factory.CreateConnection(); + return connection; } + } - if (_channel == null || _channel.IsClosed) + private static IConnection CreateBatchConnection(string processExecutionId, string hostName, bool connectWithUri) + { + lock (Factory) { - _channel = _connection.CreateModel(); + return BatchChannels.GetOrAdd(processExecutionId, + (x) => CreateConnection(hostName, connectWithUri)); } } + private static IBasicPublishBatch CreateBasicPublishBatch(string processExecutionId, IModel channel) + { + return ConcurrentDictionary.GetOrAdd(processExecutionId, + (x) => channel.CreateBasicPublishBatch()); + } + + private static void DeleteBasicPublishBatch(string processExecutionId) + { + ConcurrentDictionary.TryRemove(processExecutionId, out var _); + } + /// /// Closes connection and channel to RabbitMQ /// - public static void CloseConnection() + private static void CloseConnection(IModel channel, IConnection connection) { - if (_channel != null) + if (channel != null) { - _channel.Close(); + channel.Close(); + channel.Dispose(); } + if (connection != null) + { + connection.Close(); + } + } - if (_connection != null) + /// + /// Closes connection and channel to RabbitMQ + /// + private static void CloseBatchConnection(IModel channel, IConnection connection, string processExecutionId) + { + if (!ConcurrentDictionary.ContainsKey(processExecutionId)) { - _connection.Close(); + if (channel != null) + { + channel.Close(); + channel.Dispose(); + } + if (connection != null) + { + connection.Close(); + BatchChannels.TryRemove(processExecutionId, out _); + } } } /// - /// Writes message to a queue + /// Writes messages into a queue with simple publish. Message is not under transaction. No rollback of failed messages. /// /// - public static bool WriteMessage([PropertyTab]WriteInputParams inputParams) + public static bool WriteMessage([PropertyTab] WriteInputParams inputParams) { - OpenConnectionIfClosed(inputParams.HostName, inputParams.ConnectWithURI); + IConnection connection = CreateConnection(inputParams.HostName, inputParams.ConnectWithURI); + IModel channel = connection.CreateModel(); - if (inputParams.Create) + try { - _channel.QueueDeclare(queue: inputParams.QueueName, - durable: inputParams.Durable, - exclusive: false, - autoDelete: false, - arguments: null); + if (inputParams.Create) + { + channel.QueueDeclare(queue: inputParams.QueueName, + durable: inputParams.Durable, + exclusive: false, + autoDelete: false, + arguments: null); + channel.ConfirmSelect(); + } + + IBasicProperties basicProperties = null; + + if (inputParams.Durable) + { + basicProperties = channel.CreateBasicProperties(); + basicProperties.Persistent = true; + } + + channel.BasicPublish(exchange: + inputParams.ExchangeName, + routingKey: inputParams.RoutingKey, + basicProperties: basicProperties, + body: inputParams.Data); + return true; + } + finally + { + CloseConnection(channel, connection); + } + } - IBasicProperties basicProperties = null; - if (inputParams.Durable == true) + /// + /// Writes messages into a queue with batch publish. All messages are under transaction. If one the message failes all messages will be rolled back. + /// + /// + public static bool WriteBatchMessage([PropertyTab] WriteBatchInputParams inputParams) + { + IConnection connection = CreateBatchConnection(inputParams.ProcessExecutionId, inputParams.HostName, inputParams.ConnectWithURI); + IModel channel = connection.CreateModel(); + try { + if (inputParams.Create) + { + channel.QueueDeclare(queue: inputParams.QueueName, + durable: inputParams.Durable, + exclusive: false, + autoDelete: false, + arguments: null); + channel.TxSelect(); + } - basicProperties = _channel.CreateBasicProperties(); - basicProperties.Persistent = true; + IBasicProperties basicProperties = null; - } + if (inputParams.Durable) + { + basicProperties = channel.CreateBasicProperties(); + basicProperties.Persistent = true; + } + + // Add message into a memory based on producer write capability. + if (channel.MessageCount(inputParams.QueueName) < inputParams.WriteMessageCount) + { + CreateBasicPublishBatch(inputParams.ProcessExecutionId, channel).Add( + exchange: inputParams.ExchangeName, + routingKey: inputParams.RoutingKey, + mandatory: true, + properties: basicProperties, + body: new ReadOnlyMemory(inputParams.Data)); - _channel.BasicPublish(exchange: inputParams.ExchangeName, - routingKey: inputParams.RoutingKey, - basicProperties: basicProperties, - body: inputParams.Data); + var c = inputParams.WriteMessageCount; - return true; + if (ConcurrentDictionary.Count < inputParams.WriteMessageCount) + return false; + } + + try + { + CreateBasicPublishBatch(inputParams.ProcessExecutionId, channel).Publish(); + channel.ConfirmSelect(); + channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, inputParams.WaitForAcknowledgement )); + var messageCount = channel.MessageCount(inputParams.QueueName); + return messageCount == inputParams.WriteMessageCount; + } + catch (Exception) + { + channel.QueuePurge(inputParams.QueueName); + return false; + + } + } + finally + { + CloseBatchConnection(channel, connection, inputParams.ProcessExecutionId); + } } + /// /// Writes message to queue. Message is a string and there is internal conversion from string to byte[] using UTF8 encoding /// /// /// - public static bool WriteMessageString([PropertyTab]WriteInputParamsString inputParams) + public static bool WriteMessageString([PropertyTab] WriteInputParamsString inputParams) { WriteInputParams wip = new WriteInputParams { @@ -116,7 +224,30 @@ public static bool WriteMessageString([PropertyTab]WriteInputParamsString inputP }; return WriteMessage(wip); + } + + /// + /// Writes message to a queue in batch. Message is a string and there is internal conversion from string to byte[] using UTF8 encoding + /// + /// + /// + public static bool WriteTextMessageBatch([PropertyTab] WriteBatchInputParamsString inputParams) + { + WriteBatchInputParams wip = new WriteBatchInputParams + { + ConnectWithURI = inputParams.ConnectWithURI, + Create = inputParams.Create, + Data = Encoding.UTF8.GetBytes(inputParams.Data), + Durable = inputParams.Durable, + WriteMessageCount = inputParams.WriteMessageCount, + ProcessExecutionId = inputParams.ProcessExecutionId, + HostName = inputParams.HostName, + ExchangeName = inputParams.ExchangeName, + QueueName = inputParams.QueueName, + RoutingKey = inputParams.RoutingKey + }; + return WriteBatchMessage(wip); } /// @@ -124,64 +255,71 @@ public static bool WriteMessageString([PropertyTab]WriteInputParamsString inputP /// /// /// JSON structure with message contents - public static Output ReadMessage([PropertyTab]ReadInputParams inputParams) + public static Output ReadMessage([PropertyTab] ReadInputParams inputParams) { - Output output = new Output(); - - OpenConnectionIfClosed(inputParams.HostName, inputParams.ConnectWithURI); - - //channel.QueueDeclare(queue: inputParams.QueueName, - // durable: false, - // exclusive: false, - // autoDelete: false, - // arguments: null); - - while (inputParams.ReadMessageCount-- > 0) + IConnection connection = CreateConnection(inputParams.HostName, inputParams.ConnectWithURI); + IModel channel = connection.CreateModel(); + try { - var rcvMessage = _channel.BasicGet(queue: inputParams.QueueName, autoAck: inputParams.AutoAck == ReadAckType.AutoAck); - if (rcvMessage != null) + Output output = new Output(); + + while (inputParams.ReadMessageCount-- > 0) { - output.Messages.Add(new Message { Data = Convert.ToBase64String(rcvMessage.Body), MessagesCount = rcvMessage.MessageCount, DeliveryTag = rcvMessage.DeliveryTag }); + var rcvMessage = channel.BasicGet(queue: inputParams.QueueName, + autoAck: inputParams.AutoAck == ReadAckType.AutoAck); + if (rcvMessage != null) + { + output.Messages.Add(new Message + { + Data = Convert.ToBase64String(rcvMessage.Body.ToArray()), + MessagesCount = rcvMessage.MessageCount, + DeliveryTag = rcvMessage.DeliveryTag + }); + } + //break the loop if no more messages are present + else + { + break; + } } - //break the loop if no more messagages are present - else + + // Auto acking + if (inputParams.AutoAck != ReadAckType.AutoAck && inputParams.AutoAck != ReadAckType.ManualAck) { - break; - } - } + ManualAckType ackType = ManualAckType.NackAndRequeue; - // Auto acking - if (inputParams.AutoAck != ReadAckType.AutoAck && inputParams.AutoAck != ReadAckType.ManualAck) - { - ManualAckType ackType = ManualAckType.NackAndRequeue; + switch (inputParams.AutoAck) + { + case ReadAckType.AutoNack: + ackType = ManualAckType.Nack; + break; - switch (inputParams.AutoAck) - { - case ReadAckType.AutoNack: - ackType = ManualAckType.Nack; - break; + case ReadAckType.AutoNackAndRequeue: + ackType = ManualAckType.NackAndRequeue; + break; - case ReadAckType.AutoNackAndRequeue: - ackType = ManualAckType.NackAndRequeue; - break; + case ReadAckType.AutoReject: + ackType = ManualAckType.Reject; + break; - case ReadAckType.AutoReject: - ackType = ManualAckType.Reject; - break; + case ReadAckType.AutoRejectAndRequeue: + ackType = ManualAckType.RejectAndRequeue; + break; + } - case ReadAckType.AutoRejectAndRequeue: - ackType = ManualAckType.RejectAndRequeue; - break; + foreach (var message in output.Messages) + { + AcknowledgeMessage(channel, ackType, message.DeliveryTag); + } } - foreach (var message in output.Messages) - { - AcknowledgeMessage(ackType, message.DeliveryTag); - } + return output; + } + finally + { + CloseConnection(channel, connection); } - - return output; } /// @@ -189,17 +327,19 @@ public static Output ReadMessage([PropertyTab]ReadInputParams inputParams) /// /// /// JSON structure with message contents - public static OutputString ReadMessageString([PropertyTab]ReadInputParams inputParams) + public static OutputString ReadMessageString([PropertyTab] ReadInputParams inputParams) { var messages = ReadMessage(inputParams); - OutputString outString = new OutputString(); - outString.Messages = messages.Messages.Select(m => - new MessageString - { - DeliveryTag = m.DeliveryTag, - MessagesCount = m.MessagesCount, - Data = Encoding.UTF8.GetString(Convert.FromBase64String(m.Data)) - }).ToList(); + OutputString outString = new OutputString + { + Messages = messages.Messages.Select(m => + new MessageString + { + DeliveryTag = m.DeliveryTag, + MessagesCount = m.MessagesCount, + Data = Encoding.UTF8.GetString(Convert.FromBase64String(m.Data)) + }).ToList() + }; return outString; } @@ -207,11 +347,9 @@ public static OutputString ReadMessageString([PropertyTab]ReadInputParams inputP /// /// Acknowledges received message. Throws exception on error. /// - /// - /// - public static void AcknowledgeMessage(ManualAckType ackType, ulong deliveryTag) + public static void AcknowledgeMessage(IModel channel, ManualAckType ackType, ulong deliveryTag) { - if (_channel == null) + if (channel == null) { // do not try to re-connect, because messages already nacked automatically throw new Exception("No connection to RabbitMQ"); @@ -220,24 +358,28 @@ public static void AcknowledgeMessage(ManualAckType ackType, ulong deliveryTag) switch (ackType) { case ManualAckType.Ack: - _channel.BasicAck(deliveryTag, multiple: false); + channel.BasicAck(deliveryTag, multiple: false); break; case ManualAckType.Nack: - _channel.BasicNack(deliveryTag, multiple: false, requeue: false); + channel.BasicNack(deliveryTag, multiple: false, requeue: false); break; case ManualAckType.NackAndRequeue: - _channel.BasicNack(deliveryTag, multiple: false, requeue: true); + channel.BasicNack(deliveryTag, multiple: false, requeue: true); break; case ManualAckType.Reject: - _channel.BasicReject(deliveryTag, requeue: false); + channel.BasicReject(deliveryTag, requeue: false); break; case ManualAckType.RejectAndRequeue: - _channel.BasicReject(deliveryTag, requeue: true); + channel.BasicReject(deliveryTag, requeue: true); break; + default: + throw new Exception("Wrong acknowledge type."); + + } } } diff --git a/Frends.Community.RabbitMQ/packages.config b/Frends.Community.RabbitMQ/packages.config deleted file mode 100644 index 8cb3d9d..0000000 --- a/Frends.Community.RabbitMQ/packages.config +++ /dev/null @@ -1,5 +0,0 @@ - - - - - \ No newline at end of file diff --git a/README.md b/README.md index e04fad6..dc38070 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,8 @@ Tasks | ExchangeName | string | Name of the exchange | sampleExchange | | RoutingKey | string | Routing key (as in RabbitMQ specification) | sampleQueue | | HostName | string | Address of the server hosting RabbitMQ | localhost or amqp://user:password@hostname:port/vhost | +| WriteMessageCount | string | Amount of messages in the buffer which will be sent over messaging queue as a batch. | 20 | +| ProcessExecutionId | string | Unique id of the process execution. | igbmdajlskdhlfjaeirjwkdjfasdflht | | ConnectWithURI | bool | If true, hostname should be an URI | If false, use hostname only | | Create | bool | True to declare queue before writing | False to not declare it| | Durable | bool | Set durable option when creating queue | @@ -43,6 +45,7 @@ Tasks | RoutingKey | string | Routing key (as in RabbitMQ specification) | sampleQueue | | HostName | string | Address of the server hosting RabbitMQ | localhost or amqp://user:password@hostname:port/vhost | | ConnectWithURI | bool | If true, hostname should be an URI | If false, use hostname only | +| ProcessExecutionId | string | Unique id of the process execution. | igbmdajlskdhlfjaeirjwkdjfasdflht | | Create | bool | True to declare queue before writing | False to not declare it| | Durable | bool | Set durable option when creating queue |