diff --git a/lib/futu.ex b/lib/futu.ex index 7a135d8..fe9cb6e 100644 --- a/lib/futu.ex +++ b/lib/futu.ex @@ -131,10 +131,11 @@ defmodule Futu do rescue e in MatchError -> case e do - %{message: "TCP timeout"} -> - request(pid, module, opts) + # %{message: "TCP timeout"} -> + # request(pid, module, opts) %{term: {:error, "Message size exceed"}} -> + Logger.warn("Message size exceed") GenServer.stop(pid) {:error, "Message size exceed, proto_id: #{module.proto_id}"} @@ -151,7 +152,7 @@ defmodule Futu do end end - defp wait_until_free(pid) do + def wait_until_free(pid) do case GenServer.call(pid, :is_occupied) do true -> :timer.sleep(100) diff --git a/lib/futu/gen_server/heartbeat.ex b/lib/futu/gen_server/heartbeat.ex index 5cf47d8..6a5b656 100644 --- a/lib/futu/gen_server/heartbeat.ex +++ b/lib/futu/gen_server/heartbeat.ex @@ -21,6 +21,7 @@ defmodule Futu.GenServer.HeartBeat do def handle_info({:heartbeat, tcp_pid, interval}, state) do msg = Futu._heartbeat() + Futu.wait_until_free(tcp_pid) GenServer.cast(tcp_pid, {:send_heartbeat, msg}) schedule_heartbeat(tcp_pid, interval) {:noreply, state} diff --git a/lib/futu/gen_server/tcp.ex b/lib/futu/gen_server/tcp.ex index d9a57df..412898d 100644 --- a/lib/futu/gen_server/tcp.ex +++ b/lib/futu/gen_server/tcp.ex @@ -27,14 +27,19 @@ defmodule Futu.GenServer.TCP do host_charlist = String.to_charlist(host) {:ok, socket} = - :gen_tcp.connect(host_charlist, port, [:binary, active: true, keepalive: true]) + :gen_tcp.connect(host_charlist, port, [ + :binary, + active: true, + keepalive: true, + buffer: 1024 * 1024 + ]) {:ok, %{socket: socket, from: nil, is_occupied: false, msg: "", proto_id: nil}} end def handle_call({:send, msg, proto_id}, from, state) do :ok = :gen_tcp.send(state.socket, msg) - new_state = %{state | from: from, msg: "", proto_id: proto_id} + new_state = %{state | from: from, msg: "", proto_id: proto_id, is_occupied: true} {:noreply, new_state} end @@ -57,9 +62,11 @@ defmodule Futu.GenServer.TCP do end def handle_info({:tcp, _socket, msg}, state) do + print_message(msg) + case Response.get_proto_id(msg) do {:ok, 1004} -> - {:noreply, state} + {:noreply, reset_state(state)} {:ok, 2218} -> {:noreply, state} @@ -104,7 +111,7 @@ defmodule Futu.GenServer.TCP do {:noreply, reset_state(new_state)} true -> - {:noreply, %{new_state | is_occupied: true}} + {:noreply, new_state} end end end @@ -118,4 +125,32 @@ defmodule Futu.GenServer.TCP do defp reset_state(state) do %{state | msg: "", proto_id: nil, is_occupied: false} end + + defp is_message_head?(packet) do + case Response.get_proto_id(packet) do + {:ok, _proto_id} -> true + _ -> false + end + end + + defp print_message(message) do + payload = inspect(:binary.bin_to_list(message), limit: :infinity) + Logger.info("is head?: #{inspect(is_message_head?(message))}") + Logger.info("message: #{payload}") + Logger.info("length: #{byte_size(message)}") + + case Response.get_proto_id(message) do + {:ok, proto_id} -> + Logger.info("proto_id: #{proto_id}") + + body_size = Response.get_body_size(message) + total_msg_size = body_size + Response.header_length() + Logger.info("supposed length: #{total_msg_size}") + + _ -> + nil + end + + Logger.info("-------------------------------------------------") + end end