-
Notifications
You must be signed in to change notification settings - Fork 31
Expand file tree
/
Copy pathWebSocketJavaExample.java
More file actions
162 lines (138 loc) · 6.33 KB
/
WebSocketJavaExample.java
File metadata and controls
162 lines (138 loc) · 6.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package org.example.ws;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import jakarta.annotation.PostConstruct;
import jakarta.websocket.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@ClientEndpoint
@Slf4j
@Component
public class WebsocketExample {
//本地session通道
private static Session session;
//wss连接地址 business可以为stock、crypto、common;apikey为您的凭证
private static final String WS_URL = "wss://data.infoway.io/ws?business=crypto&apikey=yourApikey";
@PostConstruct
public void connectAll() {
try {
//建立WEBSOCKET连接
connect(WS_URL);
//开启自动重连
startReconnection(WS_URL);
} catch (Exception e) {
log.error("Failed to connect to " + WS_URL + ": " + e.getMessage());
}
}
//自动重连机制,会开启一个定时线程判断连接是否断开,断开自动重连
private void startReconnection(String s) {
ScheduledExecutorService usExecutor = Executors.newScheduledThreadPool(1);
Runnable usTask = () -> {
if (session == null || !session.isOpen()) {
log.debug("reconnection...");
connect(s);
}
};
usExecutor.scheduleAtFixedRate(usTask, 1000, 10000, TimeUnit.MILLISECONDS);
}
//建立WEBSOCKET连接具体实现
private void connect(String s) {
try {
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
session = container.connectToServer(WebsocketExample.class, URI.create(s));
} catch (DeploymentException | IOException e) {
log.error("Failed to connect to the server: {}", e.getMessage());
}
}
//WEBSOCKET连接建立成功后会执行下面的方法
@OnOpen
public void onOpen(Session session) throws IOException, InterruptedException {
//WEBSOCKET连接建立成功会执行该方法
System.out.println("Connection opened: " + session.getId());
// 发送实时成交明细订阅请求
JSONObject tradeSendObj = new JSONObject();
//参考WEBSOCKET API 里面的不同请求的协议号,1000为订阅实时交易明细数据,文档:https://infoway.readme.io/reference/subscribe-trade#/
tradeSendObj.put("code", 10000);
//自定义trace
tradeSendObj.put("trace", "01213e9d-90a0-426e-a380-ebed633cba7a");
//封装订阅请求实体,json格式
JSONObject data = new JSONObject();
//订阅BTCUSDT
data.put("codes", "BTCUSDT");
tradeSendObj.put("data", data);
//发送实时成交明细订阅请求
session.getBasicRemote().sendText(tradeSendObj.toJSONString());
//-----------------------------------------------------------------//
//不同请求之间间隔一段时间
Thread.sleep(5000);
//发送实时盘口数据订阅请求,文档:https://infoway.readme.io/reference/subscribe-depth#/
JSONObject depthSendObj = new JSONObject();
//参考WEBSOCKET API 里面的不同请求的协议号,1003为订阅实时盘口数据
depthSendObj.put("code", 10003);
//自定义trace
depthSendObj.put("trace", "01213e9d-90a0-426e-a380-ebed633cba7a");
//封装订阅请求实体,json格式
depthSendObj.put("data", data);
//发送实时成交明细订阅请求
session.getBasicRemote().sendText(depthSendObj.toJSONString());
//-----------------------------------------------------------------//
//不同请求之间间隔一段时间
Thread.sleep(5000);
//发送实时k线数据订阅请求,文档:https://infoway.readme.io/reference/subscribe-candles#/
JSONObject klineSendObj = new JSONObject();
//参考WEBSOCKET API 里面的不同请求的协议号,1006为订阅K线数据
klineSendObj.put("code", 10006);
//自定义trace
klineSendObj.put("trace", "01213e9d-90a0-426e-a380-ebed633cba7a");
//封装订阅请求实体,json格式
JSONObject klineData = new JSONObject();
JSONArray klineDataArray = new JSONArray();
//封装订阅1分钟k线的实体
JSONObject kline1minObj = new JSONObject();
//klineType,可以查看文档:https://infoway.readme.io/reference/k%E7%BA%BF%E7%B1%BB%E5%9E%8B#/
kline1minObj.put("type", 1);
kline1minObj.put("codes", "BTCUSDT");
klineDataArray.add(kline1minObj);
klineData.put("arr", klineDataArray);
klineSendObj.put("data", klineData);
//发送实时成交明细订阅请求
session.getBasicRemote().sendText(klineSendObj.toJSONString());
//定时发送心跳任务,文档:https://infoway.readme.io/reference/heartbeat#/
ScheduledExecutorService pingExecutor = Executors.newScheduledThreadPool(1);
Runnable pingTask = WebsocketExample::ping;
pingExecutor.scheduleAtFixedRate(pingTask, 30, 30, TimeUnit.SECONDS);
}
@OnMessage
public void onMessage(String message, Session session) {
//会接收INFOWAY服务端返回的数据,包含订阅成功或失败的提示,以及正式的行情数据推送
try {
System.out.println("Message received: " + message);
} catch (Exception e) {
}
}
@OnClose
public void onClose(Session session, CloseReason reason) {
//WEBSOCKET连接关闭会回调该方法
System.out.println("Connection closed: " + session.getId() + ", reason: " + reason);
}
@OnError
public void onError(Throwable error) {
error.printStackTrace();
}
//持续性发送心跳,防止服务端长时间检查不到心跳请求关闭连接
public static void ping() {
try {
JSONObject jsonObject = new JSONObject();
jsonObject.put("code", 10010);
jsonObject.put("trace", "01213e9d-90a0-426e-a380-ebed633cba7a");
session.getBasicRemote().sendText(jsonObject.toJSONString());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}