Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion benchmark/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,11 @@ def run_client(address, size, rate, mechanism, timeout=None, nodes=[]):
nodes = f'--nodes {" ".join(nodes)}' if nodes else ""
return f"./client {address} --size {size} " f"--rate {rate} --timeout {timeout} {nodes}"
elif mechanism == "cometbft":
# use provided address as endpoint
endpoints = f"ws://{address}/websocket"
return (
f"./client -c 1 --size {size} --rate {rate} --time {timeout}"
f" --endpoints ws://localhost:26657/websocket -v --broadcast-tx-method sync --expect-peers {int(len(nodes)/2)} --min-peer-connectivity {int(round(len(nodes)/2))}"
f" --endpoints {endpoints} -v --broadcast-tx-method sync --expect-peers {int(len(nodes)/2)} --min-peer-connectivity {int(round(len(nodes)/2))}"
)
# f' --endpoints ws://localhost:26657/websocket -v --expect-peers {len(nodes)-1} --min-peer-connectivity {len(nodes)-1}')
elif mechanism == "bullshark":
Expand Down
123 changes: 75 additions & 48 deletions benchmark/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,17 @@ def _select_hosts(self, nodes=[]):
addrs = [line.strip() for line in f.readlines()]
return addrs[:max_count]

def _select_client_hosts(self, nodes=[]):
max_count = max(nodes)
client_addrs = []
with open(self.settings.ip_file, "r") as f:
reader = csv.DictReader(f)
if reader.fieldnames is None or "Client IP" not in reader.fieldnames:
raise BenchError("IP file missing 'Client IP' column", ValueError("Missing 'Client IP' in CSV header"))
for row in reader:
client_addrs.append(row["Client IP"])
return client_addrs[:max_count]

def _background_run(self, host, command, log_file):
name = splitext(basename(log_file))[0]
cmd = f'tmux new -d -s "{name}" "{command} |& tee {log_file}"'
Expand Down Expand Up @@ -238,22 +249,22 @@ def _config(self, isGeoremote, hosts, node_parameters, bench_parameters=None):

return committee

def _run_single(self, hosts, rate, bench_parameters, node_parameters, debug=False, committee=[]):
def _run_single(self, node_hosts, client_hosts, rate, bench_parameters, node_parameters, debug=False, committee=[]):
Print.info("Booting testbed...")

# Kill any potentially unfinished run and delete logs.
self.kill(hosts=hosts, delete_logs=True)
self.kill(hosts=node_hosts, delete_logs=True)

