Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package io.github.protocol.mdtp.client;

import io.github.protocol.mdtp.common.codec.MdtpDecoder;
import io.github.protocol.mdtp.common.model.CDATHeader;
import io.github.protocol.mdtp.common.model.CDATHeaderFactory;
import io.github.protocol.mdtp.common.model.DeviceDiscoveryRequest;
import io.github.protocol.mdtp.common.model.MdtpPacket;
import io.github.protocol.mdtp.common.model.MessageBodyHeader;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
Expand Down Expand Up @@ -42,6 +44,8 @@ public void start() throws Exception {
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new MdtpDecoder());
}
});
this.channelFuture = bootstrap.connect().sync();
Expand All @@ -66,8 +70,7 @@ public void close() throws IOException {
public void sendDeviceDiscoveryRequest(int[] deviceTypes) {
log.info("start to send device discovery request.");
DeviceDiscoveryRequest request = new DeviceDiscoveryRequest();
request.setMessageBodyHeader(MessageBodyHeader.DEVICE_DISCOVERY_REQUEST);
request.setRequestId(request.generateRequestId());
request.setRequestId(request.generateId());

if (deviceTypes == null) {
request.setDeviceTypeCount((byte) 0);
Expand All @@ -79,14 +82,7 @@ public void sendDeviceDiscoveryRequest(int[] deviceTypes) {
request.setDeviceTypes(deviceTypes);
}

CDATHeader cdatHeader = new CDATHeader();
cdatHeader.setFormatType((byte) 0x02);
cdatHeader.setProtocolVersion((byte) 1);
cdatHeader.setMessageLength((short) 0);
cdatHeader.setTimestamp(System.currentTimeMillis());
cdatHeader.setFlags((byte) 0b01100000);
cdatHeader.setSequenceNumber(0);
cdatHeader.setLogicalChannelId(0);
CDATHeader cdatHeader = CDATHeaderFactory.createDeviceDiscoveryCDATHeader();

MdtpPacket packet = new MdtpPacket();
packet.setHeader(cdatHeader);
Expand All @@ -95,6 +91,6 @@ public void sendDeviceDiscoveryRequest(int[] deviceTypes) {
packet.setSignature(null);

this.channelFuture.channel().writeAndFlush(packet.toByteBuf());
log.info("send device discovery request success: " + packet);
log.info("send device discovery request success.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.github.protocol.mdtp.common.codec;

import io.github.protocol.mdtp.common.model.AbstractMessageBody;
import io.github.protocol.mdtp.common.model.DeviceDiscoveryResponse;
import io.netty.buffer.ByteBuf;

public class DeviceDiscoveryResponseDecoder implements MessageBodyDecoder {
@Override
public AbstractMessageBody handle(ByteBuf in) {
return DeviceDiscoveryResponse.readFromBuffer(in);

Check warning on line 10 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/codec/DeviceDiscoveryResponseDecoder.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/codec/DeviceDiscoveryResponseDecoder.java#L10

Added line #L10 was not covered by tests
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
MessageBodyHeader messageBodyHeader = MessageBodyHeader.readByteBuf(in);
MessageBodyDecoder messageDecode = MessageDecoderFactory.getDecoder(messageBodyHeader);
AbstractMessageBody messageBody = messageDecode.handle(in);
messageBody.setMessageBodyHeader(messageBodyHeader);

mdtpPacket.setHeader(header);
mdtpPacket.setBody(messageBody);

out.add(mdtpPacket);
log.info("decode packet success: {}", mdtpPacket);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ public class MessageDecoderFactory {

static {
decoders.put(MessageBodyHeader.DEVICE_DISCOVERY_REQUEST.toShort(), new DeviceDiscoveryRequestDecoder());
decoders.put(MessageBodyHeader.DEVICE_DISCOVERY_RESPONSE.toShort(), new DeviceDiscoveryResponseDecoder());
}

public static MessageBodyDecoder getDecoder(MessageBodyHeader header) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.github.protocol.mdtp.common.handler;

import io.github.protocol.mdtp.common.model.Attributes;
import io.github.protocol.mdtp.common.model.CDATHeader;
import io.github.protocol.mdtp.common.model.CDATHeaderFactory;
import io.github.protocol.mdtp.common.model.Device;
import io.github.protocol.mdtp.common.model.DeviceDiscoveryRequest;
import io.github.protocol.mdtp.common.model.DeviceDiscoveryResponse;
import io.github.protocol.mdtp.common.model.MdtpPacket;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class DeviceDiscoveryRequestHandler implements MessageBodyHandler {

@Override
public void handle (ChannelHandlerContext ctx, MdtpPacket requestPacket) {
log.info("start to send device discovery response.");
DeviceDiscoveryRequest deviceDiscoveryRequest = (DeviceDiscoveryRequest) requestPacket.getBody();
DeviceDiscoveryResponse deviceDiscoveryResponse = new DeviceDiscoveryResponse();
deviceDiscoveryResponse.setRequestId(deviceDiscoveryRequest.getRequestId());
deviceDiscoveryResponse.setResponseId(deviceDiscoveryResponse.generateId());
Device device = ctx.channel().attr(Attributes.DEVICE_KEY).get();
deviceDiscoveryResponse.setDevice(device);

CDATHeader cdatHeader = CDATHeaderFactory.createDeviceDiscoveryCDATHeader();

MdtpPacket packet = new MdtpPacket();
packet.setHeader(cdatHeader);
packet.setSecurityHeader(null);
packet.setBody(deviceDiscoveryResponse);
packet.setSignature(null);
ctx.channel().writeAndFlush(packet.toByteBuf());
log.info("send device discovery response success.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.github.protocol.mdtp.common.handler;

import io.github.protocol.mdtp.common.model.MdtpPacket;
import io.netty.channel.ChannelHandlerContext;

public interface MessageBodyHandler {

void handle (ChannelHandlerContext ctx, MdtpPacket mdtpPacket);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.github.protocol.mdtp.common.handler;


import io.github.protocol.mdtp.common.model.MessageBodyHeader;

import java.util.HashMap;
import java.util.Map;

public class MessageHandlerFactory {

Check warning on line 9 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/handler/MessageHandlerFactory.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/handler/MessageHandlerFactory.java#L9

Added line #L9 was not covered by tests
private static final Map<Short, MessageBodyHandler> handlers = new HashMap<>();

static {
handlers.put(MessageBodyHeader.DEVICE_DISCOVERY_REQUEST.toShort(), new DeviceDiscoveryRequestHandler());
}

public static MessageBodyHandler getHandler(MessageBodyHeader header) {
return handlers.get(header.toShort());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@
public abstract class AbstractMessageBody {
private MessageBodyHeader messageBodyHeader;

public short generateRequestId() {
protected AbstractMessageBody () {
}

Check warning on line 13 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/AbstractMessageBody.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/AbstractMessageBody.java#L12-L13

Added lines #L12 - L13 were not covered by tests

protected AbstractMessageBody (MessageBodyHeader messageBodyHeader) {
this.messageBodyHeader = messageBodyHeader;
}

public short generateId() {
UUID uuid = UUID.randomUUID();
return (short) (uuid.getLeastSignificantBits() & 0xFFFF);
return (short) (uuid.getLeastSignificantBits() & 0x7FFF);
}

public void writeByteBuf(ByteBuf buffer) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.github.protocol.mdtp.common.model;

import io.netty.buffer.ByteBuf;

import java.net.InetAddress;
import java.net.UnknownHostException;

public class Address {
public static final byte IPV4_TYPE = 4;
public static final byte IPV6_TYPE = 6;

private final byte type;
private final byte[] value;

public Address(byte type, byte[] value) {
this.type = type;
this.value = value;
}

public String getIpString() throws UnknownHostException {
return InetAddress.getByAddress(value).getHostAddress();
}

public void writeByteBuf(ByteBuf buffer) {
buffer.writeByte(type);
buffer.writeBytes(value);
}

public static Address readByteBuf(ByteBuf buffer) {
byte type = buffer.readByte();
int length = (type == IPV4_TYPE) ? 4 : 16;
byte[] value = new byte[length];
buffer.readBytes(value);
return new Address(type, value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.github.protocol.mdtp.common.model;

import io.netty.util.AttributeKey;

public class Attributes {

Check warning on line 5 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/Attributes.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/Attributes.java#L5

Added line #L5 was not covered by tests
public static final AttributeKey<Device> DEVICE_KEY = AttributeKey.valueOf("device");
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@ public void writeByteBuf(ByteBuf buffer) {
buffer.writeShort(messageLength);
buffer.writeLong(timestamp);
buffer.writeByte(flags);
buffer.writeInt(sequenceNumber);
buffer.writeInt(logicalChannelId);
if (sequenceNumber != null) {
buffer.writeInt(sequenceNumber);
}
if (logicalChannelId != null) {
buffer.writeInt(logicalChannelId);
}
}

public static CDATHeader readByteBuf(ByteBuf buffer) {
Expand All @@ -43,8 +47,15 @@ public static CDATHeader readByteBuf(ByteBuf buffer) {
header.setMessageLength(buffer.readShort());
header.setTimestamp(buffer.readLong());
header.setFlags(buffer.readByte());
header.setSequenceNumber(buffer.readInt());
header.setLogicalChannelId(buffer.readInt());
byte formatType = header.getFormatType();

if (formatType == 0x00) {
header.setSequenceNumber(buffer.readInt());
header.setLogicalChannelId(buffer.readInt());
} else if (formatType == 0x02) {
header.setSequenceNumber(null);
header.setLogicalChannelId(null);
}

return header;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.github.protocol.mdtp.common.model;

public class CDATHeaderFactory {

Check warning on line 3 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/CDATHeaderFactory.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/CDATHeaderFactory.java#L3

Added line #L3 was not covered by tests
public static CDATHeader createMessageTransferCDATHeader() {
return initializeDefault(new CDATHeader(), (byte) 0x00);

Check warning on line 5 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/CDATHeaderFactory.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/CDATHeaderFactory.java#L5

Added line #L5 was not covered by tests
}

public static CDATHeader createDeviceDiscoveryCDATHeader() {
return initializeDefault(new CDATHeader(), (byte) 0x02);
}

private static CDATHeader initializeDefault(CDATHeader header, byte formatType) {
header.setFormatType(formatType);
header.setProtocolVersion((byte) 1);
header.setMessageLength((short) 0);
header.setTimestamp(System.currentTimeMillis());
header.setFlags((byte) 0b01100000);

if (formatType == 0x00) {
header.setSequenceNumber(0);
header.setLogicalChannelId(0);

Check warning on line 21 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/CDATHeaderFactory.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/CDATHeaderFactory.java#L20-L21

Added lines #L20 - L21 were not covered by tests
} else {
header.setSequenceNumber(null);
header.setLogicalChannelId(null);
}

return header;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package io.github.protocol.mdtp.common.model;

import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Device {
private byte mask;

private byte deviceStatus;

private byte addressCount;

private List<Address> addresses;

private short port;

private int deviceType;

private byte[] uniqueId;

private String deviceName;

public void writeByteBuf(ByteBuf buffer) {
buffer.writeByte(mask);
buffer.writeByte(deviceStatus);
buffer.writeByte(addressCount);

for (Address address : addresses) {
address.writeByteBuf(buffer);
}

buffer.writeShort(port);
buffer.writeInt(deviceType);

if (uniqueId != null) {
buffer.writeShort(uniqueId.length);
buffer.writeBytes(uniqueId);
} else {
buffer.writeShort(0);
}

if (deviceName != null) {
byte[] nameBytes = deviceName.getBytes(StandardCharsets.UTF_8);
buffer.writeShort(nameBytes.length);
buffer.writeBytes(nameBytes);
} else {
buffer.writeShort(0);
}
}

public static Device readByteBuf(ByteBuf buffer) {
Device device = new Device();

device.mask = buffer.readByte();
device.deviceStatus = buffer.readByte();
device.addressCount = buffer.readByte();

device.addresses = new ArrayList<>();
for (int i = 0; i < device.addressCount; i++) {
device.addresses.add(Address.readByteBuf(buffer));
}

device.port = buffer.readShort();
device.deviceType = buffer.readInt();

int uniqueIdLength = buffer.readShort();
if (uniqueIdLength > 0) {
device.uniqueId = new byte[uniqueIdLength];
buffer.readBytes(device.uniqueId);
} else {
device.uniqueId = null;

Check warning on line 82 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/Device.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/Device.java#L82

Added line #L82 was not covered by tests
}

int deviceNameLength = buffer.readShort();
if (deviceNameLength > 0) {
byte[] nameBytes = new byte[deviceNameLength];
buffer.readBytes(nameBytes);
device.deviceName = new String(nameBytes, StandardCharsets.UTF_8);
} else {
device.deviceName = null;

Check warning on line 91 in mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/Device.java

View check run for this annotation

Codecov / codecov/patch

mdtp-common/src/main/java/io/github/protocol/mdtp/common/model/Device.java#L91

Added line #L91 was not covered by tests
}

return device;
}
}
Loading
Loading