From 79526ae58cd729e82937c0cbbeb88cdc226f5f7f Mon Sep 17 00:00:00 2001 From: Lok Chun Wai Date: Sun, 22 May 2022 23:00:57 +0800 Subject: [PATCH 1/3] Debug message --- lib/futu.ex | 5 +++-- lib/futu/gen_server/tcp.ex | 30 ++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/lib/futu.ex b/lib/futu.ex index 7a135d8..8e95a25 100644 --- a/lib/futu.ex +++ b/lib/futu.ex @@ -131,10 +131,11 @@ defmodule Futu do rescue e in MatchError -> case e do - %{message: "TCP timeout"} -> - request(pid, module, opts) + # %{message: "TCP timeout"} -> + # request(pid, module, opts) %{term: {:error, "Message size exceed"}} -> + Logger.warn("Message size exceed") GenServer.stop(pid) {:error, "Message size exceed, proto_id: #{module.proto_id}"} diff --git a/lib/futu/gen_server/tcp.ex b/lib/futu/gen_server/tcp.ex index d9a57df..9c1281e 100644 --- a/lib/futu/gen_server/tcp.ex +++ b/lib/futu/gen_server/tcp.ex @@ -57,6 +57,8 @@ defmodule Futu.GenServer.TCP do end def handle_info({:tcp, _socket, msg}, state) do + print_message(msg) + case Response.get_proto_id(msg) do {:ok, 1004} -> {:noreply, state} @@ -118,4 +120,32 @@ defmodule Futu.GenServer.TCP do defp reset_state(state) do %{state | msg: "", proto_id: nil, is_occupied: false} end + + defp is_message_head?(packet) do + case Response.get_proto_id(packet) do + {:ok, _proto_id} -> true + _ -> false + end + end + + defp print_message(message) do + payload = inspect(:binary.bin_to_list(message), limit: :infinity) + Logger.info("is head?: #{inspect(is_message_head?(message))}") + Logger.info("message: #{payload}") + Logger.info("length: #{byte_size(message)}") + + case Response.get_proto_id(message) do + {:ok, proto_id} -> + Logger.info("proto_id: #{proto_id}") + + body_size = Response.get_body_size(message) + total_msg_size = body_size + Response.header_length() + Logger.info("supposed length: #{total_msg_size}") + + _ -> + nil + end + + Logger.info("-------------------------------------------------") + end end From 61fc64e727b19e9421c0fd5bc857db83821424aa Mon Sep 17 00:00:00 2001 From: Lok Chun Wai Date: Thu, 26 May 2022 23:20:48 +0800 Subject: [PATCH 2/3] Add buffer --- lib/futu/gen_server/tcp.ex | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/futu/gen_server/tcp.ex b/lib/futu/gen_server/tcp.ex index 9c1281e..55cf335 100644 --- a/lib/futu/gen_server/tcp.ex +++ b/lib/futu/gen_server/tcp.ex @@ -27,7 +27,12 @@ defmodule Futu.GenServer.TCP do host_charlist = String.to_charlist(host) {:ok, socket} = - :gen_tcp.connect(host_charlist, port, [:binary, active: true, keepalive: true]) + :gen_tcp.connect(host_charlist, port, [ + :binary, + active: true, + keepalive: true, + buffer: 1024 * 1024 + ]) {:ok, %{socket: socket, from: nil, is_occupied: false, msg: "", proto_id: nil}} end From 0a3a6f7995af0147e556f91dd7e42650dec7fc7f Mon Sep 17 00:00:00 2001 From: Lok Chun Wai Date: Fri, 27 May 2022 10:45:38 +0800 Subject: [PATCH 3/3] Fix heartbeat occupied --- lib/futu.ex | 2 +- lib/futu/gen_server/heartbeat.ex | 1 + lib/futu/gen_server/tcp.ex | 6 +++--- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/futu.ex b/lib/futu.ex index 8e95a25..fe9cb6e 100644 --- a/lib/futu.ex +++ b/lib/futu.ex @@ -152,7 +152,7 @@ defmodule Futu do end end - defp wait_until_free(pid) do + def wait_until_free(pid) do case GenServer.call(pid, :is_occupied) do true -> :timer.sleep(100) diff --git a/lib/futu/gen_server/heartbeat.ex b/lib/futu/gen_server/heartbeat.ex index 5cf47d8..6a5b656 100644 --- a/lib/futu/gen_server/heartbeat.ex +++ b/lib/futu/gen_server/heartbeat.ex @@ -21,6 +21,7 @@ defmodule Futu.GenServer.HeartBeat do def handle_info({:heartbeat, tcp_pid, interval}, state) do msg = Futu._heartbeat() + Futu.wait_until_free(tcp_pid) GenServer.cast(tcp_pid, {:send_heartbeat, msg}) schedule_heartbeat(tcp_pid, interval) {:noreply, state} diff --git a/lib/futu/gen_server/tcp.ex b/lib/futu/gen_server/tcp.ex index 55cf335..412898d 100644 --- a/lib/futu/gen_server/tcp.ex +++ b/lib/futu/gen_server/tcp.ex @@ -39,7 +39,7 @@ defmodule Futu.GenServer.TCP do def handle_call({:send, msg, proto_id}, from, state) do :ok = :gen_tcp.send(state.socket, msg) - new_state = %{state | from: from, msg: "", proto_id: proto_id} + new_state = %{state | from: from, msg: "", proto_id: proto_id, is_occupied: true} {:noreply, new_state} end @@ -66,7 +66,7 @@ defmodule Futu.GenServer.TCP do case Response.get_proto_id(msg) do {:ok, 1004} -> - {:noreply, state} + {:noreply, reset_state(state)} {:ok, 2218} -> {:noreply, state} @@ -111,7 +111,7 @@ defmodule Futu.GenServer.TCP do {:noreply, reset_state(new_state)} true -> - {:noreply, %{new_state | is_occupied: true}} + {:noreply, new_state} end end end