Skip to content

Commit 87c7709

Browse files
committed
Implement chunked reception
1 parent b4404b3 commit 87c7709

1 file changed

Lines changed: 170 additions & 20 deletions

File tree

MatterDotNet/Protocol/Subprotocols/InteractionManager.cs

Lines changed: 170 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
using MatterDotNet.Protocol.Payloads.OpCodes;
1616
using MatterDotNet.Protocol.Payloads.Status;
1717
using MatterDotNet.Protocol.Sessions;
18+
using System.Data;
1819

1920
namespace MatterDotNet.Protocol.Subprotocols
2021
{
@@ -26,17 +27,19 @@ public static async Task<List<AttributeReportIB>> GetAttributes(SecureSession se
2627
{
2728
AttributePathIB[] paths = new AttributePathIB[attributes.Length];
2829
for (int i = 0; i < paths.Length; i++)
29-
paths[i] = new AttributePathIB() { Node = session.InitiatorNodeID, Endpoint = endpoint, Cluster = cluster, Attribute = attributes[i] };
30+
paths[i] = new AttributePathIB() { Endpoint = endpoint, Cluster = cluster, Attribute = attributes[i] };
3031
ReadRequestMessage read = new ReadRequestMessage()
3132
{
3233
InteractionModelRevision = Constants.MATTER_13_REVISION,
3334
FabricFiltered = false,
3435
AttributeRequests = paths,
36+
EventRequests = [],
37+
DataVersionFilters = []
3538
};
3639
Frame readFrame = new Frame(read, (byte)IMOpCodes.ReadRequest);
3740
readFrame.Message.Protocol = ProtocolType.InteractionModel;
3841
readFrame.SourceNodeID = session.InitiatorNodeID;
39-
readFrame.DestinationNodeID = session.ResponderNodeID;
42+
readFrame.DestinationID = session.ResponderNodeID;
4043
await secExchange.SendFrame(readFrame);
4144
List<AttributeReportIB> results = new List<AttributeReportIB>();
4245
bool more = false;
@@ -61,52 +64,105 @@ public static async Task<List<AttributeReportIB>> GetAttributes(SecureSession se
6164
}
6265
}
6366

64-
public static async Task<AttributeReportIB> GetAttribute(SecureSession session, ushort endpoint, uint cluster, uint attribute)
67+
public static async Task<object?> GetAttribute(SecureSession session, ushort endpoint, uint cluster, uint attribute)
6568
{
6669
using (Exchange secExchange = session.CreateExchange())
6770
{
6871
ReadRequestMessage read = new ReadRequestMessage()
6972
{
7073
InteractionModelRevision = Constants.MATTER_13_REVISION,
71-
FabricFiltered = false,
72-
AttributeRequests = [new AttributePathIB() { Node = session.InitiatorNodeID, Endpoint = endpoint, Cluster = cluster, Attribute = attribute }]
74+
FabricFiltered = true,
75+
AttributeRequests = [new AttributePathIB() { Endpoint = endpoint, Cluster = cluster, Attribute = attribute }]
7376
};
7477
Frame readFrame = new Frame(read, (byte)IMOpCodes.ReadRequest);
7578
readFrame.Message.Protocol = ProtocolType.InteractionModel;
7679
readFrame.SourceNodeID = session.InitiatorNodeID;
77-
readFrame.DestinationNodeID = session.ResponderNodeID;
80+
readFrame.DestinationID = session.ResponderNodeID;
7881
await secExchange.SendFrame(readFrame);
7982
while (true)
8083
{
8184
Frame response = await secExchange.Read();
8285
if (response.Message.Payload is ReportDataMessage msg)
8386
{
87+
if (msg.AttributeReports != null && !ValidateResponse(msg.AttributeReports[0], endpoint))
88+
throw new IOException("Failed to query attribute: " + (IMStatusCode?)msg.AttributeReports[0].AttributeStatus?.Status.Status);
89+
if (msg.MoreChunkedMessages.HasValue && msg.MoreChunkedMessages.Value == true)
90+
return await HandleChunked(msg, secExchange, endpoint);
8491
if (msg.AttributeReports != null)
85-
return msg.AttributeReports[0];
92+
return GetData(msg.AttributeReports);
8693
}
94+
else if (response.Message.Payload is StatusResponseMessage status)
95+
throw new IOException("Error: " + (IMStatusCode)status.Status);
8796
}
8897
}
8998
}
9099

100+
private static object? GetData(IList<AttributeReportIB> attributeReports)
101+
{
102+
if (attributeReports.Count == 0)
103+
return null;
104+
if (attributeReports.Count == 1)
105+
return attributeReports[0].AttributeData!.Data;
106+
dynamic[] result = (dynamic[])attributeReports[0].AttributeData!.Data!;
107+
int offset = result.Length;
108+
Array.Resize(ref result, attributeReports.Count - 1 + offset);
109+
for (int i = 1; i < attributeReports.Count; i++)
110+
result[offset + i - 1] = attributeReports[i].AttributeData!.Data!;
111+
return result;
112+
}
113+
114+
private static async Task<object?> HandleChunked(ReportDataMessage first, Exchange secExchange, ushort endpoint)
115+
{
116+
List<AttributeReportIB> attributes = new List<AttributeReportIB>();
117+
attributes.AddRange(first.AttributeReports!);
118+
for (int i = 0; i < 1024; i++) //Infinite loop protection
119+
{
120+
Frame response = await secExchange.Read();
121+
if (response.Message.Payload is ReportDataMessage msg)
122+
{
123+
if (msg.AttributeReports != null && !ValidateResponse(msg.AttributeReports[0], endpoint))
124+
throw new IOException("Failed to query attribute: " + (IMStatusCode?)msg.AttributeReports[0].AttributeStatus?.Status.Status);
125+
if (msg.AttributeReports != null)
126+
attributes.AddRange(msg.AttributeReports);
127+
if (!msg.MoreChunkedMessages.HasValue || msg.MoreChunkedMessages.Value == false)
128+
{
129+
if (!msg.SuppressResponse.HasValue || msg.SuppressResponse.Value == false)
130+
await SendStatus(IMStatusCode.SUCCESS, secExchange);
131+
break;
132+
}
133+
else
134+
await SendStatus(IMStatusCode.SUCCESS, secExchange);
135+
}
136+
else if (response.Message.Payload is StatusResponseMessage status)
137+
throw new IOException("Error: " + (IMStatusCode)status.Status);
138+
}
139+
return GetData(attributes);
140+
}
141+
142+
private static async Task SendStatus(IMStatusCode status, Exchange exchange)
143+
{
144+
await exchange.SendFrame(new Frame(new StatusPayload(GeneralCode.SUCCESS, 0, ProtocolType.SecureChannel, (ushort)status), (byte)SecureOpCodes.StatusReport));
145+
}
146+
91147
public static Task SendCommand(SecureSession session, ushort endpoint, uint cluster, uint command, bool timed, TLVPayload? payload = null)
92148
{
93149
using (Exchange exchange = session.CreateExchange())
94-
return SendCommand(exchange, endpoint, cluster, command, timed, payload);
150+
return SendCommand(exchange, endpoint, cluster, command, timed, null, payload);
95151
}
96152

97-
public static async Task SendCommand(Exchange exchange, ushort endpoint, uint cluster, uint command, bool timed, TLVPayload? payload = null)
153+
public static async Task SendCommand(Exchange exchange, ushort endpoint, uint cluster, uint command, bool timed, ushort? refNum, TLVPayload? payload = null)
98154
{
99155
InvokeRequestMessage run = new InvokeRequestMessage()
100156
{
101157
SuppressResponse = false,
102158
TimedRequest = timed,
103159
InteractionModelRevision = Constants.MATTER_13_REVISION,
104-
InvokeRequests = [new CommandDataIB() { CommandFields = payload, CommandPath = new CommandPathIB() { Endpoint = endpoint, Cluster = cluster, Command = command } }]
160+
InvokeRequests = [new CommandDataIB() { CommandFields = payload, CommandRef = refNum, CommandPath = new CommandPathIB() { Endpoint = endpoint, Cluster = cluster, Command = command } }]
105161
};
106162
Frame invokeFrame = new Frame(run, (byte)IMOpCodes.InvokeRequest);
107163
invokeFrame.Message.Protocol = ProtocolType.InteractionModel;
108164
invokeFrame.SourceNodeID = exchange.Session.InitiatorNodeID;
109-
invokeFrame.DestinationNodeID = exchange.Session.ResponderNodeID;
165+
invokeFrame.DestinationID = exchange.Session.ResponderNodeID;
110166
await exchange.SendFrame(invokeFrame);
111167
}
112168

@@ -120,7 +176,7 @@ public static async Task StartTimed(Exchange exchange, ushort timeout)
120176
Frame invokeFrame = new Frame(time, (byte)IMOpCodes.TimedRequest);
121177
invokeFrame.Message.Protocol = ProtocolType.InteractionModel;
122178
invokeFrame.SourceNodeID = exchange.Session.InitiatorNodeID;
123-
invokeFrame.DestinationNodeID = exchange.Session.ResponderNodeID;
179+
invokeFrame.DestinationID = exchange.Session.ResponderNodeID;
124180
await exchange.SendFrame(invokeFrame);
125181
while (true)
126182
{
@@ -138,12 +194,16 @@ public static async Task<InvokeResponseIB> ExecCommand(SecureSession secSession,
138194
{
139195
using (Exchange exchange = secSession.CreateExchange())
140196
{
141-
await SendCommand(exchange, endpoint, cluster, command, false, payload);
197+
ushort refNum = (ushort)Random.Shared.Next();
198+
await SendCommand(exchange, endpoint, cluster, command, false, refNum, payload);
142199
while (true)
143200
{
144201
Frame response = await exchange.Read();
145202
if (response.Message.Payload is InvokeResponseMessage msg)
146-
return msg.InvokeResponses[0];
203+
{
204+
if (!msg.InvokeResponses[0].Status!.CommandRef.HasValue || msg.InvokeResponses[0].Status!.CommandRef!.Value == refNum)
205+
return msg.InvokeResponses[0];
206+
}
147207
else if (response.Message.Payload is StatusResponseMessage status)
148208
throw new IOException("Error: " + (IMStatusCode)status.Status);
149209
}
@@ -155,7 +215,7 @@ public static async Task<InvokeResponseIB> ExecTimedCommand(SecureSession secSes
155215
using (Exchange exchange = secSession.CreateExchange())
156216
{
157217
await StartTimed(exchange, timeoutMS);
158-
await SendCommand(exchange, endpoint, cluster, command, true, payload);
218+
await SendCommand(exchange, endpoint, cluster, command, true, null, payload);
159219
while (true)
160220
{
161221
Frame response = await exchange.Read();
@@ -167,15 +227,15 @@ public static async Task<InvokeResponseIB> ExecTimedCommand(SecureSession secSes
167227
}
168228
}
169229

170-
internal static async Task<AttributeStatusIB> SetAttribute(SecureSession session, ushort endPoint, uint cluster, ushort attribute, object? value)
230+
internal static async Task SetAttribute(SecureSession session, ushort endPoint, uint cluster, ushort attribute, object? value)
171231
{
172232
using (Exchange secExchange = session.CreateExchange())
173233
{
174234
WriteRequestMessage write = new WriteRequestMessage()
175235
{
176236
InteractionModelRevision = Constants.MATTER_13_REVISION,
177237
WriteRequests = [ new AttributeDataIB() {
178-
Path = new AttributePathIB() { Node = session.InitiatorNodeID, Endpoint = endPoint, Cluster = cluster, Attribute = attribute },
238+
Path = new AttributePathIB() { Node = session.ResponderNodeID, Endpoint = endPoint, Cluster = cluster, Attribute = attribute },
179239
Data = value
180240
}
181241
],
@@ -185,20 +245,110 @@ internal static async Task<AttributeStatusIB> SetAttribute(SecureSession session
185245
Frame readFrame = new Frame(write, (byte)IMOpCodes.WriteRequest);
186246
readFrame.Message.Protocol = ProtocolType.InteractionModel;
187247
readFrame.SourceNodeID = session.InitiatorNodeID;
188-
readFrame.DestinationNodeID = session.ResponderNodeID;
248+
readFrame.DestinationID = session.ResponderNodeID;
189249
await secExchange.SendFrame(readFrame);
190250
while (true)
191251
{
192252
Frame response = await secExchange.Read();
193253
if (response.Message.Payload is WriteResponseMessage msg)
194254
{
195-
if (msg.WriteResponses != null)
196-
return msg.WriteResponses[0];
255+
if (msg.WriteResponses != null && !ValidateResponse(msg.WriteResponses[0], endPoint))
256+
throw new IOException($"Failed to set attribute {attribute} on cluster {cluster}@{endPoint}");
197257
}
198258
else if (response.Message.Payload is StatusResponseMessage status)
199259
throw new IOException("Error: " + (IMStatusCode)status.Status);
200260
}
201261
}
202262
}
263+
264+
/// <summary>
265+
/// Validates a response and throws an exception if it's an error status
266+
/// </summary>
267+
/// <param name="resp"></param>
268+
/// <param name="endPoint"></param>
269+
/// <returns></returns>
270+
/// <exception cref="InvalidDataException"></exception>
271+
internal static bool ValidateResponse(InvokeResponseIB resp, ushort endPoint)
272+
{
273+
if (resp.Status == null)
274+
{
275+
if (resp.Command?.CommandFields != null)
276+
return true;
277+
throw new InvalidDataException("Response received without status");
278+
}
279+
return ValidateStatus((IMStatusCode)resp.Status.Status.Status, endPoint);
280+
}
281+
282+
/// <summary>
283+
/// Validates a response and throws an exception if it's an error status
284+
/// </summary>
285+
/// <param name="resp"></param>
286+
/// <param name="endPoint"></param>
287+
/// <returns></returns>
288+
/// <exception cref="InvalidDataException"></exception>
289+
private static bool ValidateResponse(AttributeStatusIB resp, ushort endPoint)
290+
{
291+
if (resp.Status == null)
292+
throw new InvalidDataException("Response received without status");
293+
294+
return ValidateStatus((IMStatusCode)resp.Status.Status, endPoint);
295+
}
296+
297+
/// <summary>
298+
/// Validates a response and throws an exception if it's an error status
299+
/// </summary>
300+
/// <param name="resp"></param>
301+
/// <param name="endPoint"></param>
302+
/// <returns></returns>
303+
/// <exception cref="InvalidDataException"></exception>
304+
private static bool ValidateResponse(AttributeReportIB resp, ushort endPoint)
305+
{
306+
if (resp.AttributeStatus == null)
307+
{
308+
if (resp.AttributeData != null)
309+
return true;
310+
throw new InvalidDataException("Response received without status");
311+
}
312+
return ValidateStatus((IMStatusCode)resp.AttributeStatus.Status.Status, endPoint);
313+
}
314+
315+
private static bool ValidateStatus(IMStatusCode status, ushort endPoint)
316+
{
317+
switch (status)
318+
{
319+
case IMStatusCode.SUCCESS:
320+
return true;
321+
case IMStatusCode.FAILURE:
322+
return false;
323+
case IMStatusCode.UNSUPPORTED_ACCESS:
324+
throw new UnauthorizedAccessException("Unsupported / Unauthorized Access");
325+
case IMStatusCode.UNSUPPORTED_ENDPOINT:
326+
throw new InvalidOperationException("Endpoint " + endPoint + " is not supported");
327+
case IMStatusCode.INVALID_ACTION:
328+
throw new DataException("Invalid Action");
329+
case IMStatusCode.UNSUPPORTED_COMMAND:
330+
throw new DataException("Command ID not supported on this cluster");
331+
case IMStatusCode.INVALID_COMMAND:
332+
throw new DataException("Invalid Command Payload");
333+
case IMStatusCode.CONSTRAINT_ERROR:
334+
throw new DataException("Data constraint violated");
335+
case IMStatusCode.RESOURCE_EXHAUSTED:
336+
throw new InsufficientMemoryException("Resource exhausted");
337+
case IMStatusCode.DATA_VERSION_MISMATCH:
338+
throw new DataException("Data version mismatch");
339+
case IMStatusCode.TIMEOUT:
340+
throw new TimeoutException();
341+
case IMStatusCode.BUSY:
342+
throw new IOException("Resource Busy");
343+
case IMStatusCode.UNSUPPORTED_CLUSTER:
344+
throw new DataException("Unsupported Cluster");
345+
case IMStatusCode.FAILSAFE_REQUIRED:
346+
throw new InvalidOperationException("Failsafe required");
347+
case IMStatusCode.INVALID_IN_STATE:
348+
throw new InvalidOperationException("The received request cannot be handled due to the current operational state of the device");
349+
default:
350+
return false;
351+
}
352+
}
203353
}
204354
}

0 commit comments

Comments
 (0)