diff --git a/pom.xml b/pom.xml
index a5877be..027e87a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,16 @@
protobuf-java-format
1.4
+
+ org.apache.commons
+ commons-compress
+ 1.20
+
+
+ org.bouncycastle
+ bcprov-jdk15on
+ 1.66
+
UTF-8
diff --git a/src/main/java/iservice/sdk/core/operators/BaseDataOperator.java b/src/main/java/iservice/sdk/core/operators/BaseDataOperator.java
new file mode 100644
index 0000000..3387577
--- /dev/null
+++ b/src/main/java/iservice/sdk/core/operators/BaseDataOperator.java
@@ -0,0 +1,31 @@
+package iservice.sdk.core.operators;
+
+import iservice.sdk.entity.ctx.DataChainContext;
+
+/**
+ * @author : ori
+ * @date : 2020/9/28 4:51 下午
+ */
+public abstract class BaseDataOperator {
+
+ BaseDataOperator next;
+
+ BaseDataOperator(BaseDataOperator next) {
+ this.next = next;
+ }
+
+ /**
+ * operation to do when sending message
+ *
+ * @param context
+ */
+ public abstract void doSend(DataChainContext context);
+
+ /**
+ * operation to do when receiving message
+ *
+ * @param context
+ * @return
+ */
+ public abstract void doReceive(DataChainContext context);
+}
diff --git a/src/main/java/iservice/sdk/core/operators/EncodeOperator.java b/src/main/java/iservice/sdk/core/operators/EncodeOperator.java
new file mode 100644
index 0000000..8715140
--- /dev/null
+++ b/src/main/java/iservice/sdk/core/operators/EncodeOperator.java
@@ -0,0 +1,66 @@
+package iservice.sdk.core.operators;
+
+import iservice.sdk.core.operators.factory.EncodeStrategyFactory;
+import iservice.sdk.core.operators.factory.strategy.encode.EncodeStrategy;
+import iservice.sdk.entity.Header;
+import iservice.sdk.entity.ctx.DataChainContext;
+
+import java.io.IOException;
+import java.util.Base64;
+
+/**
+ * @author : ori
+ * @date : 2020/9/28 5:15 下午
+ */
+public class EncodeOperator extends BaseDataOperator{
+
+ EncodeOperator(BaseDataOperator next) {
+ super(next);
+ }
+
+ @Override
+ public void doSend(DataChainContext context) {
+ EncodeStrategy strategy = getEncodeStrategy(context);
+ try {
+ context.setObj(strategy.encode(context.getObj()));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ if(next!= null) {
+ next.doSend(context);
+ }
+ }
+
+ @Override
+ public void doReceive(DataChainContext context) {
+ // make next doReceive first
+ if (next!= null) {
+ next.doReceive(context);
+ }
+ EncodeStrategy strategy = getEncodeStrategy(context);
+ try {
+ context.setObj(strategy.decode(context.getObj()));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private EncodeStrategy getEncodeStrategy(DataChainContext context) {
+ return EncodeStrategyFactory.getInstance().getEncodeStrategy(context.getHeader().getEncoding());
+ }
+
+ public static void main(String[] args) {
+ DataChainContext ctx = new DataChainContext();
+ Header header = new Header();
+ header.setEncoding("gzip");
+ ctx.setHeader(header);
+ ctx.setObj("asdfaf".getBytes());
+ EncodeOperator operator = new EncodeOperator(null);
+ operator.doSend(ctx);
+ String s = new String(Base64.getEncoder().encode(ctx.getObj()));
+ System.out.println(s);
+ ctx.setObj(Base64.getDecoder().decode(s));
+ operator.doReceive(ctx);
+ System.out.println(new String(ctx.getObj()));
+ }
+}
diff --git a/src/main/java/iservice/sdk/core/operators/HeaderProtocolOperator.java b/src/main/java/iservice/sdk/core/operators/HeaderProtocolOperator.java
new file mode 100644
index 0000000..262d1e5
--- /dev/null
+++ b/src/main/java/iservice/sdk/core/operators/HeaderProtocolOperator.java
@@ -0,0 +1,16 @@
+package iservice.sdk.core.operators;
+
+/**
+ * @author : ori
+ * @date : 2020/9/28 5:09 下午
+ */
+public class HeaderProtocolOperator {
+ private BaseDataOperator[] baseDataOperators;
+
+ public void doSendOperation(){
+
+ }
+
+
+
+}
diff --git a/src/main/java/iservice/sdk/core/operators/factory/EncodeStrategyFactory.java b/src/main/java/iservice/sdk/core/operators/factory/EncodeStrategyFactory.java
new file mode 100644
index 0000000..f853a36
--- /dev/null
+++ b/src/main/java/iservice/sdk/core/operators/factory/EncodeStrategyFactory.java
@@ -0,0 +1,47 @@
+package iservice.sdk.core.operators.factory;
+
+import iservice.sdk.core.operators.factory.strategy.encode.EncodeStrategy;
+import iservice.sdk.core.operators.factory.strategy.encode.GzipEncodeStrategy;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author : ori
+ * @date : 2020/9/28 5:39 下午
+ */
+public class EncodeStrategyFactory {
+
+ private EncodeStrategyFactory(){
+ strategyMap.put(EncodeType.GZIP.getName(), new GzipEncodeStrategy());
+ }
+
+ private static class InstanceHolder {
+ private static final EncodeStrategyFactory INSTANCE = new EncodeStrategyFactory();
+ }
+
+ public static EncodeStrategyFactory getInstance(){
+ return InstanceHolder.INSTANCE;
+ }
+
+ private final Map strategyMap = new HashMap<>();
+
+ public EncodeStrategy getEncodeStrategy(String encodeStrategyName){
+ return strategyMap.get(encodeStrategyName);
+ }
+
+ enum EncodeType {
+ GZIP("gzip"),
+ COMPRESS("compress")
+ ;
+ private final String name;
+
+ EncodeType(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+ }
+}
diff --git a/src/main/java/iservice/sdk/core/operators/factory/strategy/encode/EncodeStrategy.java b/src/main/java/iservice/sdk/core/operators/factory/strategy/encode/EncodeStrategy.java
new file mode 100644
index 0000000..bfe711d
--- /dev/null
+++ b/src/main/java/iservice/sdk/core/operators/factory/strategy/encode/EncodeStrategy.java
@@ -0,0 +1,15 @@
+package iservice.sdk.core.operators.factory.strategy.encode;
+
+import java.io.IOException;
+
+/**
+ * @author : ori
+ * @date : 2020/9/28 5:45 下午
+ */
+public abstract class EncodeStrategy {
+
+ public abstract byte[] encode(byte[] bytes) throws IOException;
+
+ public abstract byte[] decode(byte[] b) throws Exception;
+
+}
diff --git a/src/main/java/iservice/sdk/core/operators/factory/strategy/encode/GzipEncodeStrategy.java b/src/main/java/iservice/sdk/core/operators/factory/strategy/encode/GzipEncodeStrategy.java
new file mode 100644
index 0000000..df97385
--- /dev/null
+++ b/src/main/java/iservice/sdk/core/operators/factory/strategy/encode/GzipEncodeStrategy.java
@@ -0,0 +1,21 @@
+package iservice.sdk.core.operators.factory.strategy.encode;
+
+import iservice.sdk.util.GzipUtils;
+
+import java.io.IOException;
+
+/**
+ * @author : ori
+ * @date : 2020/9/28 6:43 下午
+ */
+public class GzipEncodeStrategy extends EncodeStrategy {
+ @Override
+ public byte[] encode(byte[] bytes) throws IOException {
+ return GzipUtils.gzip(bytes);
+ }
+
+ @Override
+ public byte[] decode(byte[] bytes) throws Exception {
+ return GzipUtils.unGzip(bytes);
+ }
+}
diff --git a/src/main/java/iservice/sdk/entity/Header.java b/src/main/java/iservice/sdk/entity/Header.java
index 32efb1e..b8942df 100644
--- a/src/main/java/iservice/sdk/entity/Header.java
+++ b/src/main/java/iservice/sdk/entity/Header.java
@@ -1,5 +1,7 @@
package iservice.sdk.entity;
+import iservice.sdk.enums.EncryptionType;
+
/**
* @author Yelong
*/
@@ -7,6 +9,9 @@ public class Header {
private String version = "1.0";
private Location location = new Location();
+ private String encoding;
+ private Encryption encryption;
+
public Header() {
}
@@ -26,18 +31,72 @@ public void setLocation(Location location) {
this.location = location;
}
- public class Location {
- private String status = "off";
+ public String getEncoding() {
+ return encoding;
+ }
+
+ public void setEncoding(String encoding) {
+ this.encoding = encoding;
+ }
+
+ public Encryption getEncryption() {
+ return encryption;
+ }
+
+ public void setEncryption(Encryption encryption) {
+ this.encryption = encryption;
+ }
+
+ public static class Location {
+ private String url;
+ private String hash;
public Location() {
}
- public String getStatus() {
- return status;
+ public String getUrl() {
+ return url;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
+
+ public String getHash() {
+ return hash;
+ }
+
+ public void setHash(String hash) {
+ this.hash = hash;
+ }
+
+ @Override
+ public String toString() {
+ return "Location{" +
+ "url='" + url + '\'' +
+ ", hash='" + hash + '\'' +
+ '}';
+ }
+ }
+
+ public static class Encryption {
+ private EncryptionType encryptionType;
+ private String publicKey;
+
+ public EncryptionType getEncryptionType() {
+ return encryptionType;
+ }
+
+ public void setEncryptionType(EncryptionType encryptionType) {
+ this.encryptionType = encryptionType;
+ }
+
+ public String getPublicKey() {
+ return publicKey;
}
- public void setStatus(String status) {
- this.status = status;
+ public void setPublicKey(String publicKey) {
+ this.publicKey = publicKey;
}
}
}
diff --git a/src/main/java/iservice/sdk/entity/ctx/DataChainContext.java b/src/main/java/iservice/sdk/entity/ctx/DataChainContext.java
new file mode 100644
index 0000000..4d7a318
--- /dev/null
+++ b/src/main/java/iservice/sdk/entity/ctx/DataChainContext.java
@@ -0,0 +1,38 @@
+package iservice.sdk.entity.ctx;
+
+import iservice.sdk.entity.Header;
+
+import java.util.Arrays;
+
+/**
+ * @author : ori
+ * @date : 2020/9/29 10:48 上午
+ */
+public class DataChainContext {
+ Header header;
+ byte[] obj;
+
+ public Header getHeader() {
+ return header;
+ }
+
+ public void setHeader(Header header) {
+ this.header = header;
+ }
+
+ public byte[] getObj() {
+ return obj;
+ }
+
+ public void setObj(byte[] obj) {
+ this.obj = obj;
+ }
+
+ @Override
+ public String toString() {
+ return "DataChainContext{" +
+ "header=" + header +
+ ", obj=" + Arrays.toString(obj) +
+ '}';
+ }
+}
diff --git a/src/main/java/iservice/sdk/enums/EncryptionType.java b/src/main/java/iservice/sdk/enums/EncryptionType.java
new file mode 100644
index 0000000..6c29c34
--- /dev/null
+++ b/src/main/java/iservice/sdk/enums/EncryptionType.java
@@ -0,0 +1,9 @@
+package iservice.sdk.enums;
+
+/**
+ * @author : ori
+ * @date : 2020/9/28 5:26 下午
+ */
+public enum EncryptionType {
+
+}
diff --git a/src/main/java/iservice/sdk/net/WebSocketClient.java b/src/main/java/iservice/sdk/net/WebSocketClient.java
index adaa774..4421ba1 100644
--- a/src/main/java/iservice/sdk/net/WebSocketClient.java
+++ b/src/main/java/iservice/sdk/net/WebSocketClient.java
@@ -1,7 +1,10 @@
package iservice.sdk.net;
import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.*;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
@@ -10,6 +13,8 @@
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import iservice.sdk.core.WebSocketClientObserver;
import iservice.sdk.exception.WebSocketConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URISyntaxException;
@@ -23,9 +28,11 @@
* @date : 2020/9/21 5:46 下午
*/
public class WebSocketClient {
+ private final Logger LOGGER = LoggerFactory.getLogger(WebSocketClient.class);
+
private static final String ERR_MSG_CHANNEL_INACTIVE = "WebSocket channel inactive";
- private final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(0, 1, 10L, TimeUnit.SECONDS,
+ private final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 4, 10L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), r -> new Thread(r, "WebSocket Client Daemon"));
private final WebSocketClientOptions options;
@@ -74,10 +81,12 @@ private void doConnect() {
ChannelFuture channelFuture = bootstrap.connect(options.getHost(), options.getPort()).sync();
// to hold a channel
channel = channelFuture.channel();
+ // waiting for handshake complete
+ blockUntilHandshakeFinished();
// notify main thread that the client is start
latch.countDown();
channel.closeFuture().sync();
- } catch (InterruptedException e) {
+ } catch (Exception e) {
System.err.println("Connect failed.");
startedFlag = false;
e.printStackTrace();
@@ -89,6 +98,13 @@ private void doConnect() {
}
}
+ private void blockUntilHandshakeFinished() {
+ WebSocketMessageHandler webSocketHandler = channel.pipeline().get(WebSocketMessageHandler.class);
+ for (; !webSocketHandler.handshaker().isHandshakeComplete(); ) {
+ System.out.println(webSocketHandler.handshaker().isHandshakeComplete());
+ }
+ }
+
private void initHandlerObserver() {
WebSocketMessageHandler.EVENT_OBSERVABLE.deleteObservers();
WebSocketMessageHandler.EVENT_OBSERVABLE.addObserver(new WebSocketClientObserver());
@@ -187,7 +203,13 @@ public void send(String msg) {
if (!isReady()) {
throw new WebSocketConnectException(ERR_MSG_CHANNEL_INACTIVE);
}
- channel.writeAndFlush(new TextWebSocketFrame(msg));
+ channel.writeAndFlush(new TextWebSocketFrame(msg)).addListener(future -> {
+ if (future.isSuccess()) {
+ LOGGER.debug("Message has sent out. content:{}", msg);
+ } else {
+ LOGGER.error("Fail to send message! content:{}", msg);
+ }
+ });
}
}
diff --git a/src/main/java/iservice/sdk/util/DecodeUtil.java b/src/main/java/iservice/sdk/util/DecodeUtil.java
index 23685e8..955ca78 100644
--- a/src/main/java/iservice/sdk/util/DecodeUtil.java
+++ b/src/main/java/iservice/sdk/util/DecodeUtil.java
@@ -64,7 +64,7 @@ public static String decodeConsumerReq(String json, ConsumerListenerOptions opti
}
private static boolean checkMessageType(BaseServiceResult messageResult, ServiceListenerOptions options, String requireType) {
- return Objects.equals(messageResult.getQuery(), SubscribeUtil.buildSubscribeParam(options).getQuery()) && messageResult.getData() != null && Objects.equals(messageResult.getData().getType(), requireType);
+ return messageResult != null && Objects.equals(messageResult.getQuery(), SubscribeUtil.buildSubscribeParam(options).getQuery()) && Objects.equals(messageResult.getData().getType(), requireType);
}
public static String formatJson(String json) {
diff --git a/src/main/java/iservice/sdk/util/GzipUtils.java b/src/main/java/iservice/sdk/util/GzipUtils.java
new file mode 100644
index 0000000..04b98a2
--- /dev/null
+++ b/src/main/java/iservice/sdk/util/GzipUtils.java
@@ -0,0 +1,38 @@
+package iservice.sdk.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class GzipUtils {
+
+ public static byte[] gzip(byte[] data) throws IOException {
+ try (
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ GZIPOutputStream gzip = new GZIPOutputStream(bos);
+ ) {
+ gzip.write(data);
+ gzip.finish();
+ return bos.toByteArray();
+ }
+ }
+
+ public static byte[] unGzip(byte[] data) throws Exception {
+ try (
+ ByteArrayInputStream bis = new ByteArrayInputStream(data);
+ GZIPInputStream gzip = new GZIPInputStream(bis);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream()
+ ) {
+ byte[] buf = new byte[1024];
+ int num;
+ while ((num = gzip.read(buf, 0, buf.length)) != -1) {
+ bos.write(buf, 0, num);
+ }
+ byte[] ret = bos.toByteArray();
+ bos.flush();
+ return ret;
+ }
+ }
+}
\ No newline at end of file