Skip to content

Commit 120eab8

Browse files
authored
Merge pull request #10 from RustedBytes/win-iocp
iocp in windows
2 parents 5df8272 + c828a4e commit 120eab8

11 files changed

Lines changed: 756 additions & 56 deletions

File tree

Cargo.lock

Lines changed: 60 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "rsloop"
3-
version = "0.1.6"
3+
version = "0.1.7"
44
edition = "2021"
55
description = "An event loop for asyncio written in Rust"
66
license = "Apache-2.0"
@@ -33,6 +33,8 @@ signal-hook = "0.4"
3333
socket2 = "0.6"
3434

3535
[target.'cfg(windows)'.dependencies]
36+
tokio = { version = "1", default-features = false, features = ["io-util"] }
37+
vibeio = { version = "0.2.5", default-features = false }
3638
windows-sys = { version = "0.61", features = ["Win32_Foundation", "Win32_Networking_WinSock", "Win32_System_Pipes", "Win32_System_Threading"] }
3739

3840
[build-dependencies]

benchmarks/compare_event_loops.py

Lines changed: 110 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
import argparse
55
import asyncio
6+
import ctypes
7+
from ctypes import wintypes
68
import gc
79
import importlib
810
import json
@@ -34,6 +36,7 @@ class ChildResult:
3436
workload: str
3537
seconds: float
3638
operations: int
39+
peak_rss_bytes: int
3740

3841
@property
3942
def ops_per_sec(self) -> float:
@@ -195,6 +198,65 @@ def is_loop_available(loop_name: str) -> tuple[bool, str | None]:
195198
return True, None
196199

197200

201+
def get_peak_rss_bytes() -> int:
    """Return the peak resident memory of the current process, in bytes.

    On Windows the value is ``PeakWorkingSetSize`` as reported by
    ``GetProcessMemoryInfo``.  Everywhere else it is derived from
    ``resource.getrusage(RUSAGE_SELF).ru_maxrss``: returned unchanged on
    macOS and multiplied by 1024 on all other platforms.

    Raises:
        OSError: on Windows, when ``GetProcessMemoryInfo`` reports failure.
    """
    if sys.platform != "win32":
        # Imported lazily so the module-level import list stays loadable on
        # Windows, where this branch is never taken.
        import resource

        max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
        scale = 1 if sys.platform == "darwin" else 1024
        return int(max_rss * scale)

    dword = wintypes.DWORD
    size_t = ctypes.c_size_t
    counter_fields = [
        ("cb", dword),
        ("PageFaultCount", dword),
        ("PeakWorkingSetSize", size_t),
        ("WorkingSetSize", size_t),
        ("QuotaPeakPagedPoolUsage", size_t),
        ("QuotaPagedPoolUsage", size_t),
        ("QuotaPeakNonPagedPoolUsage", size_t),
        ("QuotaNonPagedPoolUsage", size_t),
        ("PagefileUsage", size_t),
        ("PeakPagefileUsage", size_t),
    ]

    class PROCESS_MEMORY_COUNTERS(ctypes.Structure):
        _fields_ = counter_fields

    # use_last_error=True lets ctypes.get_last_error() report the failure
    # code of the call made through these handles.
    kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    psapi = ctypes.WinDLL("psapi", use_last_error=True)

    current_process = kernel32.GetCurrentProcess
    current_process.restype = wintypes.HANDLE

    query_memory_info = psapi.GetProcessMemoryInfo
    query_memory_info.argtypes = [
        wintypes.HANDLE,
        ctypes.POINTER(PROCESS_MEMORY_COUNTERS),
        dword,
    ]
    query_memory_info.restype = wintypes.BOOL

    info = PROCESS_MEMORY_COUNTERS()
    # The API expects the structure size both in the struct and as an argument.
    info.cb = ctypes.sizeof(PROCESS_MEMORY_COUNTERS)
    succeeded = query_memory_info(current_process(), ctypes.byref(info), info.cb)
    if not succeeded:
        raise ctypes.WinError(ctypes.get_last_error())
    return int(info.PeakWorkingSetSize)
246+
247+
248+
def format_bytes(num_bytes: int) -> str:
    """Render a byte count as a short human-readable string.

    The value is scaled by powers of 1024 through B, KiB, MiB, GiB and TiB.
    Plain bytes are printed as an integer; every larger unit is printed with
    one decimal place.  Values beyond the TiB range stay in TiB.

    The unit is chosen from the value *as it will be displayed*, so a count
    that would round up to ``1024.0`` of a unit is promoted to the next unit
    (``1 MiB - 1`` byte renders as ``"1.0 MiB"``, not ``"1024.0 KiB"``).
    Negative counts are scaled by magnitude and keep their sign.

    Args:
        num_bytes: Number of bytes to format.

    Returns:
        The formatted size, e.g. ``"512 B"`` or ``"1.5 KiB"``.
    """
    units = ("B", "KiB", "MiB", "GiB", "TiB")
    value = float(num_bytes)
    for unit in units[:-1]:
        # Compare the rounded magnitude so the printed number never reaches
        # 1024.0 while a larger unit is still available.
        if abs(round(value, 1)) < 1024.0:
            break
        value /= 1024.0
    else:
        # Every smaller unit was too small; the largest unit is the fallback.
        unit = units[-1]

    if unit == "B":
        return f"{int(value)} {unit}"
    return f"{value:.1f} {unit}"
