diff --git a/src/main/java/com/binance/connector/client/WebSocketStreamClient.java b/src/main/java/com/binance/connector/client/WebSocketStreamClient.java index 58d97ef5..1e8f8620 100644 --- a/src/main/java/com/binance/connector/client/WebSocketStreamClient.java +++ b/src/main/java/com/binance/connector/client/WebSocketStreamClient.java @@ -37,6 +37,8 @@ public interface WebSocketStreamClient { int listenUserStream(String listenKey, WebSocketOpenCallback onOpenCallback, WebSocketMessageCallback onMessageCallback, WebSocketClosingCallback onClosingCallback, WebSocketClosedCallback onClosedCallback, WebSocketFailureCallback onFailureCallback); int combineStreams(ArrayList streams, WebSocketMessageCallback callback); int combineStreams(ArrayList streams, WebSocketOpenCallback onOpenCallback, WebSocketMessageCallback onMessageCallback, WebSocketClosingCallback onClosingCallback, WebSocketClosedCallback onClosedCallback, WebSocketFailureCallback onFailureCallback); + void subscribeCombineStreams(ArrayList streams, int connectionId); + void unsubscribeCombineStreams(ArrayList streams, int connectionId); void closeConnection(int streamId); void closeAllConnections(); } diff --git a/src/main/java/com/binance/connector/client/impl/WebSocketStreamClientImpl.java b/src/main/java/com/binance/connector/client/impl/WebSocketStreamClientImpl.java index d7587ced..9624f5e0 100644 --- a/src/main/java/com/binance/connector/client/impl/WebSocketStreamClientImpl.java +++ b/src/main/java/com/binance/connector/client/impl/WebSocketStreamClientImpl.java @@ -5,6 +5,7 @@ import java.util.Iterator; import java.util.Map; +import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -563,6 +564,58 @@ public int combineStreams(ArrayList streams, WebSocketOpenCallback onOpe return createConnection(onOpenCallback, onMessageCallback, onClosingCallback, onClosedCallback, onFailureCallback, request); } + /** + * Subscribe stream names using combine stream connection that already initiated + * + * @param streams ArrayList of stream names to be subscribed
+ * @param connectionId Id of initiated combined stream connection that will be used for sending subscribe request + * + * @see + * https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams + */ + @Override + public void subscribeCombineStreams(ArrayList streams, int connectionId) { + if (connections.containsKey(connectionId)) { + WebSocketConnection connection = connections.get(connectionId); + + JSONObject params = new JSONObject(); + params.put("method", "SUBSCRIBE"); + params.put("params", streams); + params.put("id", System.currentTimeMillis()); + + logger.info("Sending subscribe request to connection id {} with stream {}", connectionId, streams); + connection.send(params.toString()); + } else { + logger.info("Connection ID {} does not exist!", connectionId); + } + } + + /** + * Unsubscribe stream names using combine stream connection that already initiated + * + * @param streams ArrayList of stream names to be unsubscribed
+ * @param connectionId Id of initiated combined stream connection that will be used for sending unsubscribe request + * + * @see + * https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams + */ + @Override + public void unsubscribeCombineStreams(ArrayList streams, int connectionId) { + if (connections.containsKey(connectionId)) { + WebSocketConnection connection = connections.get(connectionId); + + JSONObject params = new JSONObject(); + params.put("method", "UNSUBSCRIBE"); + params.put("params", streams); + params.put("id", System.currentTimeMillis()); + + logger.info("Sending unsubscribe request to connection id {} with stream {}", connectionId, streams); + connection.send(params.toString()); + } else { + logger.info("Connection ID {} does not exist!", connectionId); + } + } + /** * Closes a specific stream based on stream ID. * diff --git a/src/test/java/examples/websocketstream/SubscribeCombineStreams.java b/src/test/java/examples/websocketstream/SubscribeCombineStreams.java new file mode 100644 index 00000000..d8bfab1c --- /dev/null +++ b/src/test/java/examples/websocketstream/SubscribeCombineStreams.java @@ -0,0 +1,31 @@ +package examples.websocketstream; + +import com.binance.connector.client.WebSocketStreamClient; +import com.binance.connector.client.impl.WebSocketStreamClientImpl; +import java.util.ArrayList; + +public final class SubscribeCombineStreams { + private SubscribeCombineStreams() { + } + + public static void main(String[] args) throws InterruptedException { + final long sleepTime = 3000; + WebSocketStreamClient client = new WebSocketStreamClientImpl(); + + ArrayList streams = new ArrayList<>(); + streams.add("btcusdt@trade"); + streams.add("bnbusdt@trade"); + + int connectionId = client.combineStreams(streams, ((event) -> { + System.out.println(event); + })); + + ArrayList streamsToSubscribe = new ArrayList<>(); + streamsToSubscribe.add("solusdt@trade"); + streamsToSubscribe.add("ltcusdt@trade"); + client.subscribeCombineStreams(streamsToSubscribe, connectionId); + + Thread.sleep(sleepTime); + client.closeAllConnections(); + } +} diff --git a/src/test/java/examples/websocketstream/UnsubscribeCombineStreams.java b/src/test/java/examples/websocketstream/UnsubscribeCombineStreams.java new file mode 100644 index 00000000..04996f69 --- /dev/null +++ b/src/test/java/examples/websocketstream/UnsubscribeCombineStreams.java @@ -0,0 +1,28 @@ +package examples.websocketstream; + +import com.binance.connector.client.WebSocketStreamClient; +import com.binance.connector.client.impl.WebSocketStreamClientImpl; +import java.util.ArrayList; + +public final class UnsubscribeCombineStreams { + private UnsubscribeCombineStreams() { + } + + public static void main(String[] args) throws InterruptedException { + final long sleepTime = 3000; + WebSocketStreamClient client = new WebSocketStreamClientImpl(); + + ArrayList streams = new ArrayList<>(); + streams.add("btcusdt@trade"); + streams.add("bnbusdt@trade"); + + int connectionId = client.combineStreams(streams, ((event) -> { + System.out.println(event); + })); + + client.unsubscribeCombineStreams(streams, connectionId); + + Thread.sleep(sleepTime); + client.closeAllConnections(); + } +}