Skip to content

Commit 4e90a70

Browse files
author
Emma Ai
committed
add benchmark against numpy in multithreading
1 parent 0d21093 commit 4e90a70

File tree

1 file changed

+152
-0
lines changed

1 file changed

+152
-0
lines changed

bench/large_array_vs_numpy.py

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
#################################################################################
2+
# To mimic the scenario that computation is i/o bound and constrained by memory
3+
#
4+
# It's a much simplified version in which each chunk is computed in a loop,
5+
# and expression is evaluated in a sequence, which is not true in reality.
6+
# Nevertheless, numexpr outperforms numpy.
7+
#################################################################################
8+
"""
9+
Benchmarking Expression 1:
10+
NumPy time (threaded over 32 chunks with 2 threads): 4.612313 seconds
11+
numexpr time (threaded with re_evaluate over 32 chunks with 2 threads): 0.951172 seconds
12+
numexpr speedup: 4.85x
13+
----------------------------------------
14+
Benchmarking Expression 2:
15+
NumPy time (threaded over 32 chunks with 2 threads): 23.862752 seconds
16+
numexpr time (threaded with re_evaluate over 32 chunks with 2 threads): 2.182058 seconds
17+
numexpr speedup: 10.94x
18+
----------------------------------------
19+
Benchmarking Expression 3:
20+
NumPy time (threaded over 32 chunks with 2 threads): 20.594895 seconds
21+
numexpr time (threaded with re_evaluate over 32 chunks with 2 threads): 2.927881 seconds
22+
numexpr speedup: 7.03x
23+
----------------------------------------
24+
Benchmarking Expression 4:
25+
NumPy time (threaded over 32 chunks with 2 threads): 12.834101 seconds
26+
numexpr time (threaded with re_evaluate over 32 chunks with 2 threads): 5.392480 seconds
27+
numexpr speedup: 2.38x
28+
----------------------------------------
29+
"""
30+
31+
import os

# Cap numexpr's worker pool; this must be set BEFORE numexpr is imported,
# because numexpr reads the variable at import time.
os.environ["NUMEXPR_NUM_THREADS"] = "16"
import numpy as np
import numexpr as ne
import timeit
import threading
39+
# Benchmark configuration.
array_size = 10**8  # total elements per operand
num_runs = 10  # timeit repetitions per chunk
num_chunks = 32  # Number of chunks
num_threads = 2  # Number of threads constrained by how many chunks memory can hold

# Random operands, reshaped to 2-D with 10**4 rows each.
a = np.random.rand(array_size).reshape(10**4, -1)
b = np.random.rand(array_size).reshape(10**4, -1)
c = np.random.rand(array_size).reshape(10**4, -1)

# BUG FIX: the workers slice along the first axis (a[start:end]), and the
# operands have only a.shape[0] == 10**4 rows.  The old value,
# array_size // num_chunks == 3_125_000, exceeded the row count, so chunk 0
# silently received the whole array and every other chunk sliced empty.
# Chunk over rows instead so the chunks genuinely partition the data.
chunk_size = a.shape[0] // num_chunks
49+
50+
# NumPy reference implementations.  Kept in 1:1 index correspondence with
# expressions_numexpr below so that index i benchmarks the same formula
# in both libraries.
expressions_numpy = [
    lambda a, b, c: a + b * c,
    lambda a, b, c: a**2 + b**2 - 2 * a * b * np.cos(c),
    lambda a, b, c: np.sin(a) + np.log(b) * np.sqrt(c),
    lambda a, b, c: np.exp(a) + np.tan(b) - np.sinh(c),
]

# numexpr string forms of the same four expressions.
expressions_numexpr = [
    "a + b * c",
    "a**2 + b**2 - 2 * a * b * cos(c)",
    "sin(a) + log(b) * sqrt(c)",
    "exp(a) + tan(b) - sinh(c)",
]
63+
64+
65+
def benchmark_numpy_chunk(func, a, b, c, results, indices):
    """Time *func* on each chunk in *indices* and append the timings to *results*.

    Each chunk is a slice of chunk_size rows along the first axis; the call
    is repeated num_runs times via timeit and the total time is recorded.
    """
    for chunk_idx in indices:
        lo = chunk_idx * chunk_size
        hi = lo + chunk_size
        elapsed = timeit.timeit(
            lambda: func(a[lo:hi], b[lo:hi], c[lo:hi]),
            number=num_runs,
        )
        results.append(elapsed)
73+
74+
75+
def benchmark_numexpr_re_evaluate(expr, a, b, c, results, indices):
    """Time numexpr on each chunk in *indices* and append the timings to *results*.

    The first chunk handled by this thread is timed with ne.evaluate(), which
    compiles and caches the expression; every later chunk reuses the cached
    expression via the cheaper ne.re_evaluate().
    """
    for index in indices:
        start = index * chunk_size
        end = (index + 1) * chunk_size
        # BUG FIX: the original tested `index == 0`, so only the thread that
        # happened to own chunk 0 ever called evaluate().  The cached
        # expression that re_evaluate() reuses is tracked per-thread (NOTE:
        # verify against the installed numexpr version), so every other
        # thread called re_evaluate() without ever priming the cache.
        # Priming on each thread's own first chunk is correct either way.
        if index == indices[0]:
            # Evaluate the first chunk with evaluate
            time_taken = timeit.timeit(
                lambda: ne.evaluate(
                    expr,
                    local_dict={
                        "a": a[start:end],
                        "b": b[start:end],
                        "c": c[start:end],
                    },
                ),
                number=num_runs,
            )
        else:
            # Re-evaluate subsequent chunks with re_evaluate
            time_taken = timeit.timeit(
                lambda: ne.re_evaluate(
                    local_dict={"a": a[start:end], "b": b[start:end], "c": c[start:end]}
                ),
                number=num_runs,
            )
        results.append(time_taken)
101+
102+
103+
def run_benchmark_threaded():
    """Benchmark every expression with NumPy and numexpr across worker threads.

    For each expression, the chunk list is split round-robin across
    num_threads threads; per-chunk timings are summed and the speedup of
    numexpr over NumPy is printed.
    """
    chunk_indices = list(range(num_chunks))

    def fan_out(worker, payload, sink):
        # Round-robin the chunks over num_threads workers and wait for all.
        pool = [
            threading.Thread(
                target=worker,
                args=(payload, a, b, c, sink, chunk_indices[j::num_threads]),
            )
            for j in range(num_threads)
        ]
        for t in pool:
            t.start()
        for t in pool:
            t.join()

    for i in range(len(expressions_numpy)):
        print(f"Benchmarking Expression {i+1}:")

        results_numpy = []
        results_numexpr = []

        fan_out(benchmark_numpy_chunk, expressions_numpy[i], results_numpy)
        numpy_time = sum(results_numpy)
        print(
            f"NumPy time (threaded over {num_chunks} chunks with {num_threads} threads): {numpy_time:.6f} seconds"
        )

        fan_out(benchmark_numexpr_re_evaluate, expressions_numexpr[i], results_numexpr)
        numexpr_time = sum(results_numexpr)
        print(
            f"numexpr time (threaded with re_evaluate over {num_chunks} chunks with {num_threads} threads): {numexpr_time:.6f} seconds"
        )
        print(f"numexpr speedup: {numpy_time / numexpr_time:.2f}x")
        print("-" * 40)
149+
150+
151+
# Entry-point guard: run the benchmark only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    run_benchmark_threaded()

0 commit comments

Comments
 (0)