diff --git a/content/dask.md b/content/dask.md index 222131e..da70763 100644 --- a/content/dask.md +++ b/content/dask.md @@ -1,93 +1,732 @@ -# Dask +(dask)= -:::{questions} -- What syntax is used to make a lesson? -- How do you structure a lesson effectively for teaching? -- `questions` are at the top of a lesson and provide a starting - point for what you might learn. It is usually a bulleted list. -::: +# Dask for Scalable Analytics :::{objectives} -- Show a complete lesson page with all of the most common - structures. -- ... -This is also a holdover from the carpentries-style. It could -usually be left off. +- Understand how Dask achieves parallelism +- Learn a few common workflows with Dask +- Understand lazy execution + +:::{instructor-note} + +- 40 min teaching/type-along +- 40 min exercises + ::: +## Overview + +An increasingly common problem faced by researchers and data scientists +today is that datasets are becoming larger and larger and modern data analysis +is thus becoming more and more computationally demanding. The first +difficulty to deal with is when the volume of data exceeds one's computer's RAM. +Modern laptops/desktops have about 10 GB of RAM. Beyond this threshold, +some special care is required to carry out data analysis. +The next threshold of difficulty is when the data can not even +fit on the hard drive, which is about a couple of TB on a modern laptop. +In this situation, it is better to use an HPC system or a cloud-based solution, +and Dask is a tool that helps us easily extend our familiar data analysis +tools to work with big data. In addition, Dask can also speeds up +our analysis by using multiple CPU cores which makes our work run +faster on laptop, HPC and cloud platforms. + +## What is Dask? + +Dask is composed of two parts: + +- Dynamic task scheduling optimized for computation. Similar to other workflow + management systems, but optimized for interactive computational workloads. +- "Big Data" collections like parallel arrays, dataframes, and lists that extend + common interfaces like NumPy, Pandas, or Python iterators to larger-than-memory + or distributed environments. These parallel collections run on top of dynamic + task schedulers. + +:::{figure} img/dask-overview.svg +High level collections are used to generate task graphs which can be executed +by schedulers on a single machine or a cluster. From the +[Dask documentation](https://docs.dask.org/en/stable/). +::: + +## Dask clusters + +Dask needs computing resources in order to perform parallel computations. +"Dask Clusters" have different names corresponding to different computing environments, +for example: +> - `LocalCluster` on laptop/desktop/cluster +> - `PBSCluster` or `SLURMCluster` on HPC +> - `Kubernetes` cluster in the cloud + +Each cluster will be allocated with a given number of "workers" associated with +CPU and RAM and the Dask scheduling system automatically maps jobs to each worker. + +Dask provides four different schedulers: + +```{csv-table} +:widths: auto +:delim: ; + + Type ; Multi-node ; Description + ``threads`` ; No ; A single-machine scheduler backed by a thread pool + ``processes`` ; No ; A single-machine scheduler backed by a process pool + ``synchronous`` ; No ; A single-threaded scheduler, used for debugging + ``distributed`` ; yes ; A distributed scheduler for executing on multiple nodes/machines + +``` +Here we will focus on using a `LocalCluster`, and it is recommended to use +a distributed scheduler `dask.distributed`. 
It is more sophisticated, offers more features,
+but requires only minimal effort to set up. It can run locally on a laptop and scale up to a cluster.
-The introduction should be a high level overview of what is on the
-page and why it is interesting.
+:::{callout} Alternative 1: Initializing a Dask ``LocalCluster`` via JupyterLab
+:class: dropdown
+This makes use of the ``dask-labextension`` which is pre-installed in our conda environment.
-The lines below (only in the source) will set the default highlighting
-language for the entire page.
+1. Start a new Dask cluster from the sidebar by clicking on the ``+ NEW`` button.
+2. Click on the ``< >`` button to inject the client code into a notebook cell. Execute it.
-:::{highlight} python
+   ![](img/jlab-dask-1.png) ![](img/jlab-dask-2.png)
+
+3. You can scale the cluster for more resources or launch the dashboard.
+![](img/jlab-dask-3.png)
 :::
+:::::{callout} Alternative 2: Manual `LocalCluster`
+
+We can also start a `LocalCluster` scheduler manually, which can use all
+available resources or just a subset.
+
+::::{tabs}
+
+:::{group-tab} All resources
+
+We can use all the cores and RAM we have on the machine by:
+
+```python
+from dask.distributed import Client, LocalCluster
+# create a local cluster
+cluster = LocalCluster()
+# connect to the cluster we just created
+client = Client(cluster)
+client
+```
+
+Or you can simply launch a `Client()` call, which is shorthand for what is
+described above.
+
+```python
+from dask.distributed import Client
+client = Client()  # same as Client(processes=True)
+client
+```
 :::
 
-## Section
+:::{group-tab} Specified resources
 
-A section.
+This option limits the compute resources available as follows:
 
-:::{discussion}
-Discuss the following.
+```python
+from dask.distributed import Client, LocalCluster
+
+cluster = LocalCluster(
+    n_workers=4,
+    threads_per_worker=1,
+    memory_limit='4GiB'  # memory limit per worker
+)
+client = Client(cluster)
+client
+```
 
-- A discussion section
-- Another discussion topic
+:::
+::::
+:::::
+
+:::{note}
+When setting up a cluster, one should balance the number of workers against the number of
+threads per worker for the workload at hand, e.g. via the parameter `processes`.
+The default `processes=True` is a good choice for workloads that hold the GIL (such as pure Python code);
+in that case it is better to have more workers and fewer threads per worker.
+With `processes=False`, all workers instead run as threads within the same process as the client
+and share memory resources, so no data needs to be copied between them.
+This works well for numerical code that releases the GIL and for large datasets.
 :::
 
+Cluster managers also provide useful utilities: for example, if a cluster manager supports scaling,
+you can modify the number of workers manually or automatically based on workload:
+```python
+cluster.scale(10)  # Sets the number of workers to 10
+cluster.adapt(minimum=1, maximum=10)  # Allows the cluster to auto-scale between 1 and 10 workers based on load
+```
 
-## Section
+The Dask distributed scheduler also provides live feedback via its interactive dashboard.
+A link to the dashboard is printed in the terminal
+where the scheduler is created, and it is also shown when you create a Client and connect it to the scheduler.
+By default, when starting a scheduler on your local machine the dashboard will be served at + and can be always queried from commond line by: +```python +cluster.dashboard_link +http://127.0.0.1:8787/status +# or +client.dashboard_link ``` -print("hello world") -# This uses the default highlighting language + +When everything finishes, you can shut down the connected scheduler and workers +by calling the {meth}`shutdown` method: + +```python +client.shutdown() ``` +## Dask collections + +Dask provides dynamic parallel task scheduling and +three main high-level collections: + +> - `dask.array`: Parallel NumPy arrays +> - `dask.dataframe`: Parallel Pandas DataFrames +> - `dask.bag`: Parallel Python Lists + +### Dask arrays + +A Dask array looks and feels a lot like a NumPy array. +However, a Dask array uses the so-called "lazy" execution mode, +which allows one to build up complex, large calculations symbolically +before turning them over the scheduler for execution. + +:::{callout} Lazy evaluation + + Contrary to normal computation, lazy execution mode is when all the computations + needed to generate results are symbolically represented, forming a queue of + tasks mapped over data blocks. Nothing is actually computed until the actual + numerical values are needed, e.g. plotting, to print results to the screen or write to disk. + At that point, data is loaded into memory and computation proceeds in a streaming + fashion, block-by-block. The actual computation is controlled by a multi-processing + or thread pool, which allows Dask to take full advantage of multiple processors + available on the computers. + +::: + ```python -print("hello world) +import numpy as np +shape = (1000, 4000) +ones_np = np.ones(shape) +ones_np +ones_np.nbytes / 1e6 ``` +Now let's create the same array using Dask's array interface. +```python +import dask.array as da +shape = (1000, 4000) +ones = da.ones(shape) +ones +``` + +Although this works, it is not optimized for parallel computation. In order to use all +available computing resources, we also specify the `chunks` argument with Dask, +which describes how the array is split up into sub-arrays: -## Exercises: description +```python +import dask.array as da +shape = (1000, 4000) +chunk_shape = (1000, 1000) +ones = da.ones(shape, chunks=chunk_shape) +ones +``` + +:::{note} +In this course, we will use a chunk shape, but other ways to specify `chunks` +size can be found +[here](). +::: + +Let us further calculate the sum of the dask array: + +```python +sum_da = ones.sum() +``` + +So far, only a task graph of the computation is prepared. +We can visualize the task graph by calling {meth}`visualize`: + +```python +dask.visualize(sum_da) +# or +sum_da.visualize() +``` + +One way to trigger the computation is to call {meth}`compute`: + +```python +dask.compute(sum_da) +# or +sum_da.compute() +``` + +You can find additional details and examples [here](https://examples.dask.org/array.html). + +### Dask dataframe + +Dask dataframes split a dataframe into partitions along an index and can be used +in situations where one would normally use Pandas, but this fails due to data size or +insufficient computational efficiency. Specifically, you can use Dask dataframes to: + +- manipulate large datasets, even when these don't fit in memory +- accelerate long computations by using many cores +- perform distributed computing on large datasets with standard Pandas operations + like groupby, join, and time series computations. 
+ +Let us revisit the dataset containing the Titanic passenger list, and now transform it to +a Dask dataframe: + +```python +import pandas as pd +import dask.dataframe as dd + +url = "https://raw.githubusercontent.com/pandas-dev/pandas/master/doc/data/titanic.csv" + +df = pd.read_csv(url, index_col="Name") +# read a Dask Dataframe from a Pandas Dataframe +ddf = dd.from_pandas(df, npartitions=10) +``` + +Alternatively you can directly read into a Dask dataframe, whilst also modifying +how the dataframe is partitioned in terms of `blocksize`: + +```python +# blocksize=None which means a single chunk is used +df = dd.read_csv(url,blocksize=None).set_index('Name') +ddf= df.repartition(npartitions=10) + +# blocksize="4MB" or blocksize=4e6 +ddf = dd.read_csv(url,blocksize="4MB").set_index('Name') +ddf.npartitions + +# blocksize="default" means the chunk is computed based on +# available memory and cores with a maximum of 64MB +ddf = dd.read_csv(url,blocksize="default").set_index('Name') +ddf.npartitions +``` + +Dask dataframes do not support the entire interface of Pandas dataframes, but +the most [commonly used methods are available](https://docs.dask.org/en/stable/dataframe.html#scope). +For a full listing refer to the +[dask dataframe API](https://docs.dask.org/en/stable/dataframe-api.html). + +We can for example perform the group-by operation we did earlier, but this time in parallel: + +```python +# add a column +ddf["Child"] = ddf["Age"] < 12 +ddf.groupby(["Sex", "Child"])["Survived"].mean().compute() +``` + +However, for a small dataframe like this the overhead of parallelisation will far +outweigh the benefit. + +You can find additional details and examples here +. + +### Dask bag + +A Dask bag enables processing data that can be represented as a sequence of arbitrary +inputs ("messy data"), like in a Python list. Dask Bags are often used to for +preprocessing log files, JSON records, or other user defined Python objects. + +We will content ourselves with implementing a dask version of the word-count problem, +specifically the step where we count words in a text. + +(word-count-problem)= + +:::{demo} Demo: Dask version of word-count + +If you have not already cloned or downloaded ``word-count-hpda`` repository, +[get it from here](https://github.com/ENCCS/word-count-hpda). +Then, navigate to the ``word-count-hpda`` directory. The serial version (wrapped in +multiple functions in the ``source/wordcount.py`` code) looks like this: + +```python + +filename = './data/pg10.txt' +DELIMITERS = ". , ; : ? $ @ ^ < > # % ` ! * - = ( ) [ ] { } / \" '".split() + +with open(filename, "r") as input_fd: + lines = input_fd.read().splitlines() + +counts = {} +for line in lines: + for purge in DELIMITERS: + line = line.replace(purge, " ") + words = line.split() + for word in words: + word = word.lower().strip() + if word in counts: + counts[word] += 1 + else: + counts[word] = 1 + +sorted_counts = sorted( + list(counts.items()), + key=lambda key_value: key_value[1], + reverse=True +) + +sorted_counts[:10] + + ``` + + A very compact ``dask.bag`` version of this code is as follows: + +```python + +import dask.bag as db +filename = './data/pg10.txt' +DELIMITERS = ". , ; : ? $ @ ^ < > # % ` ! 
* - = ( ) [ ] { } / \" '".split() + +text = db.read_text(filename, blocksize='1MiB') +sorted_counts = ( + text + .filter(lambda word: word not in DELIMITERS) + .str.lower() + .str.strip() + .str.split() + .flatten() + .frequencies().topk(10,key=1) + .compute() +) + +sorted_counts +``` + +The last two steps of the pipeline could also have been done with a dataframe: + +```python + :emphasize-lines: 9-10 + + filtered = ( + text + .filter(lambda word: word not in DELIMITERS) + .str.lower() + .str.strip() + .str.split() + .flatten() + ) + ddf = filtered.to_dataframe(columns=['words']) + ddf['words'].value_counts().compute()[:10] +``` + +::: + +:::{callout} When to use Dask + + There is no benefit from using Dask on small datasets. But imagine we were + analysing a very large text file (all tweets in a year? a genome?). Dask provides + both parallelisation and the ability to utilize RAM on multiple machines. + +::: + +## Exercise set 1 + +Choose an exercise with the data structure that you are most interested in: +{ref}`ex-dask-array`, {ref}`ex-dask-df` or {ref}`ex-dask-bag`. + +(ex-dask-array)= + +### 1.1. Using dask.array + +:::::{challenge} Chunk size + +The following example calculate the mean value of a random generated array. +Run the example and see the performance improvement by using dask. + +::::{tabs} + +:::{group-tab} NumPy + +```{literalinclude} example/chunk_np.py +:language: python +``` + +::: +:::{group-tab} Dask + +```{literalinclude} example/chunk_dask.py +:language: python +``` + +::: +:::: + +But what happens if we use different chunk sizes? +Try out with different chunk sizes: + +- What happens if the dask chunks=(20000,20000) + +- What happens if the dask chunks=(250,250) + +:::{solution} Choice of chunk size + +The choice is problem dependent, but here are a few things to consider: + +Each chunk of data should be small enough so that it fits comforably in each worker's available memory. +Chunk sizes between 10MB-1GB are common, depending on the availability of RAM. Dask will likely +manipulate as many chunks in parallel on one machine as you have cores on that machine. +So if you have a machine with 10 cores and you choose chunks in the 1GB range, Dask is likely to use at least +10 GB of memory. Additionally, there should be enough chunks available so that each worker always has something to work on. + +On the otherhand, you also want to avoid chunk sizes that are too small as we see in the exercise. +Every task comes with some overhead which is somewhere between 200us and 1ms. Very large graphs +with millions of tasks will lead to overhead being in the range from minutes to hours which is not recommended. +::: + +::::: + +(ex-dask-df)= + +### 1.2. Using dask.dataframe + +::::{exercise} Benchmarking DataFrame.apply() + +Recall the :ref:`word count ` project that we encountered +earlier and the :func:`scipy.optimize.curve_fit` function. The +:download:`results.csv ` file contains word counts of the 10 +most frequent words in different texts, and we want to fit a power law to the +individual distributions in each row. + +Here are our fitting functions: + +```python +from scipy.optimize import curve_fit + +def powerlaw(x, A, s): + return A * np.power(x, s) + +def fit_powerlaw(row): + X = np.arange(row.shape[0]) + 1.0 + params, cov = curve_fit(f=powerlaw, xdata=X, ydata=row, p0=[100, -1], bounds=(-np.inf, np.inf)) + return params[1] +``` + +Compare the performance of :meth:`dask.dataframe.DataFrame.apply` with +:meth:`pandas.DataFrame.apply` for the this example. 
You will probably see a
+slowdown due to the parallelisation overhead. But what if you add a
+``time.sleep(0.01)`` inside :meth:`fit_powerlaw` to emulate a time-consuming
+calculation?
+
+:::{callout} Hints
+:class: dropdown
+
+- You will need to call :meth:`apply` on the dataframe starting from column 1: ``dataframe.iloc[:,1:].apply()``
+- Remember that both Pandas and Dask have the :meth:`read_csv` function.
+- Try repartitioning the dataframe into 4 partitions with ``ddf4=ddf.repartition(npartitions=4)``.
+- You will probably get a warning in your Dask version that `You did not provide metadata`.
+  To remove the warning, add the ``meta=(None, "float64")`` flag to :meth:`apply`. For the
+  current data, this does not affect the performance.
+:::
+
+:::{callout} More hints with Pandas code
+:class: dropdown
+
+You need to reimplement the highlighted part which creates the
+dataframe and applies the :func:`fit_powerlaw` function.
+
+```{literalinclude} exercise/apply_pd.py
+:language: ipython
+:emphasize-lines: 16-17
+```
 
-:::{exercise} Exercise Topic-1: imperative description of exercise
-Exercise text here.
 :::
 
 :::{solution}
-Solution text here
+
+```{literalinclude} exercise/apply_dask.py
+:language: ipython
+```
+
 :::
+::::
 
+(ex-dask-bag)=
+### 1.3. Using dask.bag
 
-## Summary
+:::{exercise} Break down the dask.bag computational pipeline
 
-A Summary of what you learned and why it might be useful. Maybe a
-hint of what comes next.
+Revisit the
+:ref:`word count problem `
+and the implementation with a ``dask.bag`` that we saw above.
+- To get a feeling for the computational pipeline, break down the computation into
+  separate steps and investigate intermediate results using :meth:`.compute`.
+- Benchmark the serial and ``dask.bag`` versions. Do you see any speedup?
+  What if you have a larger text file? You can for example concatenate all texts into
+  a single file: ``cat data/*.txt > data/all.txt``.
+:::
+
+## Low level interface: delayed
+
+Sometimes problems don't fit into one of the collections like
+`dask.array` or `dask.dataframe` because they are not as simple as just a big array or dataframe.
+In these cases, `dask.delayed` may be the right choice. If the problem is parallelisable,
+we can use `dask.delayed`, which allows users to make function calls lazy
+so that they can be put into a task graph with dependencies.
+Consider the following example. The functions are very simple, and they *sleep*
+for a prescribed time to simulate real work:
 
-## See also
+```{literalinclude} example/delay.py
+```
 
-- Other relevant links
-- Other link
+Let us first run the example, calling the functions one after the other in sequence:
+```ipython
+%%timeit
+x = inc(1)
+y = dec(2)
+z = add(x, y)
+# 902 ms ± 367 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
+```
+Note that the first two functions `inc` and `dec` don't depend on each other,
+so we could have called them in parallel. We can call `dask.delayed` on these functions
+to make them lazy, turning the calls into tasks in a graph which we will run later on parallel hardware.
 
-:::{keypoints}
-- What the learner should take away
-- point 2
-- ...
+```ipython
+import dask
+inc_delay = dask.delayed(inc)
+dec_delay = dask.delayed(dec)
+add_delay = dask.delayed(add)
+```
+
+```ipython
+%%timeit
+x = inc_delay(1)
+y = dec_delay(2)
+z = add_delay(x, y)
+# 59.6 µs ± 356 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
+```
+
+```ipython
+%%timeit
+x = inc_delay(1)
+y = dec_delay(2)
+z = add_delay(x, y)
+z.compute()
+# 603 ms ± 181 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
+```
+
+:::{callout} Default scheduler for dask collections
+
+``dask.array`` and ``dask.dataframe`` use the ``threads`` scheduler.
+
+``dask.bag`` uses the ``processes`` scheduler.
+
+To change the default scheduler, using `dask.config.set` is recommended:
+
+```ipython
+# To set globally
+dask.config.set(scheduler='processes')
+x.compute()
+
+# To set it as a context manager
+with dask.config.set(scheduler='threads'):
+    x.compute()
+```
 
-This is another holdover from the carpentries style. This perhaps
-is better done in a "summary" section.
 :::
+
+## Comparison to Spark
+
+Dask has much in common with [Apache Spark](https://spark.apache.org/).
+Here are [some differences](https://docs.dask.org/en/stable/spark.html)
+between the two frameworks:
+
+- Dask is smaller and more lightweight but is used together with other packages in
+  the Python ecosystem. Spark is an all-in-one project with its own ecosystem.
+- Spark is written in Scala, with some support for Python and R, while Dask is in Python.
+- Spark is more focused on business intelligence (SQL, lightweight machine learning) while
+  Dask is more general and is used more in scientific applications.
+- Both Dask and Spark can scale from one to thousands of nodes.
+- Dask supports the NumPy model for multidimensional arrays, which Spark doesn't.
+- Spark generally expects users to compose computations out of high-level primitives
+  (map, reduce, groupby, join, etc.), while Dask allows users to specify arbitrary task
+  graphs for more complex and custom systems.
+
+## Exercise set 2
+
+::::{challenge} Dask delay
+
+We extend the previous example a little by applying the functions to a data array
+in a for loop and adding an *if* condition:
+
+```{literalinclude} example/delay_more.py
+```
+
+Add ``dask.delayed`` to parallelize the program as much as possible
+and check the graph visualization.
+
+:::{solution}
+
+```{literalinclude} example/delay_more_solution.py
+```
+
+:::
+::::
+
+::::{challenge} Climate simulation data using Xarray and Dask
+
+In this exercise we work with NetCDF files using Xarray. The files contain
+monthly global 2m air temperature for 10 years.
+Xarray is chosen due to its ability to seamlessly integrate with Dask
+to support parallel computations on datasets.
+
+We will first read the data with Dask and Xarray. See
+ for more details.
+
+Note that the NetCDF files are here ;
+you need to ``git clone`` the repository or download the files to your laptop first.
+Then, depending on where you put the files,
+you may need to adapt the path to the data folder in the Python code.
+
+```ipython
+import dask
+import xarray as xr
+import matplotlib.pyplot as plt
+%matplotlib inline
+ds = xr.open_mfdataset('./data/tas*.nc', parallel=True, use_cftime=True)
+```
+
+:func:`xarray.open_mfdataset` is for reading multiple files and will chunk each
+file into a single Dask array by default. One could supply the ``chunks`` keyword
+argument to control the size of the resulting Dask arrays. Passing the keyword
+argument ``parallel=True`` to :func:`xarray.open_mfdataset` will speed up the
+reading of large multi-file datasets by executing those read tasks in parallel
+using ``dask.delayed``.
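+
+As a minimal sketch of the ``chunks`` keyword mentioned above (the chunk sizes here are
+illustrative assumptions only and should be adapted to your data and available memory),
+one could open the same files with explicit chunking:
+
+```ipython
+# hypothetical chunk sizes, chosen only for illustration
+ds_chunked = xr.open_mfdataset(
+    './data/tas*.nc',
+    parallel=True,
+    use_cftime=True,
+    chunks={"time": 12, "lat": 80, "lon": 80},  # dict mapping dimension names to chunk sizes
+)
+ds_chunked.tas  # inspect the chunk structure of the resulting Dask-backed array
+```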
+
+Explore the following operations line-by-line:
+
+```ipython
+ds
+ds.tas
+#dsnew = ds.chunk({"time": 1,"lat": 80,"lon":80}) # you can further rechunk the data
+#dask.visualize(ds.tas) # do not visualize, the graph is too big
+ds['tas'] = ds['tas'] - 273.15 # convert from Kelvin to degree Celsius
+mean_tas=ds.tas.mean("time") # lazy computation
+mean_tas.plot(cmap=plt.cm.RdBu_r,vmin=-50,vmax=50) # plotting triggers computation
+tas_ann=ds.tas.groupby('time.year').mean() # lazy computation
+tas_sto=tas_ann.sel(lon=18.07, lat=59.33,method='nearest') # slicing is lazy as well
+plt.plot(tas_sto.year,tas_sto) # plotting triggers computation
+```
+
+::::
+
+```{keypoints}
+ - Dask uses lazy execution
+ - Dask can parallelize and perform out-of-memory computation.
+   That is, handle data that would not fit in the memory if loaded at once.
+ - Only use Dask for processing very large amounts of data
+```
diff --git a/content/dask_opt.md b/content/dask_opt.md
new file mode 100644
index 0000000..dbb897f
--- /dev/null
+++ b/content/dask_opt.md
@@ -0,0 +1,186 @@
+# Dask (II)
+
+:::::{challenge} Testing different schedulers
+
+We will test different schedulers and compare their performance on a simple task: calculating
+the mean of a randomly generated array.
+
+Here is the code using NumPy:
+
+```{literalinclude} example/dask_gil.py
+:language: ipython
+:lines: 1-7
+```
+
+Here we run the same code using different schedulers from Dask:
+
+::::{tabs}
+
+:::{group-tab} Serial
+
+```{literalinclude} example/dask_gil.py
+:language: ipython
+:lines: 9-12
+```
+
+:::
+
+:::{group-tab} Threads
+
+```{literalinclude} example/dask_gil_threads.py
+:language: ipython
+:lines: 1-10
+```
+
+```{literalinclude} example/dask_gil_threads.py
+:language: ipython
+:lines: 12-15
+```
+
+```{literalinclude} example/dask_gil_threads.py
+:language: ipython
+:lines: 17-20
+```
+
+```{literalinclude} example/dask_gil_threads.py
+:language: ipython
+:lines: 22-25
+```
+
+:::
+
+:::{group-tab} Processes
+
+```{literalinclude} example/dask_gil_processes.py
+:language: ipython
+:lines: 1-10
+```
+
+```{literalinclude} example/dask_gil_processes.py
+:language: ipython
+:lines: 12-15
+```
+
+```{literalinclude} example/dask_gil_processes.py
+:language: ipython
+:lines: 17-20
+```
+
+```{literalinclude} example/dask_gil_processes.py
+:language: ipython
+:lines: 22-25
+```
+
+:::
+
+:::{group-tab} Distributed
+
+```{literalinclude} example/dask_gil_distributed.py
+:language: ipython
+:lines: 1-14
+```
+
+```{literalinclude} example/dask_gil_distributed.py
+:language: ipython
+:lines: 16-17
+```
+
+```{literalinclude} example/dask_gil_distributed.py
+:language: ipython
+:lines: 19-21
+```
+
+```{literalinclude} example/dask_gil_distributed.py
+:language: ipython
+:lines: 23-25
+```
+
+```{literalinclude} example/dask_gil_distributed.py
+:language: ipython
+:lines: 27
+```
+
+:::
+
+::::
+
+:::{solution} Testing different schedulers
+
+Comparing profiling from mt_1, mt_2 and mt_4: Using the ``threads`` scheduler is limited by the GIL on pure Python code.
+In our case, although it is not a pure Python function, it is still limited by the GIL, therefore there is no multi-core speedup.
+
+Comparing profiling from mt_1, mp_1 and dis_1: Except for ``threads``, the other two schedulers copy data between processes
+and this can introduce performance penalties, particularly when the data being transferred between processes is large.
+
+Comparing profiling from serial, mt_1, mp_1 and dis_1: Creating and destroying threads and processes comes with overhead,
+and ``processes`` has even more overhead than ``threads``.
+
+Comparing profiling from mp_1, mp_2 and mp_4: Running multiple processes is only effective when there is enough computational
+work to do, i.e. CPU-bound tasks. In this very example, most of the time is actually spent on transferring the data
+rather than computing the mean.
+
+Comparing profiling from ``processes`` and ``distributed``: Using the ``distributed`` scheduler has advantages over ``processes``;
+this is related to better handling of data copying, i.e. the ``processes`` scheduler copies data for every task, while
+the ``distributed`` scheduler copies data once per worker.
+:::
+:::::
+
+:::{challenge} SVD with large skinny matrix using ``distributed`` scheduler
+
+We can use Dask to compute the SVD of a large matrix which does not fit into the
+memory of a normal laptop/desktop. While it is computing, you should switch to
+the Dask dashboard and watch the "Workers" and "Graph" tabs, so you must run this
+using the ``distributed`` scheduler.
+
+```python
+import dask
+import dask.array as da
+X = da.random.random((2000000, 100), chunks=(10000, 100))
+X
+u, s, v = da.linalg.svd(X)
+dask.visualize(u, s, v)
+s.compute()
+
+```
+
+SVD is only supported for arrays with chunking in one dimension, which requires that the matrix
+is either *tall-and-skinny* or *short-and-fat*.
+If chunking in both dimensions is needed, one should use an approximate algorithm.
+
+```python
+import dask
+import dask.array as da
+X = da.random.random((10000, 10000), chunks=(2000, 2000))
+u, s, v = da.linalg.svd_compressed(X, k=5)
+dask.visualize(u, s, v)
+s.compute()
+
+```
+
+:::
+
+:::{callout} Memory management
+
+You may observe that there are different memory categories showing on the dashboard:
+
+- process: Overall memory used by the worker process, as measured by the OS
+- managed: Size of data that Dask holds in RAM, excluding spilled data; this estimate is most probably inaccurate
+- unmanaged: Memory that Dask is not directly aware of; this can be e.g. Python modules,
+  temporary arrays, memory leaks, or memory not yet free()'d by the Python memory manager to the OS
+- unmanaged recent: Unmanaged memory that has appeared within the last 30 seconds, which is not included
+  in the "unmanaged" memory measure
+- spilled: Memory spilled to disk
+
+The sum of managed + unmanaged + unmanaged recent is by definition equal to the process memory.
+
+When the managed memory exceeds 60% of the memory limit (target threshold),
+the worker will begin to dump the least recently used data to disk.
+Above 70% of the memory limit, based on the process memory measurement (spill threshold),
+the worker will start dumping unused data to disk.
+
+At 80% process memory load (pause threshold), currently executing tasks continue to run, but no additional tasks
+in the worker's queue will be started.
+
+At 95% process memory load (terminate threshold), the worker will be terminated. Tasks will be cancelled
+as well, and data on the worker will be lost and need to be recomputed.
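+
+These thresholds map to the worker memory settings in the Dask configuration. A minimal
+sketch of how they could be adjusted before creating a cluster (the fractions shown are the
+defaults described above; verify the exact keys against your Dask version):
+
+```python
+import dask
+
+# assumed configuration keys of the distributed scheduler;
+# fractions are relative to the per-worker memory limit
+dask.config.set({
+    "distributed.worker.memory.target": 0.60,     # start spilling managed data to disk
+    "distributed.worker.memory.spill": 0.70,      # spill based on process memory usage
+    "distributed.worker.memory.pause": 0.80,      # pause accepting new tasks
+    "distributed.worker.memory.terminate": 0.95,  # terminate the worker
+})
+```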
+::: diff --git a/content/example/chunk_dask.py b/content/example/chunk_dask.py new file mode 100644 index 0000000..3420f97 --- /dev/null +++ b/content/example/chunk_dask.py @@ -0,0 +1,7 @@ +import dask +import dask.array as da + +%%time +x = da.random.random((20000, 20000), chunks=(1000, 1000)) +y = x.mean(axis=0) +y.compute() diff --git a/content/example/chunk_np.py b/content/example/chunk_np.py new file mode 100644 index 0000000..57eaabb --- /dev/null +++ b/content/example/chunk_np.py @@ -0,0 +1,5 @@ +import numpy as np + +%%time +x = np.random.random((20000, 20000)) +y = x.mean(axis=0) diff --git a/content/example/dask_gil.py b/content/example/dask_gil.py new file mode 100644 index 0000000..f4ce647 --- /dev/null +++ b/content/example/dask_gil.py @@ -0,0 +1,12 @@ +import dask +import time +import numpy as np + +def calc_mean(i, n): + data = np.mean(np.random.normal(size = n)) + return(data) + +n = 100000 +%%timeit +rs=[calc_mean(i, n) for i in range(100)] +#352 ms ± 925 µs per loop (mean ± std. dev. of 7 runs, 1 loop each) diff --git a/content/example/dask_gil_distributed.py b/content/example/dask_gil_distributed.py new file mode 100644 index 0000000..79b1279 --- /dev/null +++ b/content/example/dask_gil_distributed.py @@ -0,0 +1,27 @@ +import dask +import time +import numpy as np +from dask.distributed import Client, LocalCluster + +def calc_mean(i, n): + data = np.mean(np.random.normal(size = n)) + return(data) + +n = 100000 +output = [dask.delayed(calc_mean)(i, n) for i in range(100)] + +cluster = LocalCluster(n_workers = 1,threads_per_worker=1) +c = Client(cluster) + +%timeit dis_1 = dask.compute(output,n_workers = 1) +#619 ms ± 253 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) + +cluster.scale(2) +%timeit dis_2 = dask.compute(output,n_workers = 2) +#357 ms ± 131 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) + +cluster.scale(4) +%timeit dis_4 = dask.compute(output,n_workers = 4) +#265 ms ± 53.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) + +c.shutdown() diff --git a/content/example/dask_gil_processes.py b/content/example/dask_gil_processes.py new file mode 100644 index 0000000..a71a219 --- /dev/null +++ b/content/example/dask_gil_processes.py @@ -0,0 +1,25 @@ +import dask +import time +import numpy as np + +def calc_mean(i, n): + data = np.mean(np.random.normal(size = n)) + return(data) + +n = 100000 +output = [dask.delayed(calc_mean)(i, n) for i in range(100)] + +%%timeit +with dask.config.set(scheduler='processes',num_workers=1): + mp_1 = dask.compute(output) +#990 ms ± 39.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) + +%%timeit +with dask.config.set(scheduler='processes',num_workers=2): + mp_2 = dask.compute(output) +#881 ms ± 17.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) + +%%timeit +with dask.config.set(scheduler='processes',num_workers=4): + mp_4 = dask.compute(output) +#836 ms ± 10.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) diff --git a/content/example/dask_gil_threads.py b/content/example/dask_gil_threads.py new file mode 100644 index 0000000..3736c89 --- /dev/null +++ b/content/example/dask_gil_threads.py @@ -0,0 +1,25 @@ +import dask +import time +import numpy as np + +def calc_mean(i, n): + data = np.mean(np.random.normal(size = n)) + return(data) + +n = 100000 +output = [dask.delayed(calc_mean)(i, n) for i in range(100)] + +%%timeit +with dask.config.set(scheduler='threads',num_workers=1): + mt_1 = dask.compute(output) +#395 ms ± 18.5 ms per loop (mean ± std. dev. 
of 7 runs, 1 loop each) + +%%timeit +with dask.config.set(scheduler='threads',num_workers=2): + mt_2 = dask.compute(output) +#1.28 s ± 1.46 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) + +%%timeit +with dask.config.set(scheduler='threads',num_workers=4): + mt_4 = dask.compute(output) +#1.28 s ± 3.84 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) diff --git a/content/example/delay_more.py b/content/example/delay_more.py new file mode 100644 index 0000000..2b59c0e --- /dev/null +++ b/content/example/delay_more.py @@ -0,0 +1,28 @@ +import time +import dask + +def inc(x): + time.sleep(0.5) + return x + 1 + +def dec(x): + time.sleep(0.3) + return x - 1 + +def add(x, y): + time.sleep(0.1) + return x + y + + +data = [1, 2, 3, 4, 5] +output = [] +for x in data: + if x % 2: + a = inc(x) + b = dec(x) + c = add(a, b) + else: + c = 10 + output.append(c) + +total = sum(output) diff --git a/content/example/delay_more_solution.py b/content/example/delay_more_solution.py new file mode 100644 index 0000000..6cd1007 --- /dev/null +++ b/content/example/delay_more_solution.py @@ -0,0 +1,28 @@ +import time +import dask + +def inc(x): + time.sleep(0.5) + return x + 1 + +def dec(x): + time.sleep(0.3) + return x - 1 + +def add(x, y): + time.sleep(0.1) + return x + y + + +data = [1, 2, 3, 4, 5] +output = [] +for x in data: + if x % 2: + a = dask.delayed(inc)(x) + b = dask.delayed(dec)(x) + c = dask.delayed(add)(a, b) + else: + c = dask.delayed(10) + output.append(c) + +total = dask.delayed(sum)(output) diff --git a/content/exercise/apply_dask.py b/content/exercise/apply_dask.py new file mode 100644 index 0000000..972e9ca --- /dev/null +++ b/content/exercise/apply_dask.py @@ -0,0 +1,26 @@ +import numpy as np +import dask.dataframe as dd +from scipy.optimize import curve_fit +import time + +def powerlaw(x, A, s): + return A * np.power(x, s) + +def fit_powerlaw(row): + X = np.arange(row.shape[0]) + 1.0 + params, cov = curve_fit(f=powerlaw, xdata=X, ydata=row, p0=[100, -1], bounds=(-np.inf, np.inf)) + time.sleep(0.01) + return params[1] + +ddf = dd.read_csv("https://raw.githubusercontent.com/ENCCS/hpda-python/main/content/data/results.csv") +ddf4=ddf.repartition(npartitions=4) + +# Note the optional argument ``meta`` which is recommended for dask dataframes. +# It should contain an empty ``pandas.DataFrame`` or ``pandas.Series`` +# that matches the dtypes and column names of the output, +# or a dict of ``{name: dtype}`` or iterable of ``(name, dtype)``. 
+ +results = ddf4.iloc[:,1:].apply(fit_powerlaw, axis=1, meta=(None, "float64")) +%timeit results.compute() +results.visualize() + diff --git a/content/exercise/apply_pd.py b/content/exercise/apply_pd.py new file mode 100644 index 0000000..ae68109 --- /dev/null +++ b/content/exercise/apply_pd.py @@ -0,0 +1,17 @@ +import numpy as np +import pandas as pd +from scipy.optimize import curve_fit +import time + +def powerlaw(x, A, s): + return A * np.power(x, s) + +def fit_powerlaw(row): + X = np.arange(row.shape[0]) + 1.0 + params, cov = curve_fit(f=powerlaw, xdata=X, ydata=row, p0=[100, -1], bounds=(-np.inf, np.inf)) + time.sleep(0.01) + return params[1] + + +df = pd.read_csv("https://raw.githubusercontent.com/ENCCS/hpda-python/main/content/data/results.csv") +%timeit results = df.iloc[:,1:].apply(fit_powerlaw, axis=1) diff --git a/content/img/dag.png b/content/img/dag.png new file mode 100644 index 0000000..ae13ef6 Binary files /dev/null and b/content/img/dag.png differ diff --git a/content/img/dask-overview.svg b/content/img/dask-overview.svg new file mode 100644 index 0000000..c88c73e --- /dev/null +++ b/content/img/dask-overview.svg @@ -0,0 +1,419 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/content/img/jlab-dask-1.png b/content/img/jlab-dask-1.png new file mode 100644 index 0000000..3833594 Binary files /dev/null and b/content/img/jlab-dask-1.png differ diff --git a/content/img/jlab-dask-2.png b/content/img/jlab-dask-2.png new file mode 100644 index 0000000..c30d2ba Binary files /dev/null and b/content/img/jlab-dask-2.png differ diff --git a/content/img/jlab-dask-3.png b/content/img/jlab-dask-3.png new file mode 100644 index 0000000..60af478 Binary files /dev/null and b/content/img/jlab-dask-3.png differ diff --git a/content/index.md b/content/index.md index 7e6dd4e..31f7320 100644 --- a/content/index.md +++ b/content/index.md @@ -28,6 +28,13 @@ dask ``` +```{toctree} +:caption: Optional material +:maxdepth: 1 + +dask_opt +``` + ```{toctree} :caption: Reference :maxdepth: 1