725 changes: 682 additions & 43 deletions content/dask.md


186 changes: 186 additions & 0 deletions content/dask_opt.md
@@ -0,0 +1,186 @@
# Dask (II)

:::::{challenge} Testing different schedulers

We will test different schedulers and compare their performance on a simple task: calculating
the mean of a randomly generated array.

Here is the code using NumPy:

```{literalinclude} example/dask_gil.py
:language: ipython
:lines: 1-7
```

Here we run the same computation using different Dask schedulers:

::::{tabs}

:::{group-tab} Serial

```{literalinclude} example/dask_gil.py
:language: ipython
:lines: 9-12
```

:::

:::{group-tab} Threads

```{literalinclude} example/dask_gil_threads.py
:language: ipython
:lines: 1-10
```

```{literalinclude} example/dask_gil_threads.py
:language: ipython
:lines: 12-15
```

```{literalinclude} example/dask_gil_threads.py
:language: ipython
:lines: 17-20
```

```{literalinclude} example/dask_gil_threads.py
:language: ipython
:lines: 22-25
```

:::

:::{group-tab} Processes

```{literalinclude} example/dask_gil_processes.py
:language: ipython
:lines: 1-10
```

```{literalinclude} example/dask_gil_processes.py
:language: ipython
:lines: 12-15
```

```{literalinclude} example/dask_gil_processes.py
:language: ipython
:lines: 17-20
```

```{literalinclude} example/dask_gil_processes.py
:language: ipython
:lines: 22-25
```

:::

:::{group-tab} Distributed

```{literalinclude} example/dask_gil_distributed.py
:language: ipython
:lines: 1-14
```

```{literalinclude} example/dask_gil_distributed.py
:language: ipython
:lines: 16-17
```

```{literalinclude} example/dask_gil_distributed.py
:language: ipython
:lines: 19-21
```

```{literalinclude} example/dask_gil_distributed.py
:language: ipython
:lines: 23-25
```

```{literalinclude} example/dask_gil_distributed.py
:language: ipython
:lines: 27
```

:::

::::

:::{solution} Testing different schedulers

Comparing the timings of mt_1, mt_2 and mt_4: the ``threads`` scheduler is limited by the GIL on pure Python code.
Although our function is not pure Python, it is still limited by the GIL, so there is no multi-core speedup.

Comparing the timings of mt_1, mp_1 and dis_1: unlike ``threads``, the other two schedulers copy data between processes,
which can introduce performance penalties, particularly when the data transferred between processes is large.

Comparing the timings of serial, mt_1, mp_1 and dis_1: creating and destroying threads and processes has overhead,
and ``processes`` have even more overhead than ``threads``.

Comparing the timings of mp_1, mp_2 and mp_4: running multiple processes is only effective when there is enough
computational work to do, i.e. for CPU-bound tasks. In this example, most of the time is actually spent
transferring the data rather than computing the mean.

Comparing the timings of ``processes`` and ``distributed``: the ``distributed`` scheduler has an advantage over
``processes`` because it handles data copying better: the ``processes`` scheduler copies data for every task,
while the ``distributed`` scheduler copies data once for each worker, as the sketch below illustrates.
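
With the ``distributed`` scheduler you can also scatter a large input to the workers explicitly and pass the
resulting future into many tasks, so the data is serialized once per worker rather than once per task. A minimal
sketch, assuming a default local cluster (the array size and number of tasks are arbitrary):

```python
import numpy as np
import dask
from dask.distributed import Client

client = Client()  # assumes a local cluster with default settings

# Copy the large array to every worker once ...
big = np.random.normal(size=1_000_000)
future = client.scatter(big, broadcast=True)

# ... then reuse the future in many tasks without re-sending the data.
tasks = [dask.delayed(np.mean)(future) for _ in range(100)]
results = dask.compute(*tasks)

client.shutdown()
```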
:::
:::::

:::{challenge} SVD with large skinny matrix using ``distributed`` scheduler

We can use Dask to compute the SVD of a large matrix that does not fit into the
memory of a normal laptop/desktop. While it is computing, you should switch to
the Dask dashboard and watch the "Workers" and "Graph" tabs, so you must run this
using the ``distributed`` scheduler.

```python
import dask
import dask.array as da
X = da.random.random((2000000, 100), chunks=(10000, 100))
X
u, s, v = da.linalg.svd(X)
dask.visualize(u, s, v)
s.compute()

```

SVD is only supported for arrays with chunking in one dimension, which requires that the matrix
is either *tall-and-skinny* or *short-and-fat*.
If chunking in both dimensions is needed, one should use an approximate algorithm.

```python
import dask
import dask.array as da
X = da.random.random((10000, 10000), chunks=(2000, 2000))
u, s, v = da.linalg.svd_compressed(X, k=5)
dask.visualize(u, s, v)
s.compute()

```

:::

:::{callout} Memory management

You may observe different memory categories shown on the dashboard:

- process: Overall memory used by the worker process, as measured by the OS
- managed: Size of the data that Dask holds in RAM, excluding spilled data; this measure relies on
  size estimates and may be inaccurate
- unmanaged: Memory that Dask is not directly aware of, e.g. Python modules,
  temporary arrays, memory leaks, or memory not yet free()'d by the Python memory manager to the OS
- unmanaged recent: Unmanaged memory that has appeared within the last 30 seconds, which is not included
  in the "unmanaged" memory measure
- spilled: Memory spilled to disk

The sum of managed + unmanaged + unmanaged recent is by definition equal to the process memory.

When the managed memory exceeds 60% of the memory limit (target threshold),
the worker begins to dump the least recently used data to disk.
When the process memory usage exceeds 70% of the memory limit (spill threshold),
the worker starts spilling unused data to disk as well.

At 80% process memory load (pause threshold), currently executing tasks continue to run, but no additional tasks
in the worker's queue will be started.

At 95% process memory load (terminate threshold), the worker is terminated. Its tasks are cancelled
as well, and the data on the worker is lost and needs to be recomputed.
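
These thresholds are configurable through the Dask configuration system. A minimal sketch
(the values shown are the library defaults, expressed as fractions of the worker memory limit):

```python
import dask

# The four thresholds above map to these configuration keys.
dask.config.set({
    "distributed.worker.memory.target": 0.60,     # start spilling least recently used managed data
    "distributed.worker.memory.spill": 0.70,      # spill based on process memory
    "distributed.worker.memory.pause": 0.80,      # stop starting new tasks
    "distributed.worker.memory.terminate": 0.95,  # terminate the worker
})
```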
:::
7 changes: 7 additions & 0 deletions content/example/chunk_dask.py
@@ -0,0 +1,7 @@
import dask
import dask.array as da

%%time
x = da.random.random((20000, 20000), chunks=(1000, 1000))
y = x.mean(axis=0)
y.compute()
5 changes: 5 additions & 0 deletions content/example/chunk_np.py
@@ -0,0 +1,5 @@
import numpy as np

%%time
x = np.random.random((20000, 20000))
y = x.mean(axis=0)
12 changes: 12 additions & 0 deletions content/example/dask_gil.py
@@ -0,0 +1,12 @@
import dask
import time
import numpy as np

def calc_mean(i, n):
    data = np.mean(np.random.normal(size=n))
    return data

n = 100000
%%timeit
rs = [calc_mean(i, n) for i in range(100)]
#352 ms ± 925 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
27 changes: 27 additions & 0 deletions content/example/dask_gil_distributed.py
@@ -0,0 +1,27 @@
import dask
import time
import numpy as np
from dask.distributed import Client, LocalCluster

def calc_mean(i, n):
    data = np.mean(np.random.normal(size=n))
    return data

n = 100000
output = [dask.delayed(calc_mean)(i, n) for i in range(100)]

cluster = LocalCluster(n_workers=1, threads_per_worker=1)
c = Client(cluster)

%timeit dis_1 = dask.compute(output, n_workers=1)
#619 ms ± 253 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

cluster.scale(2)
%timeit dis_2 = dask.compute(output, n_workers=2)
#357 ms ± 131 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

cluster.scale(4)
%timeit dis_4 = dask.compute(output, n_workers=4)
#265 ms ± 53.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

c.shutdown()
25 changes: 25 additions & 0 deletions content/example/dask_gil_processes.py
@@ -0,0 +1,25 @@
import dask
import time
import numpy as np

def calc_mean(i, n):
    data = np.mean(np.random.normal(size=n))
    return data

n = 100000
output = [dask.delayed(calc_mean)(i, n) for i in range(100)]

%%timeit
with dask.config.set(scheduler='processes', num_workers=1):
    mp_1 = dask.compute(output)
#990 ms ± 39.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
with dask.config.set(scheduler='processes', num_workers=2):
    mp_2 = dask.compute(output)
#881 ms ± 17.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
with dask.config.set(scheduler='processes', num_workers=4):
    mp_4 = dask.compute(output)
#836 ms ± 10.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
25 changes: 25 additions & 0 deletions content/example/dask_gil_threads.py
@@ -0,0 +1,25 @@
import dask
import time
import numpy as np

def calc_mean(i, n):
    data = np.mean(np.random.normal(size=n))
    return data

n = 100000
output = [dask.delayed(calc_mean)(i, n) for i in range(100)]

%%timeit
with dask.config.set(scheduler='threads', num_workers=1):
    mt_1 = dask.compute(output)
#395 ms ± 18.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
with dask.config.set(scheduler='threads', num_workers=2):
    mt_2 = dask.compute(output)
#1.28 s ± 1.46 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%%timeit
with dask.config.set(scheduler='threads', num_workers=4):
    mt_4 = dask.compute(output)
#1.28 s ± 3.84 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
28 changes: 28 additions & 0 deletions content/example/delay_more.py
@@ -0,0 +1,28 @@
import time
import dask

def inc(x):
    time.sleep(0.5)
    return x + 1

def dec(x):
    time.sleep(0.3)
    return x - 1

def add(x, y):
    time.sleep(0.1)
    return x + y


data = [1, 2, 3, 4, 5]
output = []
for x in data:
    if x % 2:
        a = inc(x)
        b = dec(x)
        c = add(a, b)
    else:
        c = 10
    output.append(c)

total = sum(output)
28 changes: 28 additions & 0 deletions content/example/delay_more_solution.py
@@ -0,0 +1,28 @@
import time
import dask

def inc(x):
    time.sleep(0.5)
    return x + 1

def dec(x):
    time.sleep(0.3)
    return x - 1

def add(x, y):
    time.sleep(0.1)
    return x + y


data = [1, 2, 3, 4, 5]
output = []
for x in data:
    if x % 2:
        a = dask.delayed(inc)(x)
        b = dask.delayed(dec)(x)
        c = dask.delayed(add)(a, b)
    else:
        c = dask.delayed(10)
    output.append(c)

total = dask.delayed(sum)(output)
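# Note: total is a lazy Delayed object; nothing has been executed yet.
# Calling total.compute() runs the task graph, and total.visualize() draws it.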
26 changes: 26 additions & 0 deletions content/exercise/apply_dask.py
@@ -0,0 +1,26 @@
import numpy as np
import dask.dataframe as dd
from scipy.optimize import curve_fit
import time

def powerlaw(x, A, s):
    return A * np.power(x, s)

def fit_powerlaw(row):
    X = np.arange(row.shape[0]) + 1.0
    params, cov = curve_fit(f=powerlaw, xdata=X, ydata=row, p0=[100, -1], bounds=(-np.inf, np.inf))
    time.sleep(0.01)
    return params[1]

ddf = dd.read_csv("https://raw.githubusercontent.com/ENCCS/hpda-python/main/content/data/results.csv")
ddf4 = ddf.repartition(npartitions=4)

# Note the optional argument ``meta`` which is recommended for dask dataframes.
# It should contain an empty ``pandas.DataFrame`` or ``pandas.Series``
# that matches the dtypes and column names of the output,
# or a dict of ``{name: dtype}`` or iterable of ``(name, dtype)``.

results = ddf4.iloc[:,1:].apply(fit_powerlaw, axis=1, meta=(None, "float64"))
%timeit results.compute()
results.visualize()

17 changes: 17 additions & 0 deletions content/exercise/apply_pd.py
@@ -0,0 +1,17 @@
import numpy as np
import pandas as pd
from scipy.optimize import curve_fit
import time

def powerlaw(x, A, s):
    return A * np.power(x, s)

def fit_powerlaw(row):
    X = np.arange(row.shape[0]) + 1.0
    params, cov = curve_fit(f=powerlaw, xdata=X, ydata=row, p0=[100, -1], bounds=(-np.inf, np.inf))
    time.sleep(0.01)
    return params[1]


df = pd.read_csv("https://raw.githubusercontent.com/ENCCS/hpda-python/main/content/data/results.csv")
%timeit results = df.iloc[:,1:].apply(fit_powerlaw, axis=1)
Binary file added content/img/dag.png