
Commit 46dae12

Add blog post about Dask usage at Blue Yonder.

1 parent f953f10 commit 46dae12

4 files changed: +193 -2 lines changed

_data/authors.yml

Lines changed: 4 additions & 2 deletions

@@ -67,7 +67,6 @@ Florian Jetter:
 Andreas Merkel:
   name: "Andreas Merkel"
 
-
 Marco Neumann:
   name: "Marco Neumann"
   avatar: "assets/images/authors/marco_neumann.jpg"
@@ -87,4 +86,7 @@ Lucas Rademaker & Nefta Kanilmaz:
       url: "https://github.com/lr4d"
     - label: "Nefta's Github"
       icon: "fab fa-fw fa-github-square"
-      url: "https://github.com/NeroCorleone"
+      url: "https://github.com/NeroCorleone"
+
+Andreas Merkel, Kshitij Mathur:
+  name: "Andreas Merkel and Kshitij Mathur"
Lines changed: 189 additions & 0 deletions (new file)

---
layout: single
title: "Dask Usage at Blue Yonder"
date: 2020-06-19 10:00:00 +0100
tags: technology python data-engineering
header:
  overlay_image: assets/images/tech_gear_banner.jpg
  overlay_filter: 0.2
  show_overlay_excerpt: false
author: Andreas Merkel, Kshitij Mathur
author_profile: true
---

# Dask Usage at Blue Yonder

Back in 2017, Blue Yonder started to look into
[Dask/Distributed](https://distributed.dask.org) and how we could leverage it to power
our machine learning and data pipelines.
It's 2020 now, and we are using it heavily in production for performing machine-learning-based
forecasting and optimization for our customers.
Time to take a look at the way we are using Dask!

## Use cases

First, let's have a look at the use cases we have for Dask. The use of Dask at Blue Yonder
is strongly coupled to the concept of [Datasets](https://tech.jda.com/introducing-kartothek/):
tabular data stored as [Apache Parquet](https://parquet.apache.org/) files in blob storage.
We use datasets for storing the input data for our computations as well as intermediate results
and the final output data that is served back to our customers.
Dask is used for creating and managing this data as well as for performing the necessary computations.

The size of the data we work on varies strongly depending on the individual customer.
The largest datasets currently amount to about one billion rows per day.
This corresponds to 30 GiB of compressed Parquet data, which is roughly 500 GiB of uncompressed data in memory.

### Downloading data from a database to a dataset

One typical use case that we have is downloading large amounts of data from a database into a
dataset. We use Dask/Distributed here as a means to parallelize the download, and the graph is
an embarrassingly parallel one: a lot of independent nodes downloading pieces of data into a blob and one
reduction node writing the [dataset metadata](https://github.com/JDASoftwareGroup/kartothek)
(which is basically a dictionary of which data can be found in which blob).
Since the database can be overloaded by too many parallel downloads, we need to limit concurrency.
In the beginning, we accomplished this by using a Dask cluster with a limited number of workers.
In the meantime, we contributed a
[Semaphore implementation](https://docs.dask.org/en/latest/futures.html?highlight=semaphore#id1)
to Dask for solving this problem.

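To make the pattern concrete, here is a minimal sketch of such a download graph built with Dask's
Delayed interface and the Semaphore mentioned above. The scheduler address and the helpers
(`partition_queries`, `fetch_chunk`, `write_parquet_blob`, `write_dataset_metadata`) are hypothetical
placeholders for our actual database and dataset code, not part of Dask itself.

```python
import dask
from dask.distributed import Client, Semaphore

client = Client("tcp://scheduler:8786")   # assumption: an already running Distributed cluster

# Allow at most eight concurrent database connections so the database is not overloaded.
db_semaphore = Semaphore(max_leases=8, name="database")

@dask.delayed
def download_partition(query, semaphore):
    with semaphore:                       # blocks while all leases are taken
        chunk = fetch_chunk(query)        # hypothetical: read one chunk from the database
    return write_parquet_blob(chunk)      # hypothetical: store the chunk as a Parquet blob

@dask.delayed
def finalize(blob_references):
    # Single reduction node: record which data ended up in which blob.
    return write_dataset_metadata(blob_references)

downloads = [download_partition(q, db_semaphore) for q in partition_queries]
finalize(downloads).compute()
```
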
### Doing computations on a dataset

A lot of our algorithms for machine learning, forecasting, and optimization work on datasets.
A source dataset is read, computations are performed on the data, and the result is written to
a target dataset.
In many cases, the layout of the source dataset (the partitioning, i.e., what data resides in which blob)
is used for parallelization. This means the algorithms work independently on the individual
blobs of the source dataset. Therefore, our respective Dask graphs again look very simple, with parallel nodes each performing
the sequential operations of reading in the data from a source dataset blob, doing some computation on it,
and writing it out to a target dataset blob. Again, there is a final reduction node writing the target
dataset metadata. We typically use Dask's Delayed interface for these computations.

![Simple Dask graph with parallel nodes and final reduction](/assets/images/2020-03-16-dask-usage-at-by-graph.png)

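A rough sketch of this read-compute-write pattern with Dask Delayed is shown below. The blob names
and the helpers (`read_blob`, `run_model`, `write_blob`, `commit_metadata`) are made-up placeholders
standing in for our dataset I/O and algorithm code; in practice we rely on Kartothek for the dataset
plumbing rather than writing it by hand.

```python
import dask

@dask.delayed
def process_blob(blob_name):
    df = read_blob(blob_name)        # hypothetical: load one Parquet blob of the source dataset
    result = run_model(df)           # hypothetical: independent, per-partition computation
    return write_blob(result)        # hypothetical: write one blob of the target dataset

@dask.delayed
def commit(target_blob_references):
    # Final reduction node: write the target dataset's metadata in one place.
    return commit_metadata(target_blob_references)

source_blobs = ["part-0.parquet", "part-1.parquet", "part-2.parquet"]  # made-up names
task_graph = commit([process_blob(b) for b in source_blobs])
task_graph.compute()   # runs on the Distributed cluster if a Client is connected, otherwise locally
```
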
### Dataset maintenance

We use Dask/Distributed for performing maintenance operations like garbage collection on datasets.
Garbage collection is necessary since we use a lazy approach when updating datasets and only delete or update
references to blobs in the dataset metadata, deferring the deletion of blobs that are no longer used.
In this case, Dask is used to parallelize the delete requests to the storage provider (Azure blob service).
In a similar fashion, we use Dask to parallelize copy operations for backing up datasets.

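Garbage collection essentially fans out one delete request per unreferenced blob. A minimal sketch using
futures could look like the following, where `find_unreferenced_blobs` and `delete_blob` are hypothetical
placeholders for the metadata inspection and the Azure blob delete call.

```python
from dask.distributed import Client

client = Client("tcp://scheduler:8786")   # assumption: an already running cluster

# Hypothetical: compare the dataset metadata against the blob container listing.
unreferenced = find_unreferenced_blobs(dataset_uuid="sales_forecast")

# One task per blob; the delete requests run in parallel across the workers.
futures = client.map(delete_blob, unreferenced)
client.gather(futures)                    # block until all deletions have finished
```
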
### Repartitioning a dataset

Our algorithms rely on source datasets that are partitioned in a particular way. It is not always
the case that we have the data available in a suitable partitioning, for instance, if the data results
from a computation that used a different partitioning. In this case, we have to repartition the data.
This works by reading the dataset as a [Dask dataframe](https://docs.dask.org/en/latest/dataframe.html),
repartitioning the dataframe using a network shuffle, and writing it out again to a dataset.

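In terms of plain Dask dataframes (leaving out the Kartothek-specific reading and writing), such a
repartitioning could look roughly like this. The storage paths and the shuffle column `location_id`
are made up for illustration.

```python
import dask.dataframe as dd
from dask.distributed import Client

client = Client("tcp://scheduler:8786")          # assumption: an already running cluster

# Read the dataset with its existing partitioning (made-up path).
ddf = dd.read_parquet("abfs://datasets/source/")

# Setting the index triggers a network shuffle that redistributes the rows
# so that each output partition holds a contiguous range of `location_id`.
ddf = ddf.set_index("location_id")

# Write the result back out with the new partitioning (made-up path).
ddf.to_parquet("abfs://datasets/target/")
```
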
### Ad hoc data analysis

Finally, our data scientists also use Dask for out-of-core analysis of data using Jupyter notebooks.
This allows us to take advantage of all the nice features of Jupyter even for data that is too large to
fit into an individual compute node's memory.

## Dask cluster setup at Blue Yonder

We run a somewhat unique setup of Dask clusters that is driven by the specific requirements
of our domain (environment isolation for SaaS applications and data isolation between customers).
We have a large number of individual clusters
that are homogeneous in size, with many of the clusters dynamically scaled. The peculiarities of this
setup have in some cases triggered edge cases and uncovered bugs, which led us to submit a number
of upstream pull requests.

### Cluster separation

To provide isolation between customer environments, we run separate Dask clusters per customer.
For the same reason, we also run separate clusters for production, staging, and development environments.

But it does not stop there.
The service we provide to our customers comprises several products that build upon each other
and are maintained by different teams. We typically perform daily batch runs with these products running sequentially
in separate environments.
For performance reasons, we install the Python packages holding the code needed for the computations on each worker.
We do not want to synchronize the dependencies and release cycles of our different products, which
means we have to run a separate Dask cluster for each of the steps in the batch run.
This results in us operating more than
ten Dask clusters per customer and environment, with only one of the clusters being active
and computing something most of the time. While this leads to overhead in terms of administration and hardware resources,
it also gives us a lot of flexibility. For instance, we can update the software on the cluster of one part of the compute pipeline
while another part of the pipeline is computing something on a different cluster.

### Some numbers

The number and size of the workers varies from cluster to cluster depending on the degree of parallelism
of the computation being performed, its resource requirements, and the available timeframe for the computation.
At the time of writing, we are running more than 500 distinct clusters.
Our clusters have between one and 225 workers, with worker size varying between 1 GiB and 64 GiB of memory.
We typically configure one CPU for the smaller workers and two for the larger ones.
While our Python computations do not leverage thread-level parallelism, the Parquet serialization part,
which is implemented in C++, can benefit from the additional CPU.
Our total memory use (the sum over all clusters) goes up to as much as 15 TiB.
The total number of Dask workers we run varies between 1000 and 2000.

![Total number of Dask workers over time](/assets/images/2020-03-16-dask-usage-at-by-n-workers.png)

## Cluster scaling and resilience

To improve manageability, resilience, and resource utilization, we run the Dask clusters on top
of [Apache Mesos](http://mesos.apache.org/)/[Aurora](http://aurora.apache.org/)
and [Kubernetes](https://kubernetes.io/). This means every worker, as well as the scheduler and client,
runs in an isolated container. Communication happens via reverse proxies to make the communication
endpoints independent of the actual container instance.

Running on top of a system like Mesos or Kubernetes provides us with resilience, since a failing worker
(for instance, as a result of a failing hardware node)
can simply be restarted on another node of the system.
It also enables us to easily commission or decommission Dask clusters, making the number of clusters we run
manageable in the first place.

Running 500 Dask clusters also requires a lot of hardware. We have put two measures in place to improve
the utilization of hardware resources: oversubscription and autoscaling.

### Oversubscription

[Oversubscription](http://mesos.apache.org/documentation/latest/oversubscription/) is a feature of Mesos
that allows allocating more resources than are physically present to services running on the system.
This is based on the assumption that not all services exhaust all of their allocated resources at the same time.
If the assumption is violated, resources are prioritized for the more important services.
We use this to re-purpose resources that are allocated to production clusters but not utilized all the time
for development and staging systems.

### Autoscaling

Autoscaling is a mechanism we implemented to dynamically adapt the number of workers in a Dask cluster
to the load on the cluster. This is possible since Mesos and Kubernetes allow containers to be started
and stopped dynamically, which makes it really easy to add or remove worker instances from an existing Dask cluster.

To determine the optimum number of worker instances to run, we added the ``desired_workers`` metric to Distributed.
The metric exposes
the degree of parallelism that a computation has and thus allows us to infer how many workers a cluster should
ideally have. Based on this metric, as well as on the overall resources available and on fairness criteria
(remember, we run a lot of Dask/Distributed clusters), we add workers to or remove workers from our clusters.
We resolve the problem of balancing the conflicting requirements for different resources like RAM or CPU
using [Dominant Resource Fairness](https://cs.stanford.edu/~matei/papers/2011/nsdi_drf.pdf).

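To make the scaling loop more tangible, here is a hedged sketch of how an external autoscaler can ask the
scheduler how many workers it would like to have. It uses Distributed's ``Scheduler.adaptive_target()``
heuristic as a stand-in for the ``desired_workers`` metric described above, and `scale_cluster` together
with `max_workers_for_this_cluster` are hypothetical placeholders for the Mesos/Kubernetes tooling that
actually starts or stops worker containers.

```python
from dask.distributed import Client

client = Client("tcp://scheduler:8786")   # assumption: an already running cluster

def desired_workers(dask_scheduler):
    # Heuristic number of workers the scheduler would like for the current load;
    # stand-in for the `desired_workers` metric described in the text.
    return dask_scheduler.adaptive_target()

target = client.run_on_scheduler(desired_workers)

# Hypothetical: clamp to the resources and fairness budget available to this cluster,
# then ask Mesos/Kubernetes to start or stop worker containers accordingly.
scale_cluster(n_workers=min(target, max_workers_for_this_cluster))
```
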
## Dask issues

The particular way we use Dask, especially running it in containers connected by reverse proxies and the fact
that we dynamically add/remove workers from a cluster quite frequently for autoscaling, has led us to hit some
edge cases and instabilities and has given us the chance to contribute some fixes and improvements to Dask.
For instance, we were able to
[improve stability after connection failures](https://github.com/dask/distributed/pull/3246) or when
[workers are removed from the cluster](https://github.com/dask/distributed/pull/3366).

If you are interested in our contributions to Dask and our commitment to the Dask community, please
also check out our blog post [Karlsruhe to D.C. ― a Dask story](2020-05-28-dask-developer-workshop.markdown).

## Conclusion

Overall, we are very happy with Dask and the capabilities it offers.
Having migrated to Dask from a proprietary compute framework that was developed within our company,
we noticed that we had similar pain points with both solutions: running into edge cases and robustness
issues in daily operations.
However, with Dask being an open source solution, we have the confidence that others can also profit from
the fixes that we contribute, and that there are problems we **don't** run into because other people have
already experienced and fixed them.

For the future, we envision adding even more robustness to Dask:
topics like scheduler/worker resilience (that is, surviving the loss of a worker or even the scheduler without losing computation results)
and flexible scaling of the cluster are of great interest to us.
2 binary files changed (image assets, 1.48 MB and 89.1 KB, not shown)

0 commit comments
