-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathCombine.py
More file actions
193 lines (157 loc) · 6 KB
/
Combine.py
File metadata and controls
193 lines (157 loc) · 6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
from typing import Callable, Tuple, Sequence, Any
import numpy
import copy
from .DelayedOp import DelayedOp
from ._mask import _concatenate_unmasked_ndarrays, _concatenate_maybe_masked_ndarrays
from .extract_dense_array import extract_dense_array
from .SparseNdarray import _concatenate_SparseNdarrays
from .extract_sparse_array import extract_sparse_array
from .create_dask_array import create_dask_array
from .chunk_grid import chunk_grid
from .Grid import CompositeGrid
from .is_sparse import is_sparse
from .is_masked import is_masked
__author__ = "ltla"
__copyright__ = "ltla"
__license__ = "MIT"
class Combine(DelayedOp):
"""Delayed combine operation, based on Bioconductor's ``DelayedArray::DelayedAbind`` class.
This will combine multiple arrays along a specified dimension, provided the extents of all other dimensions are
the same.
This class is intended for developers to construct new :py:class:`~delayedarray.DelayedArray.DelayedArray`
instances. In general, end users should not be interacting with ``Combine`` objects directly.
"""
def __init__(self, seeds: list, along: int):
"""
Args:
seeds:
List of objects that satisfy the seed contract,
see :py:class:`~delayedarray.DelayedArray.DelayedArray` for details.
along:
Dimension along which the seeds are to be combined.
"""
self._seeds = seeds
if len(seeds) == 0:
raise ValueError("expected at least one object in 'seeds'")
shape = list(seeds[0].shape)
ndim = len(shape)
for i in range(1, len(seeds)):
curshape = seeds[i].shape
for d in range(ndim):
if d == along:
shape[d] += curshape[d]
elif shape[d] != curshape[d]:
raise ValueError(
"expected seeds to have the same extent for non-'along' dimensions"
)
self._shape = (*shape,)
self._along = along
# Guessing the dtype.
to_combine = []
for i in range(len(seeds)):
to_combine.append(numpy.ndarray((0,), dtype=seeds[i].dtype))
self._dtype = _concatenate_unmasked_ndarrays(to_combine, axis=0).dtype
self._is_masked = any(is_masked(y) for y in self._seeds)
@property
def shape(self) -> Tuple[int, ...]:
"""
Returns:
Tuple of integers specifying the extent of each dimension of the
object after seeds were combined along the specified dimension.
"""
return self._shape
@property
def dtype(self) -> numpy.dtype:
"""
Returns:
NumPy type for the combined data. This may or may not be
the same as those in ``seeds``, depending on casting rules.
"""
return self._dtype
@property
def seeds(self) -> list:
"""
Returns:
List of seed objects to be combined.
"""
return self._seeds
@property
def along(self) -> int:
"""
Returns:
Dimension along which the seeds are combined.
"""
return self._along
def _simplify_combine(x: Combine) -> Any:
if len(x.seeds) == 1:
return x.seeds[0]
all_seeds = []
simplified = False
for ss in x.seeds:
if type(ss) is Combine and x.along == ss.along:
# Don't use isinstance, we don't want to collapse for Combine
# subclasses that might be doing god knows what.
all_seeds += ss.seeds
simplified = True
else:
all_seeds.append(ss)
if not simplified:
return x
new_x = copy.copy(x)
new_x._seeds = all_seeds
return new_x
def _extract_subarrays(x: Combine, subset: Tuple[Sequence[int], ...], f: Callable):
# Figuring out which slices belong to who.
chosen = subset[x._along]
limit = 0
fragmented = []
position = 0
for s in x._seeds:
start = limit
limit += s.shape[x._along]
current = []
while position < len(chosen) and chosen[position] < limit:
current.append(chosen[position] - start)
position += 1
fragmented.append(current)
# Extracting the desired slice from each seed.
extracted = []
flexargs = list(subset)
for i, s in enumerate(x._seeds):
if len(fragmented[i]):
flexargs[x._along] = fragmented[i]
extracted.append(f(s, (*flexargs,)))
return extracted
@extract_dense_array.register
def extract_dense_array_Combine(x: Combine, subset: Tuple[Sequence[int], ...]):
"""See :py:meth:`~delayedarray.extract_dense_array.extract_dense_array`."""
fragments = _extract_subarrays(x, subset, extract_dense_array)
return _concatenate_maybe_masked_ndarrays(fragments, axis=x._along, masked=x._is_masked)
@extract_sparse_array.register
def extract_sparse_array_Combine(x: Combine, subset: Tuple[Sequence[int], ...]):
"""See :py:meth:`~delayedarray.extract_sparse_array.extract_sparse_array`."""
fragments = _extract_subarrays(x, subset, extract_sparse_array)
return _concatenate_SparseNdarrays(fragments, along=x._along)
@create_dask_array.register
def create_dask_array_Combine(x: Combine):
"""See :py:meth:`~delayedarray.create_dask_array.create_dask_array`."""
extracted = []
for s in x._seeds:
extracted.append(create_dask_array(s))
return numpy.concatenate((*extracted,), axis=x._along)
@chunk_grid.register
def chunk_grid_Combine(x: Combine):
"""See :py:meth:`~delayedarray.chunk_grid.chunk_grid`."""
chunks = [chunk_grid(s) for s in x._seeds]
return CompositeGrid(chunks, x._along)
@is_sparse.register
def is_sparse_Combine(x: Combine):
"""See :py:meth:`~delayedarray.is_sparse.is_sparse`."""
for s in x._seeds:
if not is_sparse(s):
return False
return len(x._seeds) > 0
@is_masked.register
def is_masked_Combine(x: Combine):
"""See :py:meth:`~delayedarray.is_masked.is_masked`."""
return x._is_masked