Skip to content
Open
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ src/*.swp
src/*/*.swp
c_src/*.o
priv/*.so
logs/*
test/*.beam
32 changes: 30 additions & 2 deletions c_src/nsync_nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ int decompress1(ErlNifEnv *env, ErlNifBinary *source, ErlNifBinary *target) {
int bufsize;
double expansion_factor = 1.1;
unsigned int result;
while(expansion_factor < 2.5) {
while(expansion_factor < 6) {
bufsize = (int) source->size * expansion_factor;
bufsize = bufsize < 66 ? 66 : bufsize;
enif_alloc_binary_compat(env, bufsize, target);
Expand All @@ -56,7 +56,7 @@ int compress1(ErlNifEnv *env, ErlNifBinary *source, ErlNifBinary *target) {
int bufsize;
double expansion_factor = 1.1;
int result;
while(expansion_factor < 2.5) {
while(expansion_factor < 6) {
bufsize = (int) source->size * expansion_factor;
bufsize = bufsize < 66 ? 66 : bufsize;
enif_alloc_binary_compat(env, bufsize, target);
Expand Down Expand Up @@ -89,6 +89,33 @@ static ERL_NIF_TERM decompress_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM
}
return retval;
}

static ERL_NIF_TERM decompress2_nif(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) {
ERL_NIF_TERM retval;
ErlNifBinary source;
ErlNifBinary target;
unsigned int decompressed_length;
unsigned int final_length;

if (argc != 2 ||
!enif_inspect_binary(env, argv[0], &source) ||
!enif_get_uint(env, argv[1], &decompressed_length) ||
decompressed_length < source.size ||
!enif_alloc_binary_compat(env, decompressed_length, &target)) {
return enif_make_badarg(env);
}

final_length = lzf_decompress(source.data, source.size, target.data, target.size);
if (final_length > 0 &&
final_length == decompressed_length) {
enif_realloc_binary_compat(env, &target, final_length);
retval = enif_make_binary(env, &target);
} else {
enif_release_binary_compat(env, &target);
retval = enif_make_badarg(env);
}
return retval;
}

ERL_NIF_TERM compress_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
ERL_NIF_TERM retval;
Expand All @@ -109,6 +136,7 @@ ERL_NIF_TERM compress_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {

static ErlNifFunc nif_funcs[] = {
{"decompress", 1, decompress_nif},
{"decompress", 2, decompress2_nif},
{"compress", 1, compress_nif}
};

Expand Down
6 changes: 5 additions & 1 deletion src/lzf.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.
-module(lzf).
-export([compress/1, decompress/1]).
-export([compress/1
,decompress/1
,decompress/2]).

-on_load(init/0).

Expand All @@ -39,3 +41,5 @@ compress(_X) ->
decompress(_X) ->
exit(nif_library_not_loaded).

decompress(_, _) ->
exit(nif_library_not_loaded).
10 changes: 6 additions & 4 deletions src/nsync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,16 @@ start_link(Opts) ->
%%====================================================================
init([Opts, CallerPid]) ->
case init_state(Opts, CallerPid) of
{ok, State} ->
{ok, State = #state{}} ->
{ok, State};
Error ->
{stop, Error}
end.

handle_call(_Request, _From, State) ->
handle_call(_Request, _From, State = #state{}) ->
{reply, ignore, State}.

handle_cast(_Msg, State) ->
handle_cast(_Msg, State = #state{}) ->
{noreply, State}.

handle_info({tcp, Socket, Data}, #state{callback=Callback,
Expand Down Expand Up @@ -146,7 +146,7 @@ handle_info({tcp_error, _ ,_}, #state{callback=Callback,
{stop, Error, State}
end;

handle_info(_Info, State) ->
handle_info(_Info, State = #state{}) ->
{noreply, State}.

terminate(_Reason, _State) ->
Expand Down Expand Up @@ -216,6 +216,8 @@ authenticate(Socket, Auth) ->
ok;
{ok, <<"+OK\r\n">>} ->
ok;
{ok, Other} ->
{error, {ok, Other}};
Error ->
Error
end;
Expand Down
9 changes: 8 additions & 1 deletion src/nsync.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@
-define(REDIS_ZSET, 3).
-define(REDIS_HASH, 4).

-define(REDIS_EXPIRETIME, 253).
-define(REDIS_ZMAP, 9).
-define(REDIS_ZLIST, 10).
-define(REDIS_INTSET, 11).
-define(REDIS_SSZLIST, 12).
-define(REDIS_HMAPZLIST, 13).

-define(REDIS_EXPIRETIME_MS, 252).
-define(REDIS_EXPIRETIME_SEC, 253).
-define(REDIS_SELECTDB, 254).
-define(REDIS_EOF, 255).

Expand Down
168 changes: 153 additions & 15 deletions src/rdb_load.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%% Copyright (c) 2011 Jacob Vorreuter <jacob.vorreuter@gmail.com>
%%
%%
%% Permission is hereby granted, free of charge, to any person
%% obtaining a copy of this software and associated documentation
%% files (the "Software"), to deal in the Software without
Expand All @@ -8,10 +8,10 @@
%% copies of the Software, and to permit persons to whom the
%% Software is furnished to do so, subject to the following
%% conditions:
%%
%%
%% The above copyright notice and this permission notice shall be
%% included in all copies or substantial portions of the Software.
%%
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
Expand All @@ -21,12 +21,20 @@
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.
-module(rdb_load).
-export([packet/3]).
-export([packet/3
,load_file/2
]).

-record(state, {first = true, buffer = <<>>}).

-include("nsync.hrl").

load_file(FileName, Callback) ->
{ok, File} = file:read_file(FileName),
Data = iolist_to_binary(["$", integer_to_list(byte_size(File)),
"\r\n", File]),
packet(undefined, Data, Callback).

packet(State, Data, Callback) when State == undefined orelse State#state.first == true ->
Data1 =
case State of
Expand All @@ -39,9 +47,11 @@ packet(State, Data, Callback) when State == undefined orelse State#state.first =
{ok, Rest} ->
case parse_len(Rest) of
{ok, _Len, Rest1} ->
case parse_rdb_version(Rest1) of
case parse_rdb_version(Rest1) of
{ok, Rest2, Vsn} ->
Vsn /= <<"0001">> andalso exit({error, vsn_not_supported}),
lists:member(Vsn, [<<"0001">>,<<"0002">>,<<"0003">>,
<<"0004">>,<<"0005">>,<<"0006">>])
orelse exit({error, vsn_not_supported}),
packet(#state{buffer = <<>>, first = false}, Rest2, Callback);
{error, eof} ->
#state{buffer = Data}
Expand Down Expand Up @@ -115,12 +125,19 @@ parse(Data, Callback) ->
{ok, Type, Rest} = rdb_type(Data),
parse(Type, Rest, Callback).

parse(?REDIS_EXPIRETIME, Data, Callback) ->
parse(?REDIS_EXPIRETIME_MS, Data, Callback) ->
case Data of
<<_Time:64/unsigned-integer, Rest/binary>> ->
parse(Rest, Callback);
_ ->
{ok, <<?REDIS_EXPIRETIME_MS, Data/binary>>}
end;
parse(?REDIS_EXPIRETIME_SEC, Data, Callback) ->
case Data of
<<_Time:32/unsigned-integer, Rest/binary>> ->
parse(Rest, Callback);
_ ->
{ok, <<?REDIS_EXPIRETIME, Data/binary>>}
{ok, <<?REDIS_EXPIRETIME_SEC, Data/binary>>}
end;

parse(?REDIS_EOF, Rest, _Callback) ->
Expand Down Expand Up @@ -196,7 +213,7 @@ rdb_len(<<Type, Rest/binary>>) ->
end.

rdb_string_object(Data) ->
rdb_generic_string_object(Data, false).
rdb_generic_string_object(Data, false).

rdb_encoded_string_object(Data) ->
rdb_generic_string_object(Data, true).
Expand All @@ -215,7 +232,7 @@ rdb_generic_string_object(Data, _Encode) ->
?REDIS_RDB_ENC_LZF ->
rdb_lzf_string_object(Rest);
_ ->
exit("Unknown RDB encoding type")
exit({"Unknown RDB encoding type", Len})
end;
false ->
case Rest of
Expand Down Expand Up @@ -253,12 +270,12 @@ rdb_double_value(Data) ->

rdb_lzf_string_object(Data) ->
{ok, _Enc1, LzfLen, Rest} = rdb_len(Data),
{ok, _Enc2, _UncompLen, Rest1} = rdb_len(Rest),
{ok, _Enc2, UncompLen, Rest1} = rdb_len(Rest),
case Rest1 of
<<LzfEnc:LzfLen/binary, Rest2/binary>> ->
case (catch lzf:decompress(LzfEnc)) of
case (catch lzf:decompress(LzfEnc, UncompLen)) of
{'EXIT', _Err} ->
error_logger:error_msg("failed lzf_decompress(~p)~n", [LzfEnc]),
error_logger:error_msg("failed lzf_decompress(~p, ~p)~n", [LzfEnc, UncompLen]),
{ok, <<"">>, Rest2};
Str ->
{ok, Str, Rest2}
Expand All @@ -268,7 +285,7 @@ rdb_lzf_string_object(Data) ->
end.

rdb_load_object(_Type, <<>>) ->
exit({error, eof});
exit({error, eof});

rdb_load_object(?REDIS_STRING, Data) ->
rdb_encoded_string_object(Data);
Expand All @@ -289,8 +306,28 @@ rdb_load_object(?REDIS_HASH, Data) ->
{ok, _Enc, Size, Rest} = rdb_len(Data),
parse_hash_props(Size, Rest, dict:new());

rdb_load_object(?REDIS_ZMAP, Data) ->
{ok, List, Rest} = parse_zmap_vals(Data),
Dict = to_hash(List),
{ok, Dict, Rest};

rdb_load_object(?REDIS_ZLIST, Data) ->
parse_zlist_vals(Data);

rdb_load_object(?REDIS_INTSET, Data) ->
parse_intset_vals(Data);

rdb_load_object(?REDIS_SSZLIST, Data) ->
{ok, List, Rest} = parse_zlist_vals(Data),
Set = to_sorted_set(List),
{ok, Set, Rest};

rdb_load_object(?REDIS_HMAPZLIST, Data) ->
{ok, List, Rest} = parse_zlist_vals(Data),
Dict = to_hash(List),
{ok, Dict, Rest};

rdb_load_object(_Type, _Data) ->
io:format("unknown object type: ~p~n", [_Type]),
exit("Unknown object type").

parse_list_vals(0, Rest, Acc) ->
Expand All @@ -315,3 +352,104 @@ parse_hash_props(Size, Rest, Acc) ->
{ok, Key, Rest1} = rdb_encoded_string_object(Rest),
{ok, Val, Rest2} = rdb_encoded_string_object(Rest1),
parse_hash_props(Size-1, Rest2, dict:store(Key, Val, Acc)).


parse_zmap_vals(Data) ->
{ok, Str, Rest1} = rdb_encoded_string_object(Data),
<<_:8/unsigned-little, Rest/binary>> = Str,
{ok, parse_zmap_entry(Rest), Rest1}.

parse_zmap_entry(<<255>>) ->
[];
parse_zmap_entry(<<253, Len:32/little-unsigned, Entries/binary>>) ->
<<Entry:Len/binary, Free, ToSkip/binary>> = Entries,
<<_:Free/binary, Rest/binary>> = ToSkip,
[maybe_int(Entry) | parse_zmap_entry(Rest)];
parse_zmap_entry(<<Len:8, Entries/binary>>) ->
<<Entry:Len/binary, Free, ToSkip/binary>> = Entries,
<<_:Free/binary, Rest/binary>> = ToSkip,
[maybe_int(Entry) | parse_zmap_entry(Rest)].


maybe_int(Bin) ->
try
_ = list_to_integer(L = binary_to_list(Bin)),
L
catch
error:badarg ->
Bin
end.

parse_zlist_vals(Data) ->
{ok, Str, Rest1} = rdb_encoded_string_object(Data),
<<_ZlBytes:32/little-unsigned,
_ZlTail:32/little-unsigned,
ZlLen:16/little-unsigned,
Entries/binary>> = Str,
{ok, parse_zlist_entries(ZlLen, Entries), Rest1}.

parse_zlist_entries(0, <<255>>) ->
[];
parse_zlist_entries(Len, <<254:8/unsigned, _Prev:32, Entries/binary>>) ->
{Entry, Rest} = parse_zlist_entry(Entries),
[Entry | parse_zlist_entries(Len-1, Rest)];
parse_zlist_entries(Len, <<_Prev:8/unsigned, Entries/binary>>) ->
{Entry, Rest} = parse_zlist_entry(Entries),
[Entry | parse_zlist_entries(Len-1, Rest)].

%% String value with length less than or equal to 63 bytes (6 bits).
parse_zlist_entry(<<0:2, Len:6/little-unsigned, Entries/binary>>) ->
<<Entry:Len/binary, Rest/binary>> = Entries,
{Entry, Rest};
%% String value with length less than or equal to 16383 bytes (14 bits).
parse_zlist_entry(<<0:1,1:1, Len:14/little-unsigned, Entries/binary>>) ->
<<Entry:Len/binary, Rest/binary>> = Entries,
{Entry, Rest};
%% String value with length greater than or equal to 16384 bytes.
parse_zlist_entry(<<1:1,0:1,_:6, Len:32/little-unsigned, Entries/binary>>) ->
<<Entry:Len/binary, Rest/binary>> = Entries,
{Entry, Rest};
%% Read next 2 bytes as a 16 bit signed integer
parse_zlist_entry(<<1:1,1:1,0:2,_:4, Int:16/little-signed, Rest/binary>>) ->
{integer_to_list(Int), Rest};
%% Read next 4 bytes as a 32 bit signed integer
parse_zlist_entry(<<1:1,1:1,0:1,1:1,_:4, Int:32/little-signed, Rest/binary>>) ->
{integer_to_list(Int), Rest};
%% Read next 8 bytes as a 64 bit signed integer
parse_zlist_entry(<<1:1,1:1,1:1,0:1,_:4, Int:64/little-signed, Rest/binary>>) ->
{integer_to_list(Int), Rest};
%% Read next 3 bytes as a 24 bit signed integer
parse_zlist_entry(<<1:1,1:1,1:1,1:1,0:4, Int:24/little-signed, Rest/binary>>) ->
{integer_to_list(Int), Rest};
%% Read next byte as an 8 bit signed integer
parse_zlist_entry(<<1:1, 1:1, 1:1, 1:1, 1:1, 1:1, 1:1, 0:1, Int:8/little-signed, Rest/binary>>) ->
{integer_to_list(Int), Rest};
%% immediate 4 bit integer. Unsigned integer from 0 to 12.
%% The encoded value is actually from 1 to 13 because 0000 and 1111 can not
%% be used, so 1 should be subtracted from the encoded 4 bit value to
%% obtain the right value
parse_zlist_entry(<<1:1,1:1,1:1,1:1, Val:4/little-unsigned, Rest/binary>>) ->
{integer_to_list(Val-1), Rest}.

parse_intset_vals(Data) ->
{ok, Str, Rest1} = rdb_encoded_string_object(Data),
<<Encoding:32/little-unsigned, % byte size of integers
Length:32/little-unsigned,
Entries/binary>> = Str,
{ok, parse_intset_entries(Encoding*8,Length,Entries), Rest1}.

parse_intset_entries(_Size, 0, <<>>) ->
[];
parse_intset_entries(Size, N, Entries) ->
<<Int:Size/little-signed, Rest/binary>> = Entries,
[integer_to_list(Int) | parse_intset_entries(Size, N-1, Rest)].

to_hash(L) -> to_hash(L, dict:new()).

to_hash([], Dict) -> Dict;
to_hash([K,V|L], Dict) -> to_hash(L, dict:store(K,V,Dict)).

to_sorted_set(L) -> lists:sort(to_set(L)).

to_set([]) -> [];
to_set([Val,Weight|L]) -> [{Weight, Val} | to_set(L)].
Loading