Skip to content

Commit bb0ee5e

Browse files
author
Calvin Nhieu
authored
parallelize read from DSS in workers (#94)
* parallelize read from DSS in workers

* Use thread-safer DCP interface. This incorporates the change in HumanCellAtlas/dcp-cli#202.

* Include thread id in logs. This is helpful in our multithreaded code paths.

* Don't print logs twice. AWS Lambda has a default root logger that captures everything that goes to stdout. If you don't turn `propagate` off, you get every message twice in CloudWatch.

* Poll for locks much less frequently. A failure mode we encounter is using up capacity on the lock table. If we're anticipating smaller numbers of worker lambdas that handle larger blocks of work, we can wait longer between checking for lock availability. This reduces our DynamoDB consumed capacity.

* Distribute 100 bundles to each lambda, and aggressively parallelize DSS I/O.

* Assume that each bundle has a single cell in the mapper. This is a bad assumption, but it's helpful for the demo.

* Address comments
1 parent 5140a72 commit bb0ee5e

File tree

10 files changed

+591
-40
lines changed

10 files changed

+591
-40
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
.PHONY: lint test unit-tests
22
MODULES=matrix tests daemons chalice
3-
EXCLUDE=target,vendor,chalicelib
3+
EXCLUDE=target,vendor,chalicelib,target.in
44

55
deploy:
66
$(MAKE) -C chalice $@

daemons/worker/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ build:
1616
mkdir target
1717
pip install -r requirements.txt -t target/ --upgrade
1818
cp target.in/blosc.cpython-36m-x86_64-linux-gnu.so target/numcodecs
19+
cp target.in/hca_util__init__.py target/hca/util/__init__.py
1920
cp -R ../../matrix target/
2021
cp -R *.py target/
2122

daemons/worker/target.in/hca_util__init__.py

Lines changed: 532 additions & 0 deletions
Large diffs are not rendered by default.

matrix/common/dynamo_utils.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212
import boto3
1313
import botocore
1414

15+
from matrix.common.logging import Logging
16+
17+
logger = Logging.get_logger(__name__)
18+
1519

1620
class Lock(object):
1721
"""Implement a lock with DynamoDB."""
@@ -115,9 +119,9 @@ def acquire(self):
115119
# Or maybe we hold the lock ourselves, then just return
116120
elif db_response["Item"]["LockHolder"] == self._lock_id:
117121
return
118-
122+
logger.debug(f"Waiting for lock on {self._lock_key}")
119123
# Chill out for a bit
120-
time.sleep(.5)
124+
time.sleep(6)
121125

122126
def release(self):
123127
"""Release the lock.

matrix/common/logging.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ def get_logger(name: str):
1010
log_level_name = os.environ['LOG_LEVEL'] if 'LOG_LEVEL' in os.environ else 'DEBUG'
1111
log_level = getattr(logging, log_level_name.upper())
1212
ch.setLevel(log_level)
13-
formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s',
13+
formatter = logging.Formatter('%(asctime)s %(thread)d %(levelname)s %(name)s %(message)s',
1414
datefmt="%Y-%m-%dT%H:%M:%S%z")
1515
ch.setFormatter(formatter)
1616
logger = logging.getLogger(name)
1717
logger.handlers = []
1818
logger.addHandler(ch)
1919
logger.setLevel(logging.DEBUG)
20+
logger.propagate = False
2021
return logger
2122

2223
@staticmethod

matrix/lambdas/daemons/driver.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class Driver:
1313
"""
1414
The first task in a distributed filter merge job.
1515
"""
16-
def __init__(self, request_id: str, bundles_per_worker: int=25):
16+
def __init__(self, request_id: str, bundles_per_worker: int=100):
1717
Logging.set_correlation_id(logger, value=request_id)
1818

1919
self.request_id = request_id

matrix/lambdas/daemons/mapper.py

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
1-
import os
21
import typing
32

4-
import zarr
5-
6-
from matrix.common.dss_zarr_store import DSSZarrStore
73
from matrix.common.dynamo_handler import DynamoHandler, DynamoTable, StateTableField
84
from matrix.common.lambda_handler import LambdaHandler, LambdaName
95
from matrix.common.logging import Logging
@@ -82,19 +78,13 @@ def _get_chunk_specs(bundle_fqids: typing.List[str]) -> typing.List[dict]:
8278
chunk_work_spec = []
8379
for bundle_fqid in bundle_fqids:
8480
bundle_uuid, bundle_version = bundle_fqid.split(".", 1)
85-
zarr_store = DSSZarrStore(bundle_uuid,
86-
bundle_version=bundle_version,
87-
dss_instance=os.environ['DEPLOYMENT_STAGE'])
88-
89-
root = zarr.group(store=zarr_store)
90-
91-
rows_per_chunk = root.expression.chunks[0]
92-
total_chunks = root.expression.nchunks
81+
# TODO: Not this! This is taking advantage of the fact that every
82+
# matrixable bundle in the DSS will at first have data for a single
83+
# cell. That won't be true for too long
9384

9485
chunk_work_spec.extend(
9586
[{"bundle_uuid": bundle_uuid,
9687
"bundle_version": bundle_version,
97-
"start_row": n * rows_per_chunk,
98-
"num_rows": rows_per_chunk}
99-
for n in range(total_chunks)])
88+
"start_row": 0,
89+
"num_rows": 1}])
10090
return chunk_work_spec

matrix/lambdas/daemons/worker.py

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import concurrent.futures
12
import math
23
import os
34
import typing
@@ -31,6 +32,7 @@ def __init__(self, request_id: str):
3132
self._input_start_rows = []
3233
self._input_end_rows = []
3334
self._num_rows = []
35+
self.zarr_group = None
3436

3537
def run(self, format: str, worker_chunk_spec: typing.List[dict]):
3638
"""Process and write one chunk of dss bundle matrix to s3 and
@@ -43,24 +45,30 @@ def run(self, format: str, worker_chunk_spec: typing.List[dict]):
4345
logger.debug(f"Worker running with parameters: worker_chunk_spec={worker_chunk_spec}, format={format}")
4446
# TO DO pass in the parameters in worker chunk spec flat
4547
self._parse_worker_chunk_spec(worker_chunk_spec)
48+
num_bundles = len(self._bundle_uuids)
4649
exp_dfs = []
4750
qc_dfs = []
48-
num_bundles = len(self._bundle_uuids)
49-
for chunk_idx in range(num_bundles):
50-
dss_zarr_store = DSSZarrStore(bundle_uuid=self._bundle_uuids[chunk_idx],
51-
bundle_version=self._bundle_versions[chunk_idx],
52-
dss_instance=self._deployment_stage)
53-
group = zarr.group(store=dss_zarr_store)
54-
exp_df, qc_df = convert_dss_zarr_root_to_subset_pandas_dfs(
55-
group, self._input_start_rows[chunk_idx], self._input_end_rows[chunk_idx])
56-
exp_dfs.append(exp_df)
57-
qc_dfs.append(qc_df)
58-
59-
# log every tertile of bundles read
60-
if any(chunk_idx == int(math.ceil((num_bundles - 1) * ((i + 1) / 3))) for i in range(2)):
61-
logger.debug(f"{chunk_idx + 1} of {len(self._bundle_uuids)} bundles successfully read from the DSS")
62-
63-
# In some test cases, dataframes aren't actually returned. Don't try to
51+
52+
# Parallelize high latency bundle reads from DSS
53+
with concurrent.futures.ThreadPoolExecutor(max_workers=25) as executor:
54+
future_to_chunk_map = {executor.submit(self._parse_chunk_to_dataframe, chunk_idx): chunk_idx
55+
for chunk_idx in range(num_bundles)}
56+
for future in concurrent.futures.as_completed(future_to_chunk_map):
57+
chunk_idx = future_to_chunk_map[future]
58+
try:
59+
exp_df, qc_df = future.result()
60+
except Exception as e:
61+
logger.debug(f"Parsing bundle uuid {self._bundle_uuids[chunk_idx]} from DSS "
62+
f"to pandas.DataFrame caused exception {e}")
63+
raise
64+
exp_dfs.append(exp_df)
65+
qc_dfs.append(qc_df)
66+
67+
# log every tertile of bundles read
68+
if any(chunk_idx + 1 == math.ceil(num_bundles * ((i + 1) / 3)) for i in range(3)):
69+
logger.debug(f"{chunk_idx + 1} of {num_bundles} bundles successfully read from the DSS")
70+
71+
# In some test cases, empty dataframes are actually returned. Don't try to
6472
# pass those to pandas.concat
6573
if any(not df.empty for df in exp_dfs):
6674
exp_df = pandas.concat(exp_dfs, axis=0, copy=False)
@@ -81,13 +89,22 @@ def run(self, format: str, worker_chunk_spec: typing.List[dict]):
8189
if workers_and_mappers_are_complete:
8290
logger.debug("Mappers and workers are complete. Invoking reducer.")
8391

84-
s3_zarr_store.write_column_data(group)
92+
s3_zarr_store.write_column_data(self.zarr_group)
8593
reducer_payload = {
8694
"request_id": self._request_id,
8795
"format": format
8896
}
8997
self.lambda_handler.invoke(LambdaName.REDUCER, reducer_payload)
9098

99+
def _parse_chunk_to_dataframe(self, i: int):
100+
dss_zarr_store = DSSZarrStore(bundle_uuid=self._bundle_uuids[i],
101+
bundle_version=self._bundle_versions[i],
102+
dss_instance=self._deployment_stage)
103+
group = zarr.group(store=dss_zarr_store)
104+
if not self.zarr_group:
105+
self.zarr_group = group
106+
return convert_dss_zarr_root_to_subset_pandas_dfs(group, self._input_start_rows[i], self._input_end_rows[i])
107+
91108
def _parse_worker_chunk_spec(self, worker_chunk_spec: typing.List[dict]):
92109
"""Parse worker chunk spec into Worker instance variables.
93110

terraform/modules/matrix-service/lambdas/worker_lambda.tf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ resource "aws_lambda_function" "matrix_service_worker_lambda" {
8585
handler = "app.worker_handler"
8686
runtime = "python3.6"
8787
timeout = 900
88-
memory_size = 1500
88+
memory_size = 3000
8989

9090
environment {
9191
variables = {

tests/unit/lambdas/daemons/test_mapper.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import unittest
22
import uuid
3+
4+
import pytest
5+
36
from mock import call
47
from unittest import mock
58

@@ -99,8 +102,8 @@ def test_get_chunk_specs_ok(self, mock_dss_zarr_store, mock_zarr_group):
99102
bundle_uuid = str(uuid.uuid4())
100103
bundle_version = "version"
101104
bundle_fqids = ['.'.join([bundle_uuid, bundle_version])]
102-
chunk_size = 10
103-
nchunks = 5
105+
chunk_size = 1
106+
nchunks = 1
104107

105108
test_zarr_group = mock.Mock()
106109
test_expression_data = mock.Mock()
@@ -118,6 +121,9 @@ def test_get_chunk_specs_ok(self, mock_dss_zarr_store, mock_zarr_group):
118121
self.assertEqual(chunk_spec['start_row'], i * chunk_size)
119122
self.assertEqual(chunk_spec['num_rows'], chunk_size)
120123

124+
# TODO: Turn this back on once #95 is addressed
125+
@pytest.mark.skip(reason="Not needed while one-cell bundle assumption is in place.")
126+
@unittest.skip("Not needed while one-cell bundle assumption is in place.")
121127
@mock.patch("zarr.group")
122128
@mock.patch.object(DSSZarrStore, "__init__")
123129
def test_get_chunk_specs_no_chunks(self, mock_dss_zarr_store, mock_zarr_group):

0 commit comments

Comments
 (0)