-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMultiplex.cs
More file actions
219 lines (153 loc) · 6.98 KB
/
Multiplex.cs
File metadata and controls
219 lines (153 loc) · 6.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
using System;
using System.Text;
using Microsoft.Azure.Devices.Client; // IoT Hub Device SDK
using Microsoft.Azure.Devices; //IoT Hub Service SDK
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Common.Exceptions;
using System.Collections.Generic;
using System.Linq;
namespace DeviceClientMultiplexing
{
/// <summary>
/// Registers a set of simulated devices with an IoT Hub, creates one AMQP
/// <see cref="DeviceClient"/> per device (optionally sharing pooled connections),
/// sends a number of device-to-cloud message rounds and finally deletes the devices again.
/// </summary>
class Multiplex
{
    // RemoveDevices2Async can only do 100 devices at a time.
    private const int MaxBulkDeleteSize = 100;

    private static RegistryManager _registryManager;
    private static string _ioTHubHostName;

    // Change these to meet your needs
    private static string _prefix = "tdevice"; // Prefix for each deviceId
    private static int _devicesToSimulate = 1000; // Number of IoT devices to create
    private static bool _pooling = true; // Enable or disable pooling (multiplexing)
    private static uint _maxPoolSize = 2;
    private static int _sendIterations = 10; // Number of send message iterations
    private static int _delayBetweenSendIterations = 10000; // Delay between send iterations, in ms

    // Replace with your IoT Hub owner connection string
    private static string _iotHubConnString = "<your IoT Hub owner connection string>";

    /// <summary>
    /// Creates the registry manager from the configured connection string and
    /// extracts the IoT Hub host name that the device clients connect to.
    /// </summary>
    public Multiplex()
    {
        _registryManager = RegistryManager.CreateFromConnectionString(_iotHubConnString);
        _ioTHubHostName = ExtractHostName(_iotHubConnString);
    }

    /// <summary>
    /// Runs the whole simulation: register devices, create device clients, send
    /// <c>_sendIterations</c> rounds of messages, close the clients and delete the devices.
    /// </summary>
    /// <returns>A task that completes when the devices have been cleaned up.</returns>
    public async Task Start()
    {
        Device[] devices = await RegisterDevicesAsync();
        DeviceClient[] deviceClients = CreateDeviceClients(devices);

        for (int iteration = 0; iteration < _sendIterations; iteration++)
        {
            Console.WriteLine($"********** Send iteration {iteration} **********");

            // Start all sends concurrently and wait for every one of them to finish, so
            // failures are observed and the round is really complete before the delay.
            await Task.WhenAll(deviceClients.Select((dc, index) => SendMessageAsync(dc, index)));

            await Task.Delay(_delayBetweenSendIterations);
        }

        // Short delay to ensure all messages are received before deleting devices.
        await Task.Delay(5000);

        // Release the connections before the identities they authenticate with are removed.
        await Task.WhenAll(deviceClients.Select(CloseDeviceClientAsync));

        // Clean up
        Console.WriteLine("Deleting devices...");
        await RemoveDevicesAsync(devices);
    }

    /// <summary>
    /// Returns the value of the <c>HostName</c> segment of a
    /// <c>key=value;key=value</c> connection string, wherever that segment appears.
    /// </summary>
    /// <param name="connectionString">The IoT Hub connection string.</param>
    /// <returns>The host name value.</returns>
    /// <exception cref="FormatException">No <c>HostName</c> segment is present.</exception>
    private static string ExtractHostName(string connectionString)
    {
        string[] segments = connectionString.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries);
        foreach (string segment in segments)
        {
            // Split on the first '=' only: values such as keys may themselves contain '='.
            int separator = segment.IndexOf('=');
            if (separator <= 0)
            {
                continue;
            }

            string key = segment.Substring(0, separator).Trim();
            if (string.Equals(key, "HostName", StringComparison.OrdinalIgnoreCase))
            {
                return segment.Substring(separator + 1).Trim();
            }
        }

        throw new FormatException("The IoT Hub connection string does not contain a HostName segment.");
    }

    /// <summary>
    /// Registers <c>_devicesToSimulate</c> devices using the Service API, reusing any that
    /// already exist. Ordinarily this should be done using DPS.
    /// </summary>
    /// <returns>The registered devices, including their eTag and authentication key.</returns>
    private static async Task<Device[]> RegisterDevicesAsync()
    {
        Device[] devices = new Device[_devicesToSimulate];

        // This is being run sequentially due to IoT Hub identity operations throttling limits (100/min/unit).
        // https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-quotas-throttling
        for (int i = 0; i < devices.Length; i++)
        {
            string deviceId = _prefix + i;
            try
            {
                devices[i] = await _registryManager.AddDeviceAsync(new Device(deviceId));
                Console.WriteLine($"Created device {deviceId}");
            }
            catch (DeviceAlreadyExistsException)
            {
                devices[i] = await _registryManager.GetDeviceAsync(deviceId);
                Console.WriteLine($"Device {deviceId} already exists");
            }
        }

        return devices;
    }

