37 commits
2c04738
Prepared implementation of rolling checksums
314eter Aug 5, 2014
1e6c6f3
Checksum validation
314eter Aug 7, 2014
33e16c5
Removed commented code
314eter Aug 7, 2014
843a72e
Changed nodenames to make tests independent
314eter Aug 7, 2014
1849998
Removed unused variable
314eter Aug 7, 2014
4a194a7
Raise ChecksumError
314eter Aug 7, 2014
d04a32f
Compatibility with old format (no checksums)
314eter Aug 7, 2014
5784c07
Use old tlog format for values with no checksums
314eter Aug 7, 2014
209b373
Fixed TODO: _previous_i_entry = _previous_entry at start
314eter Aug 8, 2014
b8956be
Moved checksum validation to log_value_explicit
314eter Aug 8, 2014
849edb7
Added OUnit test
314eter Aug 8, 2014
c081051
New system test and better error handling
314eter Aug 11, 2014
21ebbae
Better error msg in test
314eter Aug 11, 2014
738c031
Design document
314eter Aug 11, 2014
6eab894
collapser_test leaves head.db in root dir
314eter Aug 11, 2014
74c8ce9
Fixed wrong choice of previous_i_entry
314eter Aug 12, 2014
7ae2956
Removed options from Checksum module
314eter Aug 12, 2014
e12f217
Fixed some comments from pull request.
314eter Aug 13, 2014
e6f5229
New magic and upgrade path
314eter Aug 13, 2014
f0e08d2
Merge remote-tracking branch 'upstream/1.7' into checksum
314eter Aug 13, 2014
6cfc819
Disable checksum validation during catchup
314eter Aug 14, 2014
3fe9179
Save checksum in store, validation during catchup
314eter Aug 18, 2014
3dc63fe
Validate checksum in store
314eter Aug 19, 2014
84acf7a
Merge remote-tracking branch 'upstream/1.8' into checksum
314eter Aug 19, 2014
325e3a0
Fix merge with 1.8
314eter Aug 19, 2014
9426483
Updated design document
314eter Aug 19, 2014
f0fba76
Moved store validation to Store module
314eter Aug 20, 2014
787c955
Replaced LAST_ENTRIES and LAST_ENTRIES2 with LAST_ENTRIES3
314eter Aug 20, 2014
9ba4eea
Ensure i and checksum are updated simultaneously in store
314eter Aug 20, 2014
65f2401
Fixed some comments on pull request
314eter Aug 21, 2014
1235157
Add timeout to test power_failure
314eter Aug 21, 2014
c253991
Remove duplicate test_large_catchup_while_running
314eter Aug 21, 2014
e595752
Fix set_previous_checksum
314eter Aug 21, 2014
85659cc
Updated design document
314eter Aug 21, 2014
fdd6dd3
Validate checksums during catchup_store
314eter Aug 21, 2014
e34c6af
Use SSE4.2 in update_crc32c
314eter Aug 22, 2014
dc5cf62
Restored buildInSandbox.sh
314eter Sep 4, 2014
97 changes: 97 additions & 0 deletions doc/design/rolling-checksums.rst
@@ -0,0 +1,97 @@
=================
Rolling Checksums
=================

Problem
=======
If a node crashes and fails to write some tlog entries to disk, this is not detected by Arakoon. The node announces that it is in sync up to the last entry in its tlogs, even if other nodes diverged while it was offline. Note that this should not happen if fsync is set to true (the default) and none of the layers below (file system, hardware) lie about fsync behaviour.

Another problematic situation occurs when, for some reason, tlog files, databases or even whole nodes end up in the wrong cluster. The nodes continue as if nothing happened, unaware that they have diverged.

Example
-------
Consider this situation:

+----------------------------------+------------------------------------+------------------------------------+
| node0                            | node1                              | node2                              |
+==================================+====================================+====================================+
| 0:(Vm (node0,0.000000)) | 0:(Vm (node0,0.000000)) | 0:(Vm (node0,0.000000)) |
+----------------------------------+------------------------------------+------------------------------------+
| 1:(Vc ([Set;"a";1;"...";],false) | 1:(Vc ([Set;"a";1;"...";],false) | 1:(Vc ([Set;"a";1;"...";],false) |
+----------------------------------+------------------------------------+------------------------------------+
| 2:(Vc ([Set;"b";1;"...";],false) | *2:(Vc ([Set;"b";1;"...";],false)* | *2:(Vc ([Set;"b";1;"...";],false)* |
+----------------------------------+------------------------------------+------------------------------------+
| 3:(Vc ([Set;"c";1;"...";],false) | *3:(Vc ([Set;"c";1;"...";],false)* | |
+----------------------------------+------------------------------------+------------------------------------+

Node1 and node2 crashed, and their last tlog entries were lost. They are restarted while node0 is still offline. When node0 comes back, this results in the following situation:

+--------------------------------------+----------------------------------+----------------------------------+
| node0                                | node1                            | node2                            |
+======================================+==================================+==================================+
| 0:(Vm (node0,0.000000)) | 0:(Vm (node0,0.000000)) | 0:(Vm (node0,0.000000)) |
+--------------------------------------+----------------------------------+----------------------------------+
| 1:(Vc ([Set;"a";1;"...";],false) | 1:(Vc ([Set;"a";1;"...";],false) | 1:(Vc ([Set;"a";1;"...";],false) |
+--------------------------------------+----------------------------------+----------------------------------+
| **2:(Vc ([Set;"b";1;"...";],false)** | **2:(Vm (node1,0.000000))** | **2:(Vm (node1,0.000000))** |
+--------------------------------------+----------------------------------+----------------------------------+
| 3:(Vc ([Set;"d";1;"...";],false)     | 3:(Vc ([Set;"d";1;"...";],false) | 3:(Vc ([Set;"d";1;"...";],false) |
+--------------------------------------+----------------------------------+----------------------------------+

Contributor:

    So, node1 lost the 2 last tlog entries, and replaced them with others. This should not happen when you have an fsync between the writing of each entry, unless your mount options, file system, or hardware are wanting.

    So is this whole set of changes some kind of runtime detection of bad configuration or a hardware lie-detector?

Checksums
=========
This problem can be solved by using a rolling checksum, computed over all the entries in the tlogs. This checksum should be the same for all nodes. The checksum is part of the value that is synced with multi-paxos.

1. The client sends a request to the master node.
2. The master computes the rolling checksum, and makes a value of this checksum and the update commands.
3. This value is sent to the slaves in an accept request.
4. The slaves compute the rolling checksum, and compare it with the checksum in the value.
5. If the checksums are equal, the tlogs are in sync, the value is written to the tlogs, and the algorithm proceeds as usual.
6. If the checksums are different, something bad happened. The node halts, and the tlogs need to be inspected manually.
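
The accept-side validation in steps 2-6 can be sketched in Python (the PR's tests are Python). This is a toy model, not Arakoon's OCaml implementation: `roll`, `master_make_value`, and `slave_accept` are hypothetical names, and `zlib.crc32` stands in for the CRC-32 of the tlog specification; only `ChecksumError` is a name that actually appears in the commits.

```python
import zlib

def roll(checksum, value_bytes):
    # zlib.crc32 accepts the previous CRC as a starting value,
    # which is exactly what makes the checksum "rolling".
    return zlib.crc32(value_bytes, checksum) & 0xFFFFFFFF

class ChecksumError(Exception):
    pass

def master_make_value(previous_checksum, update_bytes):
    # Step 2: the master folds the update into the rolling checksum
    # and ships checksum + update together as one paxos value.
    return (roll(previous_checksum, update_bytes), update_bytes)

def slave_accept(previous_checksum, value):
    # Steps 4-6: the slave recomputes the rolling checksum and compares.
    checksum, update_bytes = value
    expected = roll(previous_checksum, update_bytes)
    if expected != checksum:
        # Step 6: the tlogs diverged; halt for manual inspection.
        raise ChecksumError("tlogs diverged: 0x%08x != 0x%08x"
                            % (expected, checksum))
    return checksum  # the new rolling checksum after writing the tlog entry
```

Because the previous checksum is an input to every new one, a single lost or foreign entry anywhere in the history makes all later checksums disagree.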

The catchup consists of two phases. In the first phase, the missing tlog entries are received from another node. The checksum of the first of these entries will be validated, to prevent a catchup from a diverged node. During the second phase, the tlog entries are replayed to the store, and all checksums are validated.

Remark
------
When several consecutive entries in the tlogs have the same number, it is only the last one that is agreed upon by multi-paxos, and thus only this entry is used in the computation of the checksum.
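
The remark can be sketched as follows, assuming entries arrive in tlog order; the helper is hypothetical, not PR code:

```python
def checksum_of_tlog(entries, roll):
    """Fold only the last entry of each serial number into the rolling
    checksum: consecutive entries sharing a number are paxos retries,
    and only the final one is the agreed-upon value."""
    checksum = 0
    previous = None  # (serial, value) waiting to be folded
    for serial, value in entries:
        if previous is not None and previous[0] != serial:
            checksum = roll(checksum, previous[1])
        previous = (serial, value)  # a later duplicate wins
    if previous is not None:
        checksum = roll(checksum, previous[1])
    return checksum
```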

Tlog Specification
==================
* Serial number (int64)
* Crc-32 checksum of Cmd (int32)
* Cmd
- Value
- Marker, optional (string option)

Older value format
------------------
* Update
- Update type (int32 between 1 and 16)
- Update details (depends on type)
* Synced (bool)

Old value format
----------------
* 0xff (int32)
* Value type (char 'c' or 'm')
* Value details (depends on type)

New value format
----------------
* 0x100 (int32)
* Checksum (int32 if crc-32 is used)
* Value type (char 'c' or 'm')
* Value details (depends on type)

Contributor:

    maybe you want to dabble into MICs and MACs.
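
The old and new framings can be sketched like this. The little-endian packing and the function names are assumptions for illustration (Arakoon serializes via its own Llio module in OCaml); only the 0xff and 0x100 magics and the CRC-32 width come from the spec above.

```python
import struct

OLD_MAGIC = 0xFF   # pre-checksum value format
NEW_MAGIC = 0x100  # value format carrying a rolling checksum

def serialize_value(value_type, details, checksum=None):
    """Frame a value for the tlog; checksum=None means the value
    predates the upgrade and must keep the old format."""
    assert value_type in (b'c', b'm')
    if checksum is None:
        header = struct.pack('<i', OLD_MAGIC)
    else:
        header = struct.pack('<iI', NEW_MAGIC, checksum)
    return header + value_type + details

def parse_value(buf):
    # The magic distinguishes the two formats, so old tlogs stay readable.
    (magic,) = struct.unpack_from('<i', buf, 0)
    if magic == OLD_MAGIC:
        checksum, offset = None, 4
    elif magic == NEW_MAGIC:
        (checksum,) = struct.unpack_from('<I', buf, 4)
        offset = 8
    else:
        raise ValueError("unknown value magic: 0x%x" % magic)
    return checksum, buf[offset:offset + 1], buf[offset + 1:]
```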

Checksums in store
==================
The current tlog index and checksum are stored in the local store. When a node starts, they are compared with the values in the tlog. After a collapse, the checksum of the last collapsed value is saved in the head database, so the rolling tlog checksum is a continuation of the checksum in the head database.
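
A toy model of that invariant (all names hypothetical; Arakoon's store is an OCaml module over a local database): keeping i and the checksum in a single record means a crash can never leave one updated without the other, which is what the "updated simultaneously" commit is about.

```python
class ChecksumError(Exception):
    pass

class Store:
    """Toy local store keeping i and the rolling checksum in one slot."""

    def __init__(self):
        self._meta = (None, 0)  # (i, checksum): always changed together

    def commit(self, i, checksum, updates):
        # ... apply `updates` to the database ...
        # then publish both values in a single write.
        self._meta = (i, checksum)

    def validate_against_tlog(self, tlog_checksum_at):
        # At startup: compare the store's view with the tlog's.
        i, checksum = self._meta
        if i is not None and tlog_checksum_at(i) != checksum:
            raise ChecksumError("store and tlog disagree at i=%s" % i)
```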

Upgrade Path
============
To upgrade Arakoon to the new version, with the new tlog format, the nodes need to be restarted. A node that restarts after the upgrade can still read the old tlogs; the checksums of these values are set to None, and values with checksum None are never validated.

Nodes that need to do a catchup will do this as usual. The first received values will have checksum None, and are written to the tlogs in the old format, until all values from before the upgrade are synced. All values that are created after the upgrade will get a checksum. The checksum of the first value is a normal checksum (not depending on previous values), and all following checksums are rolling.
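
Handling such a mixed tlog can be sketched as follows; this is a model under stated assumptions, not the PR's OCaml. A None checksum is accepted without validation, and rolling the first checksummed value from a fixed seed makes it independent of pre-upgrade history, matching "a normal checksum (not depending on previous values)".

```python
class ChecksumError(Exception):
    pass

def step(previous_checksum, value_checksum, update_bytes, roll):
    """Advance the rolling checksum across one tlog value.

    value_checksum is None for values written before the upgrade:
    those are never validated.  The first value that does carry a
    checksum rolls from the fixed seed (0 here)."""
    if value_checksum is None:
        return 0  # old-format value: skip validation, keep the seed
    expected = roll(previous_checksum, update_bytes)
    if expected != value_checksum:
        raise ChecksumError("checksum mismatch")
    return value_checksum
```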

New and old nodes cannot and will not communicate (they use a different magic). If the nodes are restarted one by one, the old nodes keep going as long as possible, while the new nodes cannot make progress because they lack a majority. When the critical point is reached, the new nodes do a catchup and take over.
1 change: 1 addition & 0 deletions pylabs/test/server/left/system_tests_long_left.py
@@ -49,6 +49,7 @@ def test_catchup_only():
C.stopOne(n0)
C.compare_stores(n1,n0)
C.start_all()
time.sleep(1.0)
C.assert_running_nodes(2)


80 changes: 80 additions & 0 deletions pylabs/test/server/quick/test_checksums.py
@@ -0,0 +1,80 @@
"""
Copyright (2010-2014) INCUBAID BVBA

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""



import time
import shutil
import logging

from .. import system_tests_common as C
from nose.tools import *

from Compat import X

"""
@C.with_custom_setup(C.setup_3_nodes, C.basic_teardown)
def test_diverge ():
C.iterate_n_times(100, C.simple_set)
C.stop_all()

C.remove_node(1)
C.remove_node(2)
C.regenerateClientConfig(C.cluster_id)
C.start_all()
C.iterate_n_times(5, C.simple_set, 100)
C.stop_all()

C.remove_node(0)
C.add_node(1)
C.add_node(2)
C.regenerateClientConfig(C.cluster_id)
C.start_all()
C.iterate_n_times(10, C.simple_set)
C.stop_all()

C.add_node(0)
C.regenerateClientConfig(C.cluster_id)
C.start_all()
time.sleep(3.0)
"""

@C.with_custom_setup(C.setup_2_nodes_forced_master ,C.basic_teardown)
def test_power_failure ():
cluster = C._getCluster()
logging.info("")
C.iterate_n_times(50, C.simple_set)

for i in range(2):
node_id = C.node_names[i]
C.stopOne(node_id)
home = cluster.getNodeConfig(node_id)['home']
backup = '/'.join([X.tmpDir, 'backup_' + node_id])
shutil.copytree(home, backup)
C.startOne(node_id)

C.iterate_n_times(10, C.simple_set)
C.stop_all()

for i in range(2):
node_id = C.node_names[i]
home = cluster.getNodeConfig(node_id)['home']
backup = '/'.join([X.tmpDir, 'backup_' + node_id])
shutil.rmtree(home)
shutil.move(backup, home)
C.startOne(node_id)

C.iterate_n_times(10, C.simple_set)
C.startOne(C.node_names[2])
28 changes: 0 additions & 28 deletions pylabs/test/server/right/system_tests_long_right.py
@@ -508,34 +508,6 @@ def test_sabotage():
returncode = X.subprocess.call(cmd)
assert_equals(returncode, 50)

@Common.with_custom_setup( Common.setup_3_nodes_forced_master, Common.basic_teardown )
def test_large_catchup_while_running():
""" make sure catchup does not interphere with normal operation (eta : 720s) """
cli = Common.get_client()
cluster = Common._getCluster()

cli.set('k','v')
m = cli.whoMaster()

nod1 = Common.node_names[0]
nod2 = Common.node_names[1]
nod3 = Common.node_names[2]

n_name,others = (nod1, [nod2,nod3]) if nod1 != m else (nod2, [nod1, nod3])
node_pid = cluster._getPid(n_name)

time.sleep(0.1)
X.subprocess.call(["kill","-STOP",str(node_pid)])
Common.iterate_n_times( 200000, Common.simple_set )
for n in others:
Common.collapse(n)

time.sleep(1.0)
X.subprocess.call(["kill","-CONT", str(node_pid) ])
cli.delete('k')
time.sleep(10.0)
Common.assert_running_nodes(3)


@Common.with_custom_setup(Common.setup_1_node, Common.basic_teardown)
def test_log_rotation():
5 changes: 5 additions & 0 deletions pylabs/test/server/system_tests_common.py
@@ -537,6 +537,11 @@ def add_node ( i ):
cluster.addLocalNode (ni )
cluster.createDirs(ni)

def remove_node(i):
ni = node_names[i]
cluster = _getCluster()
cluster.removeNode(ni)

def start_all(clusterId = None) :
cluster = _getCluster(clusterId )
cluster.start()
4 changes: 2 additions & 2 deletions src/client/arakoon_client.mli
@@ -85,7 +85,7 @@ class type client = object

method nop : unit -> unit Lwt.t
(** [nop ()] is a paxos no-operation.
*)
*)
method confirm: key -> value -> unit Lwt.t
(** [confirm key value] does nothing if this value was already
associated to the key, otherwise, it behaves as [set key value]
@@ -118,7 +118,7 @@ class type client = object
[replace key wanted] assigns the wanted value to the key,
and returns the previous assignment (if any) for that key.
If wanted is None, the binding is deleted.
*)
*)

method ping: string -> string -> string Lwt.t

20 changes: 13 additions & 7 deletions src/client/client_protocol.ml
@@ -312,18 +312,24 @@ let one_command stop (ic,oc,id as conn) (backend:Backend.backend) =
end
| LAST_ENTRIES ->
begin
Sn.input_sn ic >>= fun i ->
Logger.debug_f_ "connection=%s LAST_ENTRIES: i=%Li" id i >>= fun () ->
response_ok oc >>= fun () ->
backend # last_entries i oc >>= fun () ->
Lwt.return false
wrap_exception
(fun () ->
let msg = "Operation LAST_ENTRIES is not supported" in
Lwt.fail (XException(Arakoon_exc.E_NOT_SUPPORTED, msg)))
end
| LAST_ENTRIES2 ->
begin
wrap_exception
(fun () ->
let msg = "Operation LAST_ENTRIES2 is not supported" in
Lwt.fail (XException(Arakoon_exc.E_NOT_SUPPORTED, msg)))
end
| LAST_ENTRIES3 ->
begin
Sn.input_sn ic >>= fun i ->
Logger.debug_f_ "connection=%s LAST_ENTRIES2: i=%Li" id i >>= fun () ->
Logger.debug_f_ "connection=%s LAST_ENTRIES3: i=%Li" id i >>= fun () ->
response_ok oc >>= fun () ->
backend # last_entries2 i oc >>= fun () ->
backend # last_entries i oc >>= fun () ->
Lwt.return false
end
| WHO_MASTER ->
2 changes: 2 additions & 0 deletions src/client/common.ml
@@ -49,6 +49,7 @@ type client_command =
| TEST_AND_SET
| LAST_ENTRIES
| LAST_ENTRIES2
| LAST_ENTRIES3
| RANGE_ENTRIES
| MIGRATE_RANGE
| SEQUENCE
@@ -133,6 +134,7 @@ let code2int = [
GET_TXID , 0x43l;
COPY_DB_TO_HEAD , 0x44l;
USER_HOOK , 0x45l;
LAST_ENTRIES3 , 0x46l;
]

let int2code =
27 changes: 10 additions & 17 deletions src/client/remote_nodestream.ml
@@ -25,9 +25,9 @@ let section = Logger.Section.main

class type nodestream = object
method iterate:
Sn.t -> (Sn.t * Value.t -> unit Lwt.t) ->
Tlogcollection.tlog_collection ->
head_saved_cb:(string -> unit Lwt.t) -> unit Lwt.t
Sn.t -> f_entry:(Sn.t * Value.t -> unit Lwt.t) ->
f_head:(Lwt_io.input_channel -> unit Lwt.t) ->
f_file:(string -> int64 -> Lwt_io.input_channel -> unit Lwt.t) -> unit Lwt.t

method collapse: int -> unit Lwt.t

@@ -50,18 +50,13 @@ class type nodestream = object
method drop_master: unit -> unit Lwt.t
end

class remote_nodestream ((ic,oc) as conn) =
(object
method iterate (i:Sn.t) (f: Sn.t * Value.t -> unit Lwt.t)
(tlog_coll: Tlogcollection.tlog_collection)
~head_saved_cb
=
class remote_nodestream ((ic,oc) as conn) = (object
method iterate i ~f_entry ~f_head ~f_file =
let outgoing buf =
command_to buf LAST_ENTRIES2;
command_to buf LAST_ENTRIES3;
Sn.sn_to buf i
in
let incoming ic =
let save_head () = tlog_coll # save_head ic in
let last_seen = ref None in
let rec loop_entries () =
Sn.input_sn ic >>= fun i2 ->
@@ -78,7 +73,7 @@ class remote_nodestream ((ic,oc) as conn) =
Llio.input_int32 ic >>= fun _chksum ->
Llio.input_string ic >>= fun entry ->
let value = Value.value_from (Llio.make_buffer entry 0) in
f (i2, value) >>= fun () ->
f_entry (i2, value) >>= fun () ->
loop_entries ()
end
end
@@ -94,10 +89,8 @@ class remote_nodestream ((ic,oc) as conn) =
end
| 2 ->
begin
Logger.info_f_ "save_head" >>= fun ()->
save_head () >>= fun () ->
let hf_name = tlog_coll # get_head_name () in
head_saved_cb hf_name >>= fun () ->
Logger.info_f_ "save_head" >>= fun () ->
f_head ic >>= fun () ->
loop_parts ()
end
| 3 ->
@@ -106,7 +99,7 @@ class remote_nodestream ((ic,oc) as conn) =
Llio.input_string ic >>= fun name ->
Llio.input_int64 ic >>= fun length ->
Logger.info_f_ "got %s (%Li bytes)" name length >>= fun () ->
tlog_coll # save_tlog_file name length ic >>= fun () ->
f_file name length ic >>= fun () ->
loop_parts ()
end
| x -> Llio.lwt_failfmt "don't know what %i means" x
6 changes: 3 additions & 3 deletions src/client/remote_nodestream.mli
@@ -21,9 +21,9 @@ open Ncfg

class type nodestream = object
method iterate:
Sn.t -> (Sn.t * Value.t -> unit Lwt.t) ->
Tlogcollection.tlog_collection ->
head_saved_cb:(string -> unit Lwt.t) -> unit Lwt.t
Sn.t -> f_entry:(Sn.t * Value.t -> unit Lwt.t) ->
f_head:(Lwt_io.input_channel -> unit Lwt.t) ->
f_file:(string -> int64 -> Lwt_io.input_channel -> unit Lwt.t) -> unit Lwt.t

method collapse: int -> unit Lwt.t

2 changes: 1 addition & 1 deletion src/main/replay_main.ml
@@ -49,7 +49,7 @@ let replay_tlogs tlog_dir tlf_dir db_name end_i =
| None ->
begin
Tlc2.get_last_tlog tlog_dir tlf_dir >>= fun (_new_c,fn) ->
Tlc2._validate_one fn "" ~check_marker:false >>= fun (last, _index) ->
Tlc2._validate_one fn "" ~check_marker:false >>= fun (last, _, _index) ->
let i =
match last with
| None -> Sn.start
3 changes: 2 additions & 1 deletion src/msg/tcp_messaging.ml
@@ -69,7 +69,8 @@ class tcp_messaging
my_addresses my_cookie (drop_it: drop_function)
max_buffer_size ~stop =

let _MAGIC = 0xB0BAFE7L in
(* previous magic: 0xB0BAFE7L *)
let _MAGIC = 0x20140820L in
let _VERSION = 1 in
let my_ips, my_port = my_addresses in
let my_ip = List.hd my_ips in