if self.mechanism.name == "hotstuff":
# Run the clients (they will wait for the nodes to be ready).
# Filter all faulty nodes from the client addresses (or they will wait
# for the faulty nodes to be online).
committee = Committee.load(PathMaker.committee_file())
addresses = [f'{x}:{self.settings.ports["front"]}' for x in hosts]
addresses = [f'{x}:{self.settings.ports["front"]}' for x in node_hosts]
rate_share = ceil(rate / committee.size()) # Take faults into account.
timeout = node_parameters.timeout_delay
client_logs = [PathMaker.client_log_file(i) for i in range(len(hosts))]
for host, addr, log_file in zip(hosts, addresses, client_logs):
client_logs = [PathMaker.client_log_file(i) for i in range(len(client_hosts))]
for client_host, addr, log_file in zip(client_hosts, addresses, client_logs):
cmd = CommandMaker.run_client(
addr,
bench_parameters.tx_size,
Expand All @@ -262,13 +273,13 @@ def _run_single(self, hosts, rate, bench_parameters, node_parameters, debug=Fals
timeout,
nodes=addresses,
)
self._background_run(host, cmd, log_file)
self._background_run(client_host, cmd, log_file)

# Run the nodes.
key_files = [PathMaker.key_file(i) for i in range(len(hosts))]
dbs = [PathMaker.db_path(i) for i in range(len(hosts))]
node_logs = [PathMaker.node_log_file(i) for i in range(len(hosts))]
for host, key_file, db, log_file in zip(hosts, key_files, dbs, node_logs):
key_files = [PathMaker.key_file(i) for i in range(len(node_hosts))]
dbs = [PathMaker.db_path(i) for i in range(len(node_hosts))]
node_logs = [PathMaker.node_log_file(i) for i in range(len(node_hosts))]
for host, key_file, db, log_file in zip(node_hosts, key_files, dbs, node_logs):
cmd = CommandMaker.run_node(
key_file,
PathMaker.committee_file(),
Expand All @@ -292,12 +303,12 @@ def _run_single(self, hosts, rate, bench_parameters, node_parameters, debug=Fals

# Run the clients
# committee = Committee.load(PathMaker.committee_file()) # TODO for cometbft
addresses = [f'{x}:{self.settings.ports["front"]}' for x in hosts]
addresses = [f'{x}:{self.settings.ports["front"]}' for x in node_hosts]
# rate_share = ceil(rate / committee.size()) # TODO Take faults into account.
rate_share = ceil(rate / len(hosts))
rate_share = ceil(rate / len(node_hosts))
duration = bench_parameters.duration # Duration for which the client should run
client_logs = [PathMaker.client_log_file(i) for i in range(len(hosts))]
for host, addr, log_file in zip(hosts, addresses, client_logs):
client_logs = [PathMaker.client_log_file(i) for i in range(len(client_hosts))]
for client_host, addr, log_file in zip(client_hosts, addresses, client_logs):
cmd = CommandMaker.run_client(
addr,
bench_parameters.tx_size,
Expand All @@ -306,11 +317,11 @@ def _run_single(self, hosts, rate, bench_parameters, node_parameters, debug=Fals
duration,
nodes=addresses,
)
self._background_run(host, cmd, log_file)
self._background_run(client_host, cmd, log_file)

# Run the nodes.
node_logs = [PathMaker.node_log_file(i) for i in range(len(hosts))]
for i, (host, log_file) in enumerate(zip(hosts, node_logs)):
node_logs = [PathMaker.node_log_file(i) for i in range(len(node_hosts))]
for i, (host, log_file) in enumerate(zip(node_hosts, node_logs)):
cmd = f'./node node --home ~/node{i} --proxy_app=kvstore --p2p.persistent_peers="{persistent_peers}" --log_level="state:info,consensus:info,txindex:info,consensus:debug,*:error"'
self._background_run(host, cmd, log_file)

Expand All @@ -323,8 +334,9 @@ def _run_single(self, hosts, rate, bench_parameters, node_parameters, debug=Fals
workers_addresses = committee.workers_addresses(faults)
rate_share = ceil(rate / committee.workers())
for i, addresses in enumerate(workers_addresses):
client_host = client_hosts[i]
for id, address in addresses:
host = BullsharkCommittee.ip(address)
host = client_host
cmd = CommandMaker.run_client(
address,
bench_parameters.tx_size,
Expand All @@ -339,7 +351,7 @@ def _run_single(self, hosts, rate, bench_parameters, node_parameters, debug=Fals
# Run the primaries (except the faulty ones).
Print.info("Booting primaries...")
for i, address in enumerate(committee.primary_addresses(faults)):
host = BullsharkCommittee.ip(address)
host = node_hosts[i]
cmd = CommandMaker.run_primary(
PathMaker.key_file(i),
PathMaker.committee_file(),
Expand All @@ -354,7 +366,7 @@ def _run_single(self, hosts, rate, bench_parameters, node_parameters, debug=Fals
Print.info("Booting workers...")
for i, addresses in enumerate(workers_addresses):
for id, address in addresses:
host = BullsharkCommittee.ip(address)
host = node_hosts[i]
cmd = CommandMaker.run_worker(
PathMaker.key_file(i),
PathMaker.committee_file(),
Expand All @@ -370,54 +382,64 @@ def _run_single(self, hosts, rate, bench_parameters, node_parameters, debug=Fals
duration = bench_parameters.duration
for _ in progress_bar(range(20), prefix=f"Running benchmark ({duration} sec):"):
sleep(ceil(duration / 20))
self.kill(hosts=hosts, delete_logs=False)
self.kill(hosts=node_hosts, delete_logs=False)

sleep(1)
if self.mechanism.name == "cometbft":
latency_logs = [PathMaker.latency_log_file(i) for i in range(len(hosts))]
for i, (host, log_file) in enumerate(zip(hosts, latency_logs)):
latency_logs = [PathMaker.latency_log_file(i) for i in range(len(node_hosts))]
for i, (host, log_file) in enumerate(zip(node_hosts, latency_logs)):
cmd = f"./cometbft/test/loadtime/build/report --database-type goleveldb --data-dir ~/node{i}/data"
self._background_run(host, cmd, log_file)

def _logs(self, hosts, faults, committee=[]): # , servers, run_id):
def _logs(self, node_hosts, client_hosts, faults, committee=[]): # , servers, run_id):
# Delete local logs (if any).
cmd = CommandMaker.clean_logs()
subprocess.run([cmd], shell=True, stderr=subprocess.DEVNULL)

# Download log files.
progress = progress_bar(hosts, prefix="Downloading logs:")
progress = progress_bar(node_hosts, prefix="Downloading node logs:")
if self.mechanism.name == "bullshark":
workers_addresses = committee.workers_addresses(faults)
progress = progress_bar(workers_addresses, prefix="Downloading workers logs:")
for i, addresses in enumerate(progress):
for id, address in addresses:
host = BullsharkCommittee.ip(address)
c = Connection(host, user="ubuntu", connect_kwargs=self.connect)
c.get(
# download client logs from client host
client_host = client_hosts[i]
c_client = Connection(client_host, user="ubuntu", connect_kwargs=self.connect)
c_client.get(
PathMaker.client_log_file_bull(i, id),
local=PathMaker.client_log_file_bull(i, id),
)
c.get(
# download worker logs from node host
node_host = node_hosts[i]
c_node = Connection(node_host, user="ubuntu", connect_kwargs=self.connect)
c_node.get(
PathMaker.worker_log_file(i, id),
local=PathMaker.worker_log_file(i, id),
)

primary_addresses = committee.primary_addresses(faults)
progress = progress_bar(primary_addresses, prefix="Downloading primaries logs:")
for i, address in enumerate(progress):
host = BullsharkCommittee.ip(address)
c = Connection(host, user="ubuntu", connect_kwargs=self.connect)
# download primary logs from node host
node_host = node_hosts[i]
c = Connection(node_host, user="ubuntu", connect_kwargs=self.connect)
c.get(PathMaker.primary_log_file(i), local=PathMaker.primary_log_file(i))
else:
for i, host in enumerate(progress):
c = Connection(host, user=self.settings.key_name, connect_kwargs=self.connect)
c.get(PathMaker.node_log_file(i), local=PathMaker.node_log_file(i))
c.get(PathMaker.client_log_file(i), local=PathMaker.client_log_file(i))
# download node logs first
for i, node_host in enumerate(progress):
c_node = Connection(node_host, user=self.settings.key_name, connect_kwargs=self.connect)
c_node.get(PathMaker.node_log_file(i), local=PathMaker.node_log_file(i))
if self.mechanism.name == "cometbft":
c.get(
c_node.get(
PathMaker.latency_log_file(i),
local=PathMaker.latency_log_file(i),
)
# then download client logs
client_progress = progress_bar(client_hosts, prefix="Downloading client logs:")
for i, client_host in enumerate(client_progress):
c_client = Connection(client_host, user=self.settings.key_name, connect_kwargs=self.connect)
c_client.get(PathMaker.client_log_file(i), local=PathMaker.client_log_file(i))

# Parse logs and return the parser.
Print.info("Parsing logs and computing performance...")
Expand All @@ -440,15 +462,17 @@ def run(self, bench_parameters_dict, node_parameters_dict, isGeoRemote, debug=Fa
except ConfigError as e:
raise BenchError("Invalid nodes or bench parameters", e)

# Select which hosts to use.
selected_hosts = self._select_hosts(bench_parameters.nodes)
if len(selected_hosts) < max(bench_parameters.nodes):
Print.warn("There are not enough instances available")
# Select which node and client hosts to use.
selected_node_hosts = self._select_hosts(bench_parameters.nodes)
selected_client_hosts = self._select_client_hosts(bench_parameters.nodes)
max_nodes = max(bench_parameters.nodes)
if len(selected_node_hosts) < max_nodes or len(selected_client_hosts) < max_nodes:
Print.warn("There are not enough instances available for nodes or clients")
return

# Update nodes.
try:
self._update(selected_hosts)
self._update(selected_node_hosts)
except (GroupException, ExecutionError) as e:
e = FabricError(e) if isinstance(e, GroupException) else e
raise BenchError("Failed to update nodes", e)
Expand All @@ -469,7 +493,7 @@ def run(self, bench_parameters_dict, node_parameters_dict, isGeoRemote, debug=Fa
# Set delay parameters for georemote
latencySetter = LatencySetter(self.settings, self.connect)
try:
latencySetter.configDelay(selected_hosts)
latencySetter.configDelay(selected_node_hosts)
latencySetter.addDelays(selected_servers, pingDelays, self.settings.interface)
except (subprocess.SubprocessError, GroupException) as e:
e = FabricError(e) if isinstance(e, GroupException) else e
Expand All @@ -479,11 +503,12 @@ def run(self, bench_parameters_dict, node_parameters_dict, isGeoRemote, debug=Fa
for n in bench_parameters.nodes:
for r in bench_parameters.rate:
Print.heading(f"\nRunning {n} nodes (input rate: {r:,} tx/s)")
hosts = selected_hosts[:n]
node_hosts = selected_node_hosts[:n]
client_hosts = selected_client_hosts[:n]

# Upload all configuration files.
try:
committee = self._config(isGeoRemote, hosts, node_parameters, bench_parameters)
committee = self._config(isGeoRemote, node_hosts, node_parameters, bench_parameters)
except (subprocess.SubprocessError, GroupException) as e:
e = FabricError(e) if isinstance(e, GroupException) else e
Print.error(BenchError("Failed to configure nodes", e))
Expand All @@ -496,7 +521,8 @@ def run(self, bench_parameters_dict, node_parameters_dict, isGeoRemote, debug=Fa

# Do not boot faulty nodes.
faults = bench_parameters.faults
hosts = hosts[: n - faults]
node_hosts = node_hosts[: n - faults]
client_hosts = client_hosts[: n - faults]

run_id_array = []

Expand All @@ -507,15 +533,16 @@ def run(self, bench_parameters_dict, node_parameters_dict, isGeoRemote, debug=Fa

try:
self._run_single(
hosts,
node_hosts,
client_hosts,
r,
bench_parameters,
node_parameters,
debug,
committee_copy,
)

logger = self._logs(hosts, faults, committee_copy)
logger = self._logs(node_hosts, client_hosts, faults, committee_copy)
logger.print(
PathMaker.result_file(
self.mechanism.name,
Expand All @@ -532,7 +559,7 @@ def run(self, bench_parameters_dict, node_parameters_dict, isGeoRemote, debug=Fa
GroupException,
ParseError,
) as e:
self.kill(hosts=hosts)
self.kill(hosts=node_hosts, delete_logs=False)
if isinstance(e, GroupException):
e = FabricError(e)
Print.error(BenchError("Benchmark failed", e))
Expand Down
12 changes: 6 additions & 6 deletions fab-params.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
"hotstuff": {
"bench_params": {
"faults": 0,
"nodes": [64],
"rate": [160000],
"nodes": [4],
"rate": [16000],
"tx_size": 128,
"duration": 100,
"runs": 5
"runs": 1
},
"node_params": {
"consensus": {
Expand All @@ -26,8 +26,8 @@
"cometbft": {
"bench_params": {
"faults": 0,
"nodes": [64],
"rate": [60000],
"nodes": [4],
"rate": [6000],
"tx_size": 128,
"duration": 100,
"runs": 5
Expand All @@ -49,7 +49,7 @@
"bullshark": {
"bench_params": {
"faults": 0,
"nodes": [42],
"nodes": [4],
"workers": 1,
"collocate": true,
"rate": [80000],
Expand Down
5 changes: 5 additions & 0 deletions results/metrics.csv
Original file line number Diff line number Diff line change
Expand Up @@ -2132,3 +2132,8 @@ run_id,name,faults,input_rate,committee_size,transaction_size,execution_time,bat
2132,cometbft,0.0,60032.0,64.0,128.0,79.0,,699.0,89493.0,1747.0,865.0,110660.0,77913.0,
2133,cometbft,0.0,60032.0,64.0,128.0,79.0,,769.0,98375.0,3119.0,933.0,119385.0,52383.0,
2134,cometbft,0.0,60032.0,64.0,128.0,70.0,,699.0,89493.0,2211.0,865.0,110660.0,52580.0,5.0
2135,bullshark,0.0,80000.0,4.0,128.0,300.0,120000.0,65936.0,8439776.0,1087.0,65807.0,8423318.0,1578.0,
2136,bullshark,0.0,80000.0,4.0,128.0,300.0,120000.0,65550.0,8390374.0,1117.0,65416.0,8373209.0,1644.0,
2137,bullshark,0.0,80000.0,4.0,128.0,300.0,120000.0,65743.0,8415075.0,1102.0,65611.5,8398263.5,1611.0,2.0
2138,hotstuff,0.0,16000.0,4.0,128.0,100.0,120000.0,16038.0,2052860.0,230.0,15903.0,2035577.0,629.0,
2139,hotstuff,0.0,16000.0,4.0,128.0,100.0,120000.0,16038.0,2052860.0,230.0,15903.0,2035577.0,629.0,1.0
10 changes: 5 additions & 5 deletions rundata/ip_file.csv
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
Instance Name,Internal IP,External IP
ubuntu,192.168.41.227,206.12.93.47
ubuntu,192.168.41.244,206.12.93.47
ubuntu,192.168.41.196,206.12.93.47
ubuntu,192.168.41.166,206.12.93.47
Instance Name,Internal IP,Client IP, External IP
ubuntu,192.168.41.227,192.168.41.166,206.12.93.47
ubuntu,192.168.41.244,192.168.41.227,206.12.93.47
ubuntu,192.168.41.196,192.168.41.244,206.12.93.47
ubuntu,192.168.41.166,192.168.41.196,206.12.93.47
ubuntu,192.168.41.92,206.12.93.47
ubuntu,192.168.41.197,206.12.93.47
ubuntu,192.168.41.153,206.12.93.47
Expand Down
Loading