Back in 2017, Blue Yonder started to look into Dask for
our machine learning and data pipelines.
It's 2020 now, and we are using it heavily in production for performing machine learning based
forecasting and optimization for our customers.
Over time, we discovered that the way we are using Dask differs from how it is typically
used in the community.
(Teaser: for instance, we are running about 500 Dask clusters in total and dynamically scale
the number of workers up and down.)
So we think it's time to take a look at the way we are using Dask!

## Use case

First, let's have a look at the main use case we have for Dask. The use of Dask at Blue Yonder
is strongly coupled to the concept of [datasets](https://tech.jda.com/introducing-kartothek/),
tabular data stored as [Apache Parquet](https://parquet.apache.org/) files in blob storage.
We use datasets for storing the input data for our computations as well as intermediate results and the final output data
served again to our customers.
Dask is used in managing/creating this data as well as performing the necessary computations.

Our pipeline consists of downloading data from a relational database into a dataset
(we use Dask here to parallelize the download) and then of several steps that each read
the data from an input dataset, perform computations on it, and write it out to another dataset.

In many cases, the layout of the source dataset (the partitioning, i.e., what data resides in which blob)
is used for parallelization.
This means the algorithms work independently on the individual blobs of the source dataset.
Therefore, our respective Dask graphs are embarrassingly parallel.
The individual nodes perform
the sequential operations of reading in the data from a source dataset blob, doing some computation on it,
and writing it out to a target dataset blob.
Again, there is a final reduction node writing the target
dataset metadata.
We typically use Dask's Delayed interface for these computations.

![Simple Dask graph with parallel nodes and final reduction](/assets/images/2020-03-16-dask-usage-at-by-graph.png)
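
To make this more concrete, below is a minimal, self-contained sketch of what such a pipeline step
could look like with Dask's Delayed interface. The helper functions and blob names are made-up
stand-ins for our actual dataset code, not the real implementation:

```python
import pandas as pd
from dask import delayed


# Made-up stand-ins for our dataset I/O; in reality these read and write
# Parquet blobs in blob storage through our dataset library.
def read_blob(name):
    return pd.DataFrame({"demand": [1.0, 2.0, 3.0], "source": name})


def compute_forecast(df):
    return df.assign(forecast=df["demand"] * 1.1)


def write_blob(df, name):
    df.to_parquet(f"/tmp/{name}")
    return {"blob": name, "rows": len(df)}


def write_metadata(blob_infos):
    # The single reduction node: record which data ended up in which blob.
    return {"partitions": list(blob_infos)}


source_blobs = ["part-0.parquet", "part-1.parquet", "part-2.parquet"]

# One independent read/compute/write task per source blob, plus one final
# reduction task, mirroring the embarrassingly parallel graph shown above.
written = [
    delayed(write_blob)(delayed(compute_forecast)(delayed(read_blob)(b)), b)
    for b in source_blobs
]
delayed(write_metadata)(written).compute()
```
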
In between, we have intermediate steps for re-shuffling the data.
This works by reading the dataset as a [Dask dataframe](https://docs.dask.org/en/latest/dataframe.html),
repartitioning the dataframe using a network shuffle, and writing it out again to a dataset.
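
As a rough illustration of this shuffling step, here is a small sketch using plain
`dask.dataframe`; the storage path and the column we repartition by are made up, and in our
pipelines the reading and writing actually goes through our dataset library:

```python
import dask.dataframe as dd

# Read the source dataset, laid out in whatever partitioning the previous
# step happened to produce (hypothetical blob storage path).
df = dd.read_parquet("abfs://container/source-dataset/")

# set_index triggers a network shuffle across the workers so that all rows
# for one location end up in the same partition ("location_id" is made up).
df = df.set_index("location_id")

# Write the re-shuffled data out again as a new Parquet dataset.
df.to_parquet("abfs://container/target-dataset/")
```
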
The size of the data we work on varies strongly depending on the individual customer.
The largest ones currently amount to about one billion rows per day.
This corresponds to 30 GiB of compressed Parquet data, which is roughly 500 GiB of uncompressed data in memory.

## Dask cluster setup at Blue Yonder

We run a somewhat unique setup of Dask clusters that is driven by the specific requirements
of our domain.
For reasons of data isolation between customers and environment isolation for SaaS applications,
we run separate Dask clusters per customer and per environment (production, staging, and development).

But it does not stop there.
The service we provide to our customers is composed of several products that build upon each other
and are maintained by different teams. We typically perform daily batch runs with these products running sequentially
in separate environments.
For performance reasons, we install the Python packages holding the code needed for the computations on each worker.
This means we have to run a separate Dask cluster for each of the steps in the batch run.
This results in us operating more than
ten Dask clusters per customer and environment, with only one of the clusters typically being active
and computing something at any given time. While this leads to overhead in terms of administration and hardware resources
(which we have to mitigate, as outlined below),
it also gives us a lot of flexibility. For instance, we can update the software on the cluster of one part of the compute pipeline
while another part of the pipeline is computing something on a different cluster.

### Some numbers

The number and size of the workers vary from cluster to cluster depending on the degree of parallelism.
The total number of Dask workers we run varies between 1000 and 2000.

To improve manageability, resilience, and resource utilization, we run the Dask clusters on top
of [Apache Mesos](http://mesos.apache.org/)/[Aurora](http://aurora.apache.org/)
and [Kubernetes](https://kubernetes.io/). This means every worker as well as the scheduler and client
each run in an isolated container. Communication happens via a simple service mesh
implemented with reverse proxies, which makes the communication
endpoints independent of the actual container instance.
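
From the application's point of view this is transparent: a client simply connects to a stable,
per-cluster address and the proxy routes the connection to whatever container the scheduler
currently runs in. The hostname below is a made-up example, not our real naming scheme:

```python
from dask.distributed import Client

# Stable, proxied scheduler endpoint of one customer/environment cluster;
# the hostname is a made-up example.
client = Client("tcp://dask-scheduler.customer-a.production.internal:8786")
```
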
Running on top of a system like Mesos or Kubernetes provides us with resilience since a failing worker