    /// <summary>
    /// Creates one <see cref="DeviceClient"/> per device using AMQP over TCP with the
    /// configured connection pool settings.
    /// </summary>
    /// <param name="devices">The registered devices.</param>
    /// <returns>
    /// An array parallel to <paramref name="devices"/>; an entry is <c>null</c> when the
    /// client for that device could not be created.
    /// </returns>
    private static DeviceClient[] CreateDeviceClients(Device[] devices)
    {
        DeviceClient[] deviceClients = new DeviceClient[devices.Length];

        for (int i = 0; i < devices.Length; i++)
        {
            try
            {
                var auth = new DeviceAuthenticationWithRegistrySymmetricKey(devices[i].Id, devices[i].Authentication.SymmetricKey.PrimaryKey);

                // For IoT PnP discovery
                ClientOptions options = new ClientOptions() { ModelId = "dtmi:company:interface;1" };

                deviceClients[i] = DeviceClient.Create(
                    _ioTHubHostName,
                    auth,
                    new ITransportSettings[]
                    {
                        new AmqpTransportSettings(Microsoft.Azure.Devices.Client.TransportType.Amqp_Tcp_Only)
                        {
                            AmqpConnectionPoolSettings = new AmqpConnectionPoolSettings()
                            {
                                Pooling = _pooling,
                                MaxPoolSize = _maxPoolSize
                            }
                        }
                    }, options);
            }
            catch (Exception e)
            {
                // Leave the slot null; later stages skip devices without a client.
                Console.WriteLine($"DeviceClient.Create Error: {e.Message}");
            }
        }

        return deviceClients;
    }

    /// <summary>
    /// Sends a D2C message containing the deviceId in the message payload
    /// and waits for the send to complete.
    /// </summary>
    /// <param name="dc">The client to send with; may be <c>null</c> if creation failed.</param>
    /// <param name="index">The device index, appended to the prefix to form the deviceId.</param>
    /// <returns>A task that completes when the send has finished or failed; it never faults.</returns>
    private static async Task SendMessageAsync(DeviceClient dc, long index)
    {
        if (dc == null)
        {
            Console.WriteLine($"SendMessage Error: no DeviceClient available for {_prefix}{index}");
            return;
        }

        try
        {
            using (var msg = new Microsoft.Azure.Devices.Client.Message(Encoding.UTF8.GetBytes(_prefix + index)))
            {
                await dc.SendEventAsync(msg);
            }

            // Logged only after the awaited send returned without throwing.
            Console.WriteLine($"Sent message to {_prefix}{index}");
        }
        catch (Exception e)
        {
            Console.WriteLine($"SendMessage Error: {e.Message}");
        }
    }

    /// <summary>
    /// Closes and disposes a device client, logging rather than propagating any failure.
    /// </summary>
    /// <param name="dc">The client to close; <c>null</c> entries are ignored.</param>
    /// <returns>A task that completes when the client has been disposed; it never faults.</returns>
    private static async Task CloseDeviceClientAsync(DeviceClient dc)
    {
        if (dc == null)
        {
            return;
        }

        try
        {
            await dc.CloseAsync();
        }
        catch (Exception e)
        {
            Console.WriteLine($"DeviceClient.CloseAsync Error: {e.Message}");
        }
        finally
        {
            dc.Dispose();
        }
    }

    /// <summary>
    /// Unregisters all devices with IoT Hub using the Service API, in batches of at most
    /// <see cref="MaxBulkDeleteSize"/>. A failing batch is logged and the remaining
    /// batches are still attempted.
    /// </summary>
    /// <param name="devices">The devices to delete; <c>null</c> entries are skipped.</param>
    /// <returns>A task that completes when every batch has been attempted.</returns>
    private static async Task RemoveDevicesAsync(Device[] devices)
    {
        List<Device> registered = devices.Where(d => d != null).ToList();

        for (int offset = 0; offset < registered.Count; offset += MaxBulkDeleteSize)
        {
            int batchSize = Math.Min(MaxBulkDeleteSize, registered.Count - offset);
            List<Device> batch = registered.GetRange(offset, batchSize);

            try
            {
                var result = await _registryManager.RemoveDevices2Async(batch);
                if (!result.IsSuccessful)
                {
                    int errorCount = result.Errors == null ? 0 : result.Errors.Length;
                    Console.WriteLine($"RemoveDevices2Async Error: {errorCount} device(s) in the batch could not be deleted");
                }
            }
            catch (Exception e)
            {
                Console.WriteLine($"RemoveDevices2Async Error: {e.Message}");
            }

            Console.WriteLine($"Devices remaining to be deleted {registered.Count - offset - batchSize}");
        }
    }
}
}