From 100d10d64a8d8c1799b9bb703ff9b6f91db41f64 Mon Sep 17 00:00:00 2001 From: kaushal Date: Tue, 12 Nov 2024 14:26:43 +0530 Subject: [PATCH 1/6] fastload --- src/scanpy/readwrite.py | 121 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 117 insertions(+), 4 deletions(-) diff --git a/src/scanpy/readwrite.py b/src/scanpy/readwrite.py index 3c958a1e50..2d3c535f9c 100644 --- a/src/scanpy/readwrite.py +++ b/src/scanpy/readwrite.py @@ -5,7 +5,7 @@ import json from pathlib import Path, PurePath from typing import TYPE_CHECKING - +import anndata import anndata.utils import h5py import numpy as np @@ -33,6 +33,10 @@ read_text, ) from anndata import AnnData +import multiprocessing as mp +import time +import numba +import scipy from matplotlib.image import imread from . import logging as logg @@ -45,6 +49,12 @@ from ._utils import Empty +indices_type = np.int64 +indices_shm_type = "l" + +semDataLoaded = None # will be initialized later +semDataCopied = None # will be initialized later + # .gz and .bz2 suffixes are also allowed for text formats text_exts = { "csv", @@ -65,6 +75,109 @@ } | text_exts """Available file formats for reading data. """ +def _load_helper(fname, i, k, datalen, dataArray, indicesArray, startsArray, endsArray): + f = h5py.File(fname,'r') + dataA = np.frombuffer(dataArray,dtype=np.float32) + indicesA = np.frombuffer(indicesArray,dtype=indices_type) + startsA = np.frombuffer(startsArray,dtype=np.int64) + endsA = np.frombuffer(endsArray,dtype=np.int64) + for j in range(datalen//(k*1024*1024)+1): + # compute start, end + s = i*datalen//k + j*1024*1024 + e = min(s+1024*1024, (i+1)*datalen//k) + length = e-s + startsA[i]=s + endsA[i]=e + # read direct + f['X']['data'].read_direct(dataA, np.s_[s:e], np.s_[i*1024*1024:i*1024*1024+length]) + f['X']['indices'].read_direct(indicesA, np.s_[s:e], np.s_[i*1024*1024:i*1024*1024+length]) + + # coordinate with copy threads + semDataLoaded[i].release() # done data load + semDataCopied[i].acquire() # wait until data copied + +def _waitload(i): + semDataLoaded[i].acquire() + +def _signalcopy(i): + semDataCopied[i].release() + +@numba.njit(parallel=True) +def _fast_copy(data,dataA,indices,indicesA,starts,ends,k,m): + for i in numba.prange(k): + for _ in range(m): + with numba.objmode(): + _waitload(i) + length = ends[i]-starts[i] + data[starts[i]:ends[i]] = dataA[i*1024*1024:i*1024*1024+length] + indices[starts[i]:ends[i]] = indicesA[i*1024*1024:i*1024*1024+length] + with numba.objmode(): + _signalcopy(i) + +def fastload(fname): #, firstn=1): + t0 = time.time() + f = h5py.File(fname,'r') + assert ('X' in f.keys() and 'var' in f.keys() and 'obs' in f.keys()) + + # get obs dataframe + rows = f['obs'][ list(f['obs'].keys())[0] ].size + if '_index' in f['obs'].keys(): + dfobsind = pd.Series(f['obs']['_index'].asstr()[0:rows]) + dfobs = pd.DataFrame(index=dfobsind) + else: + dfobs = pd.DataFrame() + for k in f['obs'].keys(): + if k=='_index': continue + dfobs[k] = f['obs'][k].asstr()[...] + + # get var dataframe + if '_index' in f['var'].keys(): + dfvarind = pd.Series(f['var']['_index'].asstr()[...]) + dfvar = pd.DataFrame(index=dfvarind) + else: + dfvar = pd.DataFrame() + for k in f['var'].keys(): + if k=='_index': continue + dfvar[k] = f['var'][k].asstr()[...] + + # load index pointers, prepare shared arrays + indptr = f['X']['indptr'][0:rows+1] + datalen = int(indptr[-1]) + + f.close() + k = numba.get_num_threads() + dataArray = mp.Array('f',k*1024*1024,lock=False) # should be in shared memory + indicesArray = mp.Array(indices_shm_type,k*1024*1024,lock=False) # should be in shared memory + startsArray = mp.Array('l',k,lock=False) # start index of data read + endsArray = mp.Array('l',k,lock=False) # end index (noninclusive) of data read + global semDataLoaded + global semDataCopied + semDataLoaded = [mp.Semaphore(0) for _ in range(k)] + semDataCopied = [mp.Semaphore(0) for _ in range(k)] + dataA = np.frombuffer(dataArray,dtype=np.float32) + indicesA = np.frombuffer(indicesArray,dtype=indices_type) + startsA = np.frombuffer(startsArray, dtype=np.int64) + endsA = np.frombuffer(endsArray, dtype=np.int64) + data = np.empty(datalen, dtype=np.float32) + indices = np.empty(datalen, dtype=indices_type) + + procs = [mp.Process(target=_load_helper, args=(fname, i, k, datalen, dataArray, indicesArray, startsArray, endsArray)) for i in range(k)] + for p in procs: p.start() + + _fast_copy(data,dataA,indices,indicesA,startsA,endsA,k,datalen//(k*1024*1024)+1) + + for p in procs: p.join() + + X = scipy.sparse.csr_matrix((0,0)) + X.data = data + X.indices = indices + X.indptr = indptr + X._shape = ((rows, dfvar.shape[0])) + + # create AnnData + adata = anndata.AnnData(X, dfobs, dfvar) + return adata + # -------------------------------------------------------------------------------- # Reading and Writing data files and AnnData objects @@ -162,7 +275,7 @@ def read( f"ending on one of the available extensions {avail_exts} " "or pass the parameter `ext`." ) - return read_h5ad(filename, backed=backed) + return fastload(filename) @old_positionals("genome", "gex_only", "backup_url") @@ -774,7 +887,7 @@ def _read( # read hdf5 files if ext in {"h5", "h5ad"}: if sheet is None: - return read_h5ad(filename, backed=backed) + return fastload(filename) else: logg.debug(f"reading sheet {sheet} from file {filename}") return read_hdf(filename, sheet) @@ -786,7 +899,7 @@ def _read( path_cache = path_cache.with_suffix("") if cache and path_cache.is_file(): logg.info(f"... reading from cache file {path_cache}") - return read_h5ad(path_cache) + return fastload(path_cache) if not is_present: raise FileNotFoundError(f"Did not find file {filename}.") From f89af6a2de63cb5be87ca1602444901d0640424d Mon Sep 17 00:00:00 2001 From: kaushal Date: Tue, 19 Nov 2024 23:35:13 +0530 Subject: [PATCH 2/6] fastload or read_h5ad based on datalen --- src/scanpy/readwrite.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/scanpy/readwrite.py b/src/scanpy/readwrite.py index 2d3c535f9c..e4be12780a 100644 --- a/src/scanpy/readwrite.py +++ b/src/scanpy/readwrite.py @@ -114,13 +114,22 @@ def _fast_copy(data,dataA,indices,indicesA,starts,ends,k,m): with numba.objmode(): _signalcopy(i) -def fastload(fname): #, firstn=1): +def fastload(fname, backed): #, firstn=1): t0 = time.time() - f = h5py.File(fname,'r') + f = h5py.File(fname,backed) assert ('X' in f.keys() and 'var' in f.keys() and 'obs' in f.keys()) # get obs dataframe rows = f['obs'][ list(f['obs'].keys())[0] ].size + # load index pointers, prepare shared arrays + indptr = f['X']['indptr'][0:rows+1] + datalen = int(indptr[-1]) + + + print(f"datalen {datalen} {1024*1024}") + if datalen<1024*1024: + f.close() + return read_h5ad(fname, backed=backed) if '_index' in f['obs'].keys(): dfobsind = pd.Series(f['obs']['_index'].asstr()[0:rows]) dfobs = pd.DataFrame(index=dfobsind) @@ -140,10 +149,6 @@ def fastload(fname): #, firstn=1): if k=='_index': continue dfvar[k] = f['var'][k].asstr()[...] - # load index pointers, prepare shared arrays - indptr = f['X']['indptr'][0:rows+1] - datalen = int(indptr[-1]) - f.close() k = numba.get_num_threads() dataArray = mp.Array('f',k*1024*1024,lock=False) # should be in shared memory @@ -275,7 +280,7 @@ def read( f"ending on one of the available extensions {avail_exts} " "or pass the parameter `ext`." ) - return fastload(filename) + return fastload(filename, backed) @old_positionals("genome", "gex_only", "backup_url") @@ -887,7 +892,7 @@ def _read( # read hdf5 files if ext in {"h5", "h5ad"}: if sheet is None: - return fastload(filename) + return fastload(filename, backed) else: logg.debug(f"reading sheet {sheet} from file {filename}") return read_hdf(filename, sheet) @@ -899,7 +904,7 @@ def _read( path_cache = path_cache.with_suffix("") if cache and path_cache.is_file(): logg.info(f"... reading from cache file {path_cache}") - return fastload(path_cache) + return fastload(path_cache, backed) if not is_present: raise FileNotFoundError(f"Did not find file {filename}.") From 3d2f06f70d5471314fe2848b0905d813cb2f6ff3 Mon Sep 17 00:00:00 2001 From: kaushal Date: Wed, 20 Nov 2024 08:39:36 +0530 Subject: [PATCH 3/6] default backed value --- src/scanpy/readwrite.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/scanpy/readwrite.py b/src/scanpy/readwrite.py index e4be12780a..85a19214ae 100644 --- a/src/scanpy/readwrite.py +++ b/src/scanpy/readwrite.py @@ -200,7 +200,7 @@ def fastload(fname, backed): #, firstn=1): ) def read( filename: Path | str, - backed: Literal["r", "r+"] | None = None, + backed: Literal["r", "r+"] | None = 'r+', *, sheet: str | None = None, ext: str | None = None, From 49ae1d3d91c6dd83d046c4212143e8e3efe32e04 Mon Sep 17 00:00:00 2001 From: kaushal Date: Tue, 3 Dec 2024 13:51:38 +0530 Subject: [PATCH 4/6] thread_workload --- src/scanpy/readwrite.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/scanpy/readwrite.py b/src/scanpy/readwrite.py index 85a19214ae..5e9242106a 100644 --- a/src/scanpy/readwrite.py +++ b/src/scanpy/readwrite.py @@ -55,6 +55,8 @@ semDataLoaded = None # will be initialized later semDataCopied = None # will be initialized later +thread_workload = 4000000 # experimented value + # .gz and .bz2 suffixes are also allowed for text formats text_exts = { "csv", @@ -81,16 +83,16 @@ def _load_helper(fname, i, k, datalen, dataArray, indicesArray, startsArray, end indicesA = np.frombuffer(indicesArray,dtype=indices_type) startsA = np.frombuffer(startsArray,dtype=np.int64) endsA = np.frombuffer(endsArray,dtype=np.int64) - for j in range(datalen//(k*1024*1024)+1): + for j in range(datalen//(k*thread_workload)+1): # compute start, end - s = i*datalen//k + j*1024*1024 - e = min(s+1024*1024, (i+1)*datalen//k) + s = i*datalen//k + j*thread_workload + e = min(s+thread_workload, (i+1)*datalen//k) length = e-s startsA[i]=s endsA[i]=e # read direct - f['X']['data'].read_direct(dataA, np.s_[s:e], np.s_[i*1024*1024:i*1024*1024+length]) - f['X']['indices'].read_direct(indicesA, np.s_[s:e], np.s_[i*1024*1024:i*1024*1024+length]) + f['X']['data'].read_direct(dataA, np.s_[s:e], np.s_[i*thread_workload:i*thread_workload+length]) + f['X']['indices'].read_direct(indicesA, np.s_[s:e], np.s_[i*thread_workload:i*thread_workload+length]) # coordinate with copy threads semDataLoaded[i].release() # done data load @@ -109,8 +111,8 @@ def _fast_copy(data,dataA,indices,indicesA,starts,ends,k,m): with numba.objmode(): _waitload(i) length = ends[i]-starts[i] - data[starts[i]:ends[i]] = dataA[i*1024*1024:i*1024*1024+length] - indices[starts[i]:ends[i]] = indicesA[i*1024*1024:i*1024*1024+length] + data[starts[i]:ends[i]] = dataA[i*thread_workload:i*thread_workload+length] + indices[starts[i]:ends[i]] = indicesA[i*thread_workload:i*thread_workload+length] with numba.objmode(): _signalcopy(i) @@ -125,9 +127,7 @@ def fastload(fname, backed): #, firstn=1): indptr = f['X']['indptr'][0:rows+1] datalen = int(indptr[-1]) - - print(f"datalen {datalen} {1024*1024}") - if datalen<1024*1024: + if datalen Date: Mon, 23 Dec 2024 04:11:42 +0000 Subject: [PATCH 5/6] parallel read with threads --- src/scanpy/readwrite.py | 229 ++++++++++++++++++++++++++++++---------- 1 file changed, 171 insertions(+), 58 deletions(-) diff --git a/src/scanpy/readwrite.py b/src/scanpy/readwrite.py index 5e9242106a..7c4904cf10 100644 --- a/src/scanpy/readwrite.py +++ b/src/scanpy/readwrite.py @@ -5,6 +5,7 @@ import json from pathlib import Path, PurePath from typing import TYPE_CHECKING + import anndata import anndata.utils import h5py @@ -32,11 +33,13 @@ read_mtx, read_text, ) -from anndata import AnnData import multiprocessing as mp -import time +import threading +from dataclasses import dataclass + import numba import scipy +from anndata import AnnData from matplotlib.image import imread from . import logging as logg @@ -55,7 +58,7 @@ semDataLoaded = None # will be initialized later semDataCopied = None # will be initialized later -thread_workload = 4000000 # experimented value +thread_workload = 4000000 # experimented value # .gz and .bz2 suffixes are also allowed for text formats text_exts = { @@ -77,111 +80,219 @@ } | text_exts """Available file formats for reading data. """ -def _load_helper(fname, i, k, datalen, dataArray, indicesArray, startsArray, endsArray): - f = h5py.File(fname,'r') - dataA = np.frombuffer(dataArray,dtype=np.float32) - indicesA = np.frombuffer(indicesArray,dtype=indices_type) - startsA = np.frombuffer(startsArray,dtype=np.int64) - endsA = np.frombuffer(endsArray,dtype=np.int64) - for j in range(datalen//(k*thread_workload)+1): + +def get_1d_index(row: int, col: int, num_cols: int) -> int: + """ + Convert 2D coordinates to 1D index. + + Parameters: + row (int): Row index in the 2D array. + col (int): Column index in the 2D array. + num_cols (int): Number of columns in the 2D array. + + Returns: + int: Corresponding 1D index. + """ + return row * num_cols + col + + +@dataclass +class LoadHelperData: + i: int + k: int + datalen: int + dataArray: mp.Array + indicesArray: mp.Array + startsArray: mp.Array + endsArray: mp.Array + + +def _load_helper(fname: str, helper_data: LoadHelperData): + i = helper_data.i + k = helper_data.k + datalen = helper_data.datalen + dataArray = helper_data.dataArray + indicesArray = helper_data.indicesArray + startsArray = helper_data.startsArray + endsArray = helper_data.endsArray + + f = h5py.File(fname, "r") + dataA = np.frombuffer(dataArray, dtype=np.float32) + indicesA = np.frombuffer(indicesArray, dtype=indices_type) + startsA = np.frombuffer(startsArray, dtype=np.int64) + endsA = np.frombuffer(endsArray, dtype=np.int64) + for j in range(datalen // (k * thread_workload) + 1): # compute start, end - s = i*datalen//k + j*thread_workload - e = min(s+thread_workload, (i+1)*datalen//k) - length = e-s - startsA[i]=s - endsA[i]=e + s = i * datalen // k + j * thread_workload + e = min(s + thread_workload, (i + 1) * datalen // k) + length = e - s + startsA[i] = s + endsA[i] = e # read direct - f['X']['data'].read_direct(dataA, np.s_[s:e], np.s_[i*thread_workload:i*thread_workload+length]) - f['X']['indices'].read_direct(indicesA, np.s_[s:e], np.s_[i*thread_workload:i*thread_workload+length]) - + f["X"]["data"].read_direct( + dataA, np.s_[s:e], np.s_[i * thread_workload : i * thread_workload + length] + ) + f["X"]["indices"].read_direct( + indicesA, + np.s_[s:e], + np.s_[i * thread_workload : i * thread_workload + length], + ) + # coordinate with copy threads semDataLoaded[i].release() # done data load semDataCopied[i].acquire() # wait until data copied + def _waitload(i): semDataLoaded[i].acquire() + def _signalcopy(i): semDataCopied[i].release() -@numba.njit(parallel=True) -def _fast_copy(data,dataA,indices,indicesA,starts,ends,k,m): - for i in numba.prange(k): - for _ in range(m): + +@dataclass +class CopyData: + data: np.ndarray + dataA: np.ndarray + indices: np.ndarray + indicesA: np.ndarray + startsA: np.ndarray + endsA: np.ndarray + + +def _fast_copy(copy_data: CopyData, k: int, m: int): + # Access the arrays through copy_data + data = copy_data.data + dataA = copy_data.dataA + indices = copy_data.indices + indicesA = copy_data.indicesA + starts = copy_data.startsA + ends = copy_data.endsA + + def thread_fun(i, m): + for j in range(m): with numba.objmode(): _waitload(i) - length = ends[i]-starts[i] - data[starts[i]:ends[i]] = dataA[i*thread_workload:i*thread_workload+length] - indices[starts[i]:ends[i]] = indicesA[i*thread_workload:i*thread_workload+length] + length = ends[i] - starts[i] + data[starts[i] : ends[i]] = dataA[ + i * thread_workload : i * thread_workload + length + ] + indices[starts[i] : ends[i]] = indicesA[ + i * thread_workload : i * thread_workload + length + ] with numba.objmode(): _signalcopy(i) -def fastload(fname, backed): #, firstn=1): - t0 = time.time() - f = h5py.File(fname,backed) - assert ('X' in f.keys() and 'var' in f.keys() and 'obs' in f.keys()) + threads = [threading.Thread(target=thread_fun, args=(i, m)) for i in range(k)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + +def fastload(fname, backed): + f = h5py.File(fname, backed) + assert "X" in f, "'X' is missing from f" + assert "var" in f, "'var' is missing from f" + assert "obs" in f, "'obs' is missing from f" # get obs dataframe - rows = f['obs'][ list(f['obs'].keys())[0] ].size + rows = f["obs"][list(f["obs"].keys())[0]].size # load index pointers, prepare shared arrays - indptr = f['X']['indptr'][0:rows+1] + indptr = f["X"]["indptr"][0 : rows + 1] datalen = int(indptr[-1]) - if datalen Date: Thu, 16 Jan 2025 03:51:50 +0000 Subject: [PATCH 6/6] check issparse --- src/scanpy/readwrite.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/scanpy/readwrite.py b/src/scanpy/readwrite.py index 48ae437845..5a4aaf6f28 100644 --- a/src/scanpy/readwrite.py +++ b/src/scanpy/readwrite.py @@ -197,6 +197,10 @@ def fastload(fname, backed): assert "var" in f, "'var' is missing from f" assert "obs" in f, "'obs' is missing from f" + if not scipy.sparse.issparse(f["X"]): + f.close() + return read_h5ad(fname, backed=backed) + # get obs dataframe rows = f["obs"][list(f["obs"].keys())[0]].size # load index pointers, prepare shared arrays