258+
259+
198260
def run_with_loop(loop_name: str, coro: asyncio.coroutines) -> ChildResult:
199261
loop_factory = loop_factory_for(loop_name)
200262
if sys.version_info[:2] >= (3, 12):
@@ -225,7 +287,13 @@ def callback() -> None:
225287
start = time.perf_counter()
226288
loop.call_soon(callback)
227289
await done
228-
return ChildResult(loop_name, "callbacks", time.perf_counter() - start, iterations)
290+
return ChildResult(
291+
loop_name,
292+
"callbacks",
293+
time.perf_counter() - start,
294+
iterations,
295+
peak_rss_bytes=0,
296+
)
229297

230298

231299
async def bench_tasks(loop_name: str, iterations: int, batch_size: int) -> ChildResult:
@@ -238,7 +306,13 @@ async def tiny_task() -> None:
238306
current_batch = min(batch_size, remaining)
239307
await asyncio.gather(*(tiny_task() for _ in range(current_batch)))
240308
remaining -= current_batch
241-
return ChildResult(loop_name, "tasks", time.perf_counter() - start, iterations)
309+
return ChildResult(
310+
loop_name,
311+
"tasks",
312+
time.perf_counter() - start,
313+
iterations,
314+
peak_rss_bytes=0,
315+
)
242316

243317

244318
async def maybe_wait_closed(writer: asyncio.StreamWriter) -> None:
@@ -285,7 +359,13 @@ async def handle_echo(
285359
duration = time.perf_counter() - start
286360
writer.close()
287361
await maybe_wait_closed(writer)
288-
return ChildResult(loop_name, "tcp_streams", duration, roundtrips)
362+
return ChildResult(
363+
loop_name,
364+
"tcp_streams",
365+
duration,
366+
roundtrips,
367+
peak_rss_bytes=0,
368+
)
289369
finally:
290370
server.close()
291371
await server.wait_closed()
@@ -315,6 +395,14 @@ def child_main(args: argparse.Namespace) -> int:
315395
finally:
316396
gc.enable()
317397

398+
result = ChildResult(
399+
loop=result.loop,
400+
workload=result.workload,
401+
seconds=result.seconds,
402+
operations=result.operations,
403+
peak_rss_bytes=get_peak_rss_bytes(),
404+
)
405+
318406
print(
319407
json.dumps(
320408
{
@@ -323,6 +411,7 @@ def child_main(args: argparse.Namespace) -> int:
323411
"seconds": result.seconds,
324412
"operations": result.operations,
325413
"ops_per_sec": result.ops_per_sec,
414+
"peak_rss_bytes": result.peak_rss_bytes,
326415
}
327416
)
328417
)
@@ -387,35 +476,48 @@ def run_child(
387476
workload=payload["workload"],
388477
seconds=payload["seconds"],
389478
operations=payload["operations"],
479+
peak_rss_bytes=payload["peak_rss_bytes"],
390480
)
391481

392482

393483
def print_workload_table(workload: str, runs: dict[str, list[ChildResult]]) -> None:
394-
rows: list[tuple[str, float, float, float, int]] = []
484+
rows: list[tuple[str, float, float, float, int, int]] = []
395485
for loop_name, loop_runs in runs.items():
396486
seconds = [item.seconds for item in loop_runs]
487+
peak_rss_values = [item.peak_rss_bytes for item in loop_runs]
397488
median_seconds = statistics.median(seconds)
489+
median_peak_rss = int(statistics.median(peak_rss_values))
398490
operations = loop_runs[0].operations
399491
ops_per_sec = (
400492
operations / median_seconds if median_seconds > 0 else float("inf")
401493
)
402-
rows.append((loop_name, median_seconds, ops_per_sec, min(seconds), operations))
494+
rows.append(
495+
(
496+
loop_name,
497+
median_seconds,
498+
ops_per_sec,
499+
min(seconds),
500+
operations,
501+
median_peak_rss,
502+
)
503+
)
403504

404505
rows.sort(key=lambda item: item[1])
405506
fastest = rows[0][1]
406507

407508
print()
408509
print(f"{workload} ({rows[0][4]:,} ops)")
409510
print(
410-
f"{'loop':<10} {'median_s':>12} {'best_s':>12} {'ops_per_s':>14} {'vs_fastest':>12}"
511+
f"{'loop':<10} {'median_s':>12} {'best_s':>12} {'ops_per_s':>14} {'peak_rss':>12} {'vs_fastest':>12}"
411512
)
412-
for loop_name, median_seconds, ops_per_sec, best_seconds, _ in rows:
513+
for loop_name, median_seconds, ops_per_sec, best_seconds, _, median_peak_rss in rows:
413514
relative = median_seconds / fastest if fastest > 0 else 1.0
414515
print(
415516
f"{loop_name:<10} "
416517
f"{median_seconds:>12.6f} "
417518
f"{best_seconds:>12.6f} "
418519
f"{ops_per_sec:>14,.0f} "
520+
f"{format_bytes(median_peak_rss):>12} "
419521
f"{relative:>11.2f}x"
420522
)
421523

@@ -489,6 +591,7 @@ def parent_main(args: argparse.Namespace) -> int:
489591
"seconds": item.seconds,
490592
"operations": item.operations,
491593
"ops_per_sec": item.ops_per_sec,
594+
"peak_rss_bytes": item.peak_rss_bytes,
492595
}
493596
for item in measured
494597
],

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "maturin"
44

55
[project]
66
name = "rsloop"
7-
version = "0.1.6"
7+
version = "0.1.7"
88
description = "An event loop for asyncio written in Rust"
99
readme = "README.md"
1010
license = { file = "LICENSE" }

0 commit comments

Comments
 (0)