Skip to content
This repository was archived by the owner on Jan 22, 2026. It is now read-only.

Commit 26e5deb

Browse files
authored
Add a utility function for reading main_summary. (#177)
* Add a utility function for reading main_summary. This supports merging schemas for only a subset of partitions. * Replace findspark with pyspark * Remove findspark from setup.py
1 parent bf624ef commit 26e5deb

15 files changed

Lines changed: 287 additions & 17 deletions

File tree

Dockerfile

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,20 @@ ENV HBASE_VERSION=1.2.3
88
RUN apt-get update && apt-get install -y g++ libpython-dev libsnappy-dev
99

1010
# setup conda environment
11-
RUN wget https://repo.continuum.io/miniconda/Miniconda-latest-Linux-x86_64.sh -O miniconda.sh
11+
12+
# temporary workaround, pin miniconda version until it's fixed.
13+
RUN wget https://repo.continuum.io/miniconda/Miniconda2-4.3.21-Linux-x86_64.sh -O miniconda.sh
1214
RUN bash miniconda.sh -b -p /miniconda
1315
ENV PATH="/miniconda/bin:${PATH}"
1416
RUN hash -r
1517
RUN conda config --set always_yes yes --set changeps1 no
16-
RUN conda update -q conda
18+
# TODO: uncomment
19+
# RUN conda update -q conda
1720
RUN conda info -a # Useful for debugging any issues with conda
1821

1922
# install spark/hbase
20-
RUN wget -nv https://d3kbcqa49mib13.cloudfront.net/spark-$SPARK_VERSION-bin-hadoop2.6.tgz
2123
RUN wget -nv https://archive.apache.org/dist/hbase/$HBASE_VERSION/hbase-$HBASE_VERSION-bin.tar.gz
22-
RUN tar -zxf spark-$SPARK_VERSION-bin-hadoop2.6.tgz
2324
RUN tar -zxf hbase-$HBASE_VERSION-bin.tar.gz
24-
ENV SPARK_HOME="/spark-${SPARK_VERSION}-bin-hadoop2.6"
2525

2626
# build + activate conda environment
2727
COPY ./environment.yml /python_moztelemetry/

environment.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,6 @@ name: test-environment
22
dependencies:
33
- python=2.7
44
- pandas
5+
- pyspark
56
- python-snappy
67
- snappy

moztelemetry/standards.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,3 +136,65 @@ def get_last_month_range():
136136
end_of_last_month = snap_to_beginning_of_month(today) - timedelta(days=1)
137137
start_of_last_month = snap_to_beginning_of_month(end_of_last_month)
138138
return (start_of_last_month, end_of_last_month)
139+
140+
141+
def read_main_summary(spark,
                      submission_date_s3=None,
                      sample_id=None,
                      mergeSchema=True,
                      path='s3://telemetry-parquet/main_summary/v4'):
    """ Efficiently read main_summary parquet data.

    Read data from the given path, optionally filtering to a
    specified set of partition values first. This can save a
    lot of time, particularly if `mergeSchema` is True.

    Args:
        spark: SparkSession (or SQLContext) whose `read` attribute
            provides a DataFrameReader.
        submission_date_s3: Optional list of values to filter the
            `submission_date_s3` partition. Default is to read all
            partitions.
        sample_id: Optional list of values to filter the `sample_id`
            partition. Default is to read all partitions.
        mergeSchema (bool): Determines whether or not to merge the
            schemas of the resulting parquet files (ie. whether to
            support schema evolution or not). Default is to merge
            schemas.
        path (str): Location (disk or S3) from which to read data.
            Default is to read from the "production" location on S3.

    Returns:
        A DataFrame loaded from the specified partitions.
    """
    base_path = path

    # Specifying basePath retains the partition fields even
    # if we read a bunch of paths separately.
    reader = spark.read.option("basePath", base_path)
    if mergeSchema:
        reader = reader.option("mergeSchema", "true")

    if submission_date_s3 is not None:
        # Enumerate the partition directories explicitly so Spark only
        # lists (and schema-merges) the requested partitions.
        if sample_id is None:
            paths = ["{}/submission_date_s3={}/".format(base_path, sd)
                     for sd in submission_date_s3]
        else:
            # One path per (submission_date_s3, sample_id) pair, with
            # submission_date_s3 as the outer (slower-varying) loop.
            paths = ["{}/submission_date_s3={}/sample_id={}/".format(
                         base_path, sd, si)
                     for sd in submission_date_s3
                     for si in sample_id]
        return reader.parquet(*paths)

    # No submission_date_s3 filter: read everything under base_path.
    data = reader.parquet(base_path)
    if sample_id is not None:
        # Ugh, why? We would have to iterate the entire path to identify
        # all the submission_date_s3 partitions, which may end up being
        # slower. So read everything and filter afterwards instead.
        criteria = "sample_id IN ({})".format(
            ",".join("{}".format(s) for s in sample_id))
        data = data.where(criteria)
    return data

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package_dir={'moztelemetry': 'moztelemetry'},
1818
install_requires=['boto', 'boto3', 'ujson', 'requests', 'protobuf',
1919
'expiringdict', 'functools32', 'futures', 'py4j',
20-
'pandas>=0.14.1', 'numpy>=1.8.2', 'findspark',
20+
'pandas>=0.14.1', 'numpy>=1.8.2',
2121
'happybase>=1.1.0', 'PyYAML', 'python-snappy'],
2222
setup_requires=['pytest-runner', 'setuptools_scm'],
2323
# put pytest last to workaround this bug

tests/conftest.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@
88
import pytest
99
from moto import mock_s3
1010
from concurrent import futures
11-
12-
import findspark
13-
findspark.init()
14-
import pyspark # noqa
11+
from pyspark.sql import SparkSession
1512

1613

1714
@pytest.fixture
@@ -48,17 +45,26 @@ def dummy_bucket(my_mock_s3):
4845
return bucket
4946

5047

51-
@pytest.fixture(scope='session')
52-
def spark_context(request):
48+
@pytest.fixture(scope="session")
49+
def spark():
50+
spark = (
51+
SparkSession
52+
.builder
53+
.master("local")
54+
.appName("python_moztelemetry_test")
55+
.getOrCreate()
56+
)
57+
5358
logger = logging.getLogger("py4j")
5459
logger.setLevel(logging.ERROR)
55-
sc = pyspark.SparkContext(master="local[1]")
5660

57-
def finalizer():
58-
return sc.stop()
59-
request.addfinalizer(finalizer)
61+
yield spark
62+
spark.stop()
6063

61-
return sc
64+
65+
@pytest.fixture
66+
def spark_context(spark):
67+
return spark.sparkContext
6268

6369

6470
@pytest.fixture(scope='session')
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 commit comments

Comments
 (0)