Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,4 @@ $RECYCLE.BIN/
#Nuget packages - will be automatically restored
packages

*.bak
94 changes: 72 additions & 22 deletions Frends.Community.RabbitMQ.Tests/UnitTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ public class UnitTests
public static string TestUri = "amqp://agent:agent123@localhost:5772";
public static string TestHost = "localhost";

private readonly string _exchangeName = "exchange";
private readonly string _queueName = "queue";
private readonly string _routingKey = "queue";

private WriteInputParams _inputParameters = new WriteInputParams();
private WriteInputParamsString _inputParametersString = new WriteInputParamsString();

Expand All @@ -42,8 +46,8 @@ public void DeleteExchangeAndQueue()
{
using (var channel = connection.CreateModel())
{
channel.QueueDelete("queue", false, false);
channel.ExchangeDelete("exchange", ifUnused: false);
channel.QueueDelete(_queueName, false, false);
channel.ExchangeDelete(_exchangeName, ifUnused: false);
}
}
}
Expand All @@ -63,32 +67,30 @@ public void CreateExchangeAndQueue()
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("exchange", type: "fanout", durable: false, autoDelete: false);
channel.QueueDeclare("queue", durable: false, exclusive: false, autoDelete: false);
channel.QueueBind("queue", "exchange", routingKey: "");
channel.ExchangeDeclare(_exchangeName, type: "fanout", durable: false, autoDelete: false);
channel.QueueDeclare(_queueName, durable: false, exclusive: false, autoDelete: false);
channel.QueueBind(_queueName, _exchangeName, routingKey: "");
}
}

_inputParameters = new WriteInputParams
{
Data = new byte[] { 0, 1, 2 },
HostName = TestHost,
RoutingKey = "queue",
QueueName = "queue",
RoutingKey = _routingKey,
QueueName = _queueName,
ConnectWithURI = false,
Create = false,
Durable = false,
Headers = null
};



_inputParametersString = new WriteInputParamsString
{
Data = "test message",
HostName = TestHost,
RoutingKey = "queue",
QueueName = "queue",
RoutingKey = _routingKey,
QueueName = _queueName,
ConnectWithURI = false,
Create = false,
Durable = false,
Expand All @@ -108,7 +110,7 @@ public void CreateExchangeAndQueue()
_outputReadParams = new ReadInputParams
{
HostName = TestHost,
QueueName = "queue",
QueueName = _queueName,
AutoAck = ReadAckType.AutoAck,
ReadMessageCount = 1,
ConnectWithURI = false
Expand All @@ -119,6 +121,10 @@ public void CreateExchangeAndQueue()
public void TestWriteRead()
{
RabbitMQTask.WriteMessage(_inputParameters);

uint messageCount = RabbitMQTask.MessageCount(_outputReadParams.MakeQueueInputParams());
Assert.AreEqual(1, messageCount);

var retVal = RabbitMQTask.ReadMessage(_outputReadParams);
Assert.IsTrue(retVal != null && retVal.Messages.Count() == 1);
}
Expand All @@ -133,6 +139,10 @@ public void TestReadWithAck10()
_inputParameters.Data = new byte[] { 0, (byte)(i * i) };
RabbitMQTask.WriteMessage(_inputParameters);
}

uint messageCount = RabbitMQTask.MessageCount(_outputReadParams.MakeQueueInputParams());
Assert.AreEqual(10, messageCount);

var retVal = RabbitMQTask.ReadMessage(_outputReadParams);
Assert.IsTrue(retVal != null && retVal.Messages.Count() == 10);
}
Expand All @@ -153,8 +163,11 @@ public void TestReadNoAck10()

RabbitMQTask.WriteMessage(_inputParameters);
}
var retVal = RabbitMQTask.ReadMessage(_outputReadParams);

uint messageCount = RabbitMQTask.MessageCount(_outputReadParams.MakeQueueInputParams());
Assert.AreEqual(10, messageCount);

var retVal = RabbitMQTask.ReadMessage(_outputReadParams);
Assert.IsTrue(retVal != null && retVal.Messages.Count() == 10);
}

Expand All @@ -167,10 +180,12 @@ public void TestWriteReadWithUri()
_outputReadParams.ConnectWithURI = true;
_outputReadParams.HostName = TestUri;


RabbitMQTask.WriteMessage(_inputParameters);
var retVal = RabbitMQTask.ReadMessage(_outputReadParams);

uint messageCount = RabbitMQTask.MessageCount(_outputReadParams.MakeQueueInputParams());
Assert.AreEqual(1, messageCount);

var retVal = RabbitMQTask.ReadMessage(_outputReadParams);
Assert.IsTrue(retVal != null && retVal.Messages.Count() == 1);
}

Expand Down Expand Up @@ -217,8 +232,10 @@ public void TestReadNoAck10WithUri()
RabbitMQTask.WriteMessage(_inputParameters);
}

var retVal = RabbitMQTask.ReadMessage(_outputReadParams);
uint messageCount = RabbitMQTask.MessageCount(_outputReadParams.MakeQueueInputParams());
Assert.AreEqual(10, messageCount);

