Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gigl/distributed/distributed_neighborloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ def _setup_for_graph_store(

# Get sampling ports for compute-storage connections.
sampling_ports = dataset.get_free_ports_on_storage_cluster(
num_ports=dataset.cluster_info.num_processes_per_compute
num_ports=dataset.cluster_info.num_compute_nodes
)
sampling_port = sampling_ports[node_rank]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,16 +499,28 @@ def _get_expected_input_nodes_by_rank(
)
expected_sampler_input = collections.defaultdict(list)
for server_rank in range(cluster_info.num_storage_nodes):
server_nodes = get_ids_on_rank(partition_book, server_rank)
server_nodes = get_ids_on_rank(partition_book=partition_book, rank=server_rank)
for compute_rank in range(cluster_info.num_compute_nodes):
generated_nodes = shard_nodes_by_process(
server_nodes, compute_rank, cluster_info.num_processes_per_compute
input_nodes=server_nodes,
local_process_rank=compute_rank,
local_process_world_size=cluster_info.num_compute_nodes,
)
expected_sampler_input[compute_rank].append(generated_nodes)
return dict(expected_sampler_input)


class GraphStoreIntegrationTest(TestCase):
"""
NOTE: Since these tests run on cloud build,
and our python process memory footprint is quite large due to tf, torch, etc,
We need to be careful to not spawn too many processes.
Otherwise we will OOM and see "myterious" failures like the below:
make: *** [Makefile:119: integration_test] Error 137
ERROR: build step 0 "docker-img/path:tag" failed: step exited with non-zero status: 2
ERROR: build step 0 "docker-img/path:tag" failed: step exited with non-zero status: 2
"""

def test_graph_store_homogeneous(self):
# Simulating two server machine, two compute machines.
# Each machine has one process.
Expand Down Expand Up @@ -625,7 +637,7 @@ def test_homogeneous_training(self):
cluster_info = GraphStoreInfo(
num_storage_nodes=2,
num_compute_nodes=2,
num_processes_per_compute=2,
num_processes_per_compute=1,
cluster_master_ip=host_ip,
storage_cluster_master_ip=host_ip,
compute_cluster_master_ip=host_ip,
Expand Down
2 changes: 1 addition & 1 deletion tests/test_assets/test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

logger = Logger()

DEFAULT_TIMEOUT_SECONDS: Final[float] = 300.0
DEFAULT_TIMEOUT_SECONDS: Final[float] = 60.0 * 10 # 10 minutes
DEFAULT_POLL_INTERVAL_SECONDS: Final[float] = 0.1


Expand Down