Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions lib/futu.ex
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,11 @@ defmodule Futu do
rescue
e in MatchError ->
case e do
%{message: "TCP timeout"} ->
request(pid, module, opts)
# %{message: "TCP timeout"} ->
# request(pid, module, opts)

%{term: {:error, "Message size exceed"}} ->
Logger.warn("Message size exceed")
GenServer.stop(pid)
{:error, "Message size exceed, proto_id: #{module.proto_id}"}

Expand All @@ -151,7 +152,7 @@ defmodule Futu do
end
end

defp wait_until_free(pid) do
def wait_until_free(pid) do
case GenServer.call(pid, :is_occupied) do
true ->
:timer.sleep(100)
Expand Down
1 change: 1 addition & 0 deletions lib/futu/gen_server/heartbeat.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ defmodule Futu.GenServer.HeartBeat do

def handle_info({:heartbeat, tcp_pid, interval}, state) do
msg = Futu._heartbeat()
Futu.wait_until_free(tcp_pid)
GenServer.cast(tcp_pid, {:send_heartbeat, msg})
schedule_heartbeat(tcp_pid, interval)
{:noreply, state}
Expand Down
43 changes: 39 additions & 4 deletions lib/futu/gen_server/tcp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@ defmodule Futu.GenServer.TCP do
host_charlist = String.to_charlist(host)

{:ok, socket} =
:gen_tcp.connect(host_charlist, port, [:binary, active: true, keepalive: true])
:gen_tcp.connect(host_charlist, port, [
:binary,
active: true,
keepalive: true,
buffer: 1024 * 1024
])

{:ok, %{socket: socket, from: nil, is_occupied: false, msg: "", proto_id: nil}}
end

def handle_call({:send, msg, proto_id}, from, state) do
:ok = :gen_tcp.send(state.socket, msg)
new_state = %{state | from: from, msg: "", proto_id: proto_id}
new_state = %{state | from: from, msg: "", proto_id: proto_id, is_occupied: true}

{:noreply, new_state}
end
Expand All @@ -57,9 +62,11 @@ defmodule Futu.GenServer.TCP do
end

def handle_info({:tcp, _socket, msg}, state) do
print_message(msg)

case Response.get_proto_id(msg) do
{:ok, 1004} ->
{:noreply, state}
{:noreply, reset_state(state)}

{:ok, 2218} ->
{:noreply, state}
Expand Down Expand Up @@ -104,7 +111,7 @@ defmodule Futu.GenServer.TCP do
{:noreply, reset_state(new_state)}

true ->
{:noreply, %{new_state | is_occupied: true}}
{:noreply, new_state}
end
end
end
Expand All @@ -118,4 +125,32 @@ defmodule Futu.GenServer.TCP do
defp reset_state(state) do
%{state | msg: "", proto_id: nil, is_occupied: false}
end

defp is_message_head?(packet) do
case Response.get_proto_id(packet) do
{:ok, _proto_id} -> true
_ -> false
end
end

defp print_message(message) do
payload = inspect(:binary.bin_to_list(message), limit: :infinity)
Logger.info("is head?: #{inspect(is_message_head?(message))}")
Logger.info("message: #{payload}")
Logger.info("length: #{byte_size(message)}")

case Response.get_proto_id(message) do
{:ok, proto_id} ->
Logger.info("proto_id: #{proto_id}")

body_size = Response.get_body_size(message)
total_msg_size = body_size + Response.header_length()
Logger.info("supposed length: #{total_msg_size}")

_ ->
nil
end

Logger.info("-------------------------------------------------")
end
end