var retVal = RabbitMQTask.ReadMessage(_outputReadParams);
Assert.IsTrue(retVal != null && retVal.Messages.Count() == 10);
}

Expand Down Expand Up @@ -251,7 +268,7 @@ public void TestWriteToExistingExchange()
{

_inputParameters.QueueName = null;
_inputParameters.ExchangeName = "exchange";
_inputParameters.ExchangeName = _exchangeName;

RabbitMQTask.WriteMessage(_inputParameters);
var retVal = RabbitMQTask.ReadMessage(_outputReadParams);
Expand Down Expand Up @@ -280,13 +297,12 @@ public void TestWriteReadStringToQueue()
[Test]
public void TestWriteReadStringToExchange()
{

_inputParametersString.QueueName = null;
_inputParametersString.ExchangeName = "exchange";
_inputParametersString.ExchangeName = _exchangeName;

RabbitMQTask.WriteMessageString(_inputParametersString);

var retVal = RabbitMQTask.ReadMessageString(new ReadInputParams { HostName = TestUri, QueueName = "queue", AutoAck = ReadAckType.AutoAck, ReadMessageCount = 1000, ConnectWithURI = true });
var retVal = RabbitMQTask.ReadMessageString(new ReadInputParams { HostName = TestUri, QueueName = _queueName, AutoAck = ReadAckType.AutoAck, ReadMessageCount = 1000, ConnectWithURI = true });
Assert.IsTrue(retVal != null && retVal.Messages.Count() == 1);
Assert.AreEqual("test message", retVal.Messages[0].Data);
Assert.AreEqual("application id", retVal.Messages[0].Headers["X-AppId"]);
Expand All @@ -299,6 +315,40 @@ public void TestWriteReadStringToExchange()
Assert.AreEqual("custom header", retVal.Messages[0].Headers["Custom-Header"]);
}

[Test]
public void TestWriteReadWithUriAndNullHeaderToExchange()
{
var writeParams = new WriteInputParamsString
{
Data = "test message",
HostName = TestUri,
RoutingKey = "queue",
ExchangeName = "exchange",
ConnectWithURI = true,
Create = false,
Durable = false,
Headers = new Header[]
{
new Header { Name = "Null-Header", Value = null }
}
};

var readParams = new ReadInputParams
{
HostName = TestUri,
QueueName = "queue",
AutoAck = ReadAckType.AutoAck,
ReadMessageCount = 1000,
ConnectWithURI = true
};

RabbitMQTask.WriteMessageString(writeParams);

var retVal = RabbitMQTask.ReadMessageString(readParams);
Assert.IsTrue(retVal != null && retVal.Messages.Count() == 1);
Assert.AreEqual(1, retVal.Messages[0].Headers.Count);
Assert.AreEqual(null, retVal.Messages[0].Headers["Null-Header"]);
}

/// <summary>
/// Used for debugging, if connection is closed and opened for new hostname
Expand All @@ -307,8 +357,8 @@ public void TestWriteReadStringToExchange()
[Ignore("This test is actually used for debugging while developing task.")]
public void TestChangingHostName()
{
RabbitMQTask.WriteMessageString(new WriteInputParamsString { Data = "test message", HostName = "amqp://localhost", ExchangeName = "exchange", RoutingKey = "queue", ConnectWithURI = true, Create = false, Durable = false });
RabbitMQTask.WriteMessageString(new WriteInputParamsString { Data = "test message", HostName = "localhost2", ExchangeName = "exchange", RoutingKey = "queue", ConnectWithURI = false, Create = false, Durable = false });
RabbitMQTask.WriteMessageString(new WriteInputParamsString { Data = "test message", HostName = "amqp://localhost", ExchangeName = _exchangeName, RoutingKey = _queueName, ConnectWithURI = true, Create = false, Durable = false });
RabbitMQTask.WriteMessageString(new WriteInputParamsString { Data = "test message", HostName = "localhost2", ExchangeName = _exchangeName, RoutingKey = _queueName, ConnectWithURI = false, Create = false, Durable = false });

Assert.IsTrue(true);
}
Expand Down
9 changes: 7 additions & 2 deletions Frends.Community.RabbitMQ.sln
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.30503.244
# Visual Studio Version 17
VisualStudioVersion = 17.2.32630.192
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Frends.Community.RabbitMQ", "Frends.Community.RabbitMQ\Frends.Community.RabbitMQ.csproj", "{DBB2AD78-CF28-44AB-BCAC-D4B7D4035DDF}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Frends.Community.RabbitMQ.Tests", "Frends.Community.RabbitMQ.Tests\Frends.Community.RabbitMQ.Tests.csproj", "{FC0A38D4-6C86-4362-B65F-EDDE7CCA5D39}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{FBFB854F-BD44-4E8E-923F-AD654515E867}"
ProjectSection(SolutionItems) = preProject
README.md = README.md
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down
46 changes: 44 additions & 2 deletions Frends.Community.RabbitMQ/Definitions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace Frends.Community.RabbitMQ
/// Acknowledge type for manual operation.
/// </summary>
public enum ManualAckType
{
Ack,
{
Ack,
Nack,
NackAndRequeue,
Reject,
Expand All @@ -37,6 +37,34 @@ public enum ReadAckType
AutoRejectAndRequeue
}

/// <summary>
/// Collection of queue parameters.
/// </summary>
public class QueueInputParams
{
/// <summary>
/// Name of the queue.
/// </summary>
[DefaultValue("sampleQueue")]
[DisplayName(@"Queue name")]
[DisplayFormat(DataFormatString = "Text")]
public string QueueName { get; set; }
/// <summary>
/// RabbitMQ host name.
/// </summary>
[PasswordPropertyText]
[DefaultValue("localhost")]
[DisplayName(@"Host name")]
[DisplayFormat(DataFormatString = "Text")]
public string HostName { get; set; }
/// <summary>
/// Use URI instead of a hostname.
/// </summary>
[DefaultValue(false)]
[DisplayName(@"Use URI for connection")]
public bool ConnectWithURI { get; set; }
}

/// <summary>
/// Collection of read message parameters.
/// </summary>
Expand Down Expand Up @@ -75,6 +103,20 @@ public class ReadInputParams
[DefaultValue(false)]
[DisplayName(@"Use URI for connection")]
public bool ConnectWithURI { get; set; }

/// <summary>
/// Converts ReadInputParams to QueueInputParams
/// </summary>
/// <returns>QueueInputParams</returns>
public QueueInputParams MakeQueueInputParams()
{
return new QueueInputParams
{
HostName = this.HostName,
QueueName = this.QueueName,
ConnectWithURI = this.ConnectWithURI
};
}
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion Frends.Community.RabbitMQ/Frends.Community.RabbitMQ.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<IncludeSource>true</IncludeSource>
<PackageTags>Frends</PackageTags>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<Version>1.6.3</Version>
<Version>1.7.0</Version>
<GenerateAssemblyInfo>false</GenerateAssemblyInfo>
</PropertyGroup>

Expand Down
3 changes: 3 additions & 0 deletions Frends.Community.RabbitMQ/FrendsTaskMetadata.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
{
"TaskMethod": "Frends.Community.RabbitMQ.RabbitMQTask.ReadMessageString"
},
{
"TaskMethod": "Frends.Community.RabbitMQ.RabbitMQTask.MessageCount"
},
{
"TaskMethod": "Frends.Community.RabbitMQ.RabbitMQTask.AcknowledgeMessage"
}
Expand Down
40 changes: 29 additions & 11 deletions Frends.Community.RabbitMQ/RabbitMQTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,13 @@ public static void CloseConnection()
if (_channel != null)
{
_channel.Close();
_channel.Dispose();
}

if (_connection != null)
{
_connection.Close();
_connection.Dispose();
}
}

Expand Down Expand Up @@ -216,21 +218,33 @@ public static Output ReadMessage([PropertyTab]ReadInputParams inputParams)
public static OutputString ReadMessageString([PropertyTab]ReadInputParams inputParams)
{
var messages = ReadMessage(inputParams);
OutputString outString = new OutputString
{
Messages = messages.Messages.Select(m =>
new MessageString
{
DeliveryTag = m.DeliveryTag,
MessagesCount = m.MessagesCount,
Data = Encoding.UTF8.GetString(Convert.FromBase64String(m.Data)),
Headers = m.Headers
}).ToList()
OutputString outString = new OutputString
{
Messages = messages.Messages.Select(m =>
new MessageString
{
DeliveryTag = m.DeliveryTag,
MessagesCount = m.MessagesCount,
Data = Encoding.UTF8.GetString(Convert.FromBase64String(m.Data)),
Headers = m.Headers
}).ToList()
};

return outString;
}

/// <summary>
/// Reads message count in the queue. Throws exception on error.
/// </summary>
/// <param name="inputParams"></param>
/// <returns>uint</returns>
public static uint MessageCount([PropertyTab] QueueInputParams inputParams)
{
OpenConnectionIfClosed(inputParams.HostName, inputParams.ConnectWithURI);

return _channel.MessageCount(inputParams.QueueName);
}

/// <summary>
/// Acknowledges received message. Throws exception on error.
/// </summary>
Expand Down Expand Up @@ -339,7 +353,11 @@ private static Dictionary<string, string> GetResponseHeaderDictionary(IBasicProp

if (basicProperties.IsHeadersPresent())
{
basicProperties.Headers.ToList().ForEach(x => allHeaders[x.Key] = Encoding.UTF8.GetString(x.Value as byte[]));
// May be it is a bug in RabbitMQ Client library, but when publishing messages without headers in RabbitMQ Management UI,
// basicProperties.IsHeadersPresent() returns true, but basicProperties.Headers is null
basicProperties.Headers?.ToList()
/* Supporting only string headers */
.ForEach(x => allHeaders[x.Key] = (x.Value != null && x.Value is byte[]) ? Encoding.UTF8.GetString(x.Value as byte[]) : null);
}

return allHeaders;
Expand Down
Loading