---
layout: single
title: "Dask Usage at Blue Yonder"
date: 2020-06-19 10:00:00 +0100
tags: technology python data-engineering
header:
  overlay_image: assets/images/tech_gear_banner.jpg
  overlay_filter: 0.2
  show_overlay_excerpt: false
author: Andreas Merkel, Kshitij Mathur
author_profile: true
---
# Dask Usage at Blue Yonder

Back in 2017, Blue Yonder started to look into
[Dask/Distributed](https://distributed.dask.org) and how we could leverage it to power
our machine learning and data pipelines.
It's 2020 now, and we are using it heavily in production for performing machine-learning-based
forecasting and optimization for our customers.
Over time, we discovered that the way we are using Dask differs from how it is typically
used in the community.
(Teaser: for instance, we are running about 500 Dask clusters in total and dynamically scale
the number of workers up and down.)
So we think it's time to take a look at the way we are using Dask!

## Use case
| 27 | + |
| 28 | +First, let's have a look at the main use case we have for Dask. The use of Dask at Blue Yonder |
| 29 | +is strongly coupled to the concept of [datasets](https://tech.jda.com/introducing-kartothek/), |
| 30 | +tabular data stored as [Apache Parquet](https://parquet.apache.org/) files in blob storage. |
| 31 | +We use datasets for storing the input data for our computations as well as intermediate results and the final output data |
| 32 | +served again to our customers. |
| 33 | +Dask is used in managing/creating this data as well as performing the necessary computations. |
| 34 | + |
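To make the dataset notion a bit more concrete, here is a minimal sketch of writing and reading such a dataset with plain `dask.dataframe`. The columns and the local path are made up for illustration; in production the files live in blob storage and a library like Kartothek (linked above) manages the Parquet blobs and the dataset metadata for us.

```python
import pandas as pd
import dask.dataframe as dd

# Build a tiny example dataframe; in production the data is much larger and
# comes from our customers' relational databases.
df = pd.DataFrame({
    "location": ["store_1", "store_1", "store_2"],
    "product": ["apples", "bananas", "apples"],
    "sales": [12, 7, 3],
})
ddf = dd.from_pandas(df, npartitions=2)

# A dataset is a collection of Parquet files plus metadata. The local path is
# a placeholder; in production it would be a blob storage URL.
ddf.to_parquet("./sales_dataset", engine="pyarrow", write_index=False)

# Downstream steps read the dataset back, one partition per Parquet file.
sales = dd.read_parquet("./sales_dataset", engine="pyarrow")
print(sales.groupby("product")["sales"].sum().compute())
```
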
Our pipeline consists of downloading data from a relational database into a dataset
(we use Dask here to parallelize the download), followed by several steps that each read
the data from an input dataset, do a computation on it, and write it out to another dataset.

In many cases, the layout of the source dataset (the partitioning, i.e., what data resides in which blob)
is used for parallelization.
This means the algorithms work independently on the individual blobs of the source dataset,
so our respective Dask graphs are embarrassingly parallel.
The individual nodes perform
the sequential operations of reading in the data from a source dataset blob, doing some computation on it,
and writing it out to a target dataset blob.
At the end, there is a single reduction node writing the target
dataset metadata.
We typically use Dask's Delayed interface for these computations.

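To illustrate the shape of such a graph, here is a minimal, self-contained sketch using `dask.delayed`. The in-memory `SOURCE_BLOBS` dictionary and the trivial computation are stand-ins for our actual dataset I/O and forecasting code.

```python
import pandas as pd
from dask import delayed

# Stand-ins for our dataset I/O: in reality each "blob" is a Parquet file in
# blob storage and the computation is far more involved.
SOURCE_BLOBS = {
    "part-0": pd.DataFrame({"sales": [1, 2, 3]}),
    "part-1": pd.DataFrame({"sales": [4, 5, 6]}),
}


@delayed
def process_blob(blob_name):
    # Read one blob of the source dataset, compute on it, write the result to
    # the target dataset, and return the metadata of the written blob.
    df = SOURCE_BLOBS[blob_name]
    result = df.assign(forecast=df["sales"] * 1.1)
    return {"blob": f"target/{blob_name}", "rows": len(result)}


@delayed
def write_dataset_metadata(blob_metadata):
    # The single reduction node: commit the metadata of the target dataset.
    return {"n_blobs": len(blob_metadata), "blobs": blob_metadata}


# One independent task per source blob; the graph only joins again in the
# final metadata-writing node.
tasks = [process_blob(name) for name in SOURCE_BLOBS]
dataset_metadata = write_dataset_metadata(tasks)

print(dataset_metadata.compute())
```

The important point is the structure: one independent task per source blob, joined only by the final node that writes the target dataset metadata.
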
In between, we have intermediate steps for re-shuffling the data.
This works by reading the dataset as a [Dask dataframe](https://docs.dask.org/en/latest/dataframe.html),
repartitioning the dataframe using a network shuffle, and writing it out again to a dataset.

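Such a re-shuffling step boils down to only a few lines of `dask.dataframe` code. The paths and the shuffle column below are made-up examples; in production the datasets live in blob storage.

```python
import dask.dataframe as dd

# Read the existing dataset; every Parquet blob becomes one partition.
# In production the path would point to blob storage instead of local disk.
ddf = dd.read_parquet("./source_dataset", engine="pyarrow")

# Repartition so that all rows with the same key end up in the same
# partition; this triggers a network shuffle between the workers.
ddf = ddf.shuffle(on="product")

# Write the result out again as a new dataset.
ddf.to_parquet("./shuffled_dataset", engine="pyarrow", write_index=False)
```
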
The size of the data we work on varies strongly from customer to customer.
For the largest customers, it currently amounts to about one billion rows per day.
This corresponds to 30 GiB of compressed Parquet data, which is roughly 500 GiB of uncompressed data in memory.

## Dask cluster setup at Blue Yonder
| 61 | + |
| 62 | +We run a somewhat unique setup of Dask clusters that is driven by the specific requirements |
| 63 | +of our domain. |
| 64 | +For reasons of data isolation between customers and environment isolation for SaaS applications |
| 65 | +we run separate Dask clusters per customer and per environment (production, staging, and development). |
| 66 | + |
But it does not stop there.
The service we provide to our customers comprises several products that build upon each other
and are maintained by different teams. We typically perform daily batch runs with these products running sequentially
in separate environments.
For performance reasons, we install the Python packages holding the code needed for the computations on each worker.
We do not want to synchronize the dependencies and release cycles of our different products, which
means we have to run a separate Dask cluster for each of the steps in the batch run.
This results in us operating more than
ten Dask clusters per customer and environment, with only one of the clusters being active
and computing something most of the time. While this leads to overhead in terms of administration and hardware resources
(which we have to mitigate, as outlined below),
it also gives us a lot of flexibility. For instance, we can update the software on the cluster of one part of the compute pipeline
while another part of the pipeline is computing something on a different cluster.

### Some numbers
| 83 | + |
| 84 | +The number and size of the workers varies from cluster to cluster depending on the degree of parallelism |
| 85 | +of the computation being performed, its resource requirements, and the available timeframe for the computation. |
| 86 | +At the time of writing, we are running more than 500 distinct clusters. |
| 87 | +Our clusters have between one and 225 workers, with worker size varying between 1GiB and 64GiB of memory. |
| 88 | +We typically configure one CPU for the smaller workers and two for the larger ones. |
| 89 | +While our Python computations do not leverage thread-level parallelism, the Parquet serialization part, |
| 90 | +which is implemented in C++, can benefit from the additional CPU. |
| 91 | +Our total memory use (sum over all clusters) goes up to as much as 15TiB. |
| 92 | +The total number of dask workers we run varies between 1000 and 2000. |
| 93 | + |
| 94 | + |
| 95 | + |
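For the curious, this is roughly what such a worker configuration corresponds to when expressed with `distributed`'s Python API. The concrete numbers are only examples, and in production the workers are of course started as containers on Mesos/Kubernetes rather than via a `LocalCluster`.

```python
from dask.distributed import Client, LocalCluster

# A "large" worker in the terminology above: two threads per worker (the
# second one mainly helps the C++ Parquet serialization) and a generous
# memory limit.
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="64GiB")
client = Client(cluster)
print(client.scheduler_info()["workers"])
```
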
## Cluster scaling and resilience

To improve manageability, resilience, and resource utilization, we run the Dask clusters on top
of [Apache Mesos](http://mesos.apache.org/)/[Aurora](http://aurora.apache.org/)
and [Kubernetes](https://kubernetes.io/). This means every worker, as well as the scheduler and the client,
runs in an isolated container. Communication happens via a simple service mesh
implemented with reverse proxies to make the communication
endpoints independent of the actual container instance.

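The details of the service mesh are not Dask-specific, but the effect from Dask's point of view is simply that scheduler and workers are addressed via stable names rather than container addresses. A hypothetical sketch of what the client side looks like:

```python
from dask.distributed import Client

# The hostname below is a made-up example of a stable reverse-proxy endpoint;
# it stays the same even when the scheduler container is rescheduled onto
# another node.
client = Client("tcp://dask-scheduler.customer-a.production.internal:8786")
print(client.scheduler_info()["address"])
```
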
Running on top of a system like Mesos or Kubernetes provides us with resilience, since a failing worker
(for instance, as the result of a failing hardware node)
can simply be restarted on another node of the system.
It also enables us to easily commission or decommission Dask clusters, making the number of clusters we run
manageable in the first place.

Running 500 Dask clusters also requires a lot of hardware. We have put two measures in place to improve
the utilization of hardware resources: oversubscription and autoscaling.

### Oversubscription
| 115 | + |
| 116 | +[Oversubscription](http://mesos.apache.org/documentation/latest/oversubscription/) is a feature of Mesos |
| 117 | +that allows allocating more resources than physically present to services running on the system. |
| 118 | +This is based on the assumption that not all services exhaust all of their allocated resources at the same time. |
| 119 | +If the assumption is violated, we prioritize the resouces to the more important ones. |
| 120 | +We use this to re-purpose the resources allocated for production clusters but not utilized the whole time |
| 121 | + and use them for development and staging systems. |
| 122 | + |
### Autoscaling

Autoscaling is a mechanism we implemented to dynamically adapt the number of workers in a Dask cluster
to the load on the cluster. This is possible because the workers run as individual containers
on Mesos/Kubernetes, which makes it really easy to add or remove worker instances from an existing Dask cluster.

To determine the optimum number of worker instances to run, we added the ``desired_workers`` metric to Distributed.
The metric exposes
the degree of parallelism that a computation has and thus allows us to infer how many workers a cluster should
ideally have. Based on this metric, as well as on the overall resources available and on fairness criteria
(remember, we run a lot of Dask/Distributed clusters), we add or remove workers to our clusters.
We balance the conflicting requirements for different resources like RAM and CPU
using [Dominant Resource Fairness](https://cs.stanford.edu/~matei/papers/2011/nsdi_drf.pdf).

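To give an idea of what this looks like, here is a heavily simplified sketch of the scaling decision. The ``desired_workers`` values would in reality be scraped from the schedulers, and the names, numbers, and helper functions are purely illustrative; the real implementation also deals with worker sizes, cool-down periods, and many more details.

```python
def dominant_share(used, total):
    # Dominant Resource Fairness: a cluster's share is determined by the
    # resource (CPU or RAM) of which it uses the largest fraction.
    return max(used["cpu"] / total["cpu"], used["ram"] / total["ram"])


def pick_cluster_to_scale_up(clusters, total):
    # Among all clusters that want more workers (desired > current), grant
    # the next worker to the one with the smallest dominant share.
    candidates = [c for c in clusters if c["desired_workers"] > c["workers"]]
    if not candidates:
        return None
    return min(candidates, key=lambda c: dominant_share(c["used"], total))


# Made-up snapshot: per-cluster current workers, the desired_workers metric,
# and the resources currently allocated to each cluster.
total = {"cpu": 1000, "ram": 15 * 1024}  # RAM in GiB
clusters = [
    {"name": "customer-a-prod", "workers": 50, "desired_workers": 80,
     "used": {"cpu": 100, "ram": 3200}},
    {"name": "customer-b-prod", "workers": 10, "desired_workers": 40,
     "used": {"cpu": 20, "ram": 160}},
]

winner = pick_cluster_to_scale_up(clusters, total)
print(winner["name"])  # the cluster with the smallest dominant share gets the next worker
```
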
## Dask issues

The particular way we use Dask, especially running it in containers connected by reverse proxies and the fact
that we dynamically add/remove workers from a cluster quite frequently for autoscaling, has led us to hit some
edge cases and instabilities and has given us the chance to contribute some fixes and improvements to Dask.
For instance, we were able to
[improve stability after connection failures](https://github.com/dask/distributed/pull/3246) or when
[workers are removed from the cluster](https://github.com/dask/distributed/pull/3366).

If you are interested in our contributions to Dask and our commitment to the Dask community, please
also check out our blog post [Karlsruhe to D.C. ― a Dask story](2020-05-28-dask-developer-workshop.markdown).

## Conclusion

Overall, we are very happy with Dask and the capabilities it offers.
Having migrated to Dask from a proprietary compute framework that was developed within our company,
we noticed that we have had similar pain points with both solutions: running into edge cases and robustness
issues in daily operations.
However, with Dask being an open source solution, we have the confidence that others can also profit from
the fixes that we contribute, and that there are problems we **don't** run into because other people have
already experienced and fixed them.

For the future, we envision adding even more robustness to Dask:
topics like scheduler/worker resilience (that is, surviving the loss of a worker or even the scheduler without losing computation results)
and flexible scaling of the cluster are of great interest to us.