Source code for nums.core.array.selection

# Copyright (C) 2020 NumS Development Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from typing import List, Tuple, Union

import numpy as np

import nums.core.array.utils as array_utils


[docs]def get_array_order(array, axis=0): # An axis is ordered if all diffs have same sign, and no diffs of 0 (no repetitions). if array.shape[axis] == 1: if array[axis] >= 0: return 1 else: return -1 diffs = np.diff(array, axis=axis) if np.all(diffs > 0): return 1 elif np.all(diffs < 0): return -1 else: # Order is undefined. return 0
[docs]def is_advanced_selection(subscript: tuple): assert isinstance(subscript, tuple) num_ordered_arrays = 0 num_indexes = 0 for obj in subscript: if array_utils.is_array_like(obj): if isinstance(obj, np.ndarray): array = obj else: # TODO (hme): This is inefficient. array = np.array(obj, dtype=int) if len(array.shape) > 1: return True elif get_array_order(array, axis=0) == 0: return True num_ordered_arrays += 1 elif array_utils.is_index_subscript(obj): num_indexes += 1 if num_ordered_arrays > 1: return True if num_ordered_arrays == 1: # In this case, indexes is considered an advanced index. return num_indexes > 0 # We can compute ordered arrays fast, so don't consider ordered arrays as advanced. return False
[docs]def pos_step_slice_to_range(n, start_bound, stop_bound): if n < 0: n += stop_bound - start_bound n = np.clip(n, start_bound, stop_bound) return n
[docs]def neg_step_slice_to_range(n, start_bound, stop_bound): # Any start, stop above (start_bound - stop_bound) # is outside the bounds of the slice parameters # for backward steps. if 0 <= n: n -= start_bound - stop_bound # Similar to positive steps, # any start, stop below the stop bounds is clipped for negative steps. n = np.clip(n, stop_bound, start_bound) return n
[docs]def trim_slice_bounds(s: slice, size: int): start, stop, step = s.start, s.stop, s.step if step is None or step > 0: start_bound = 0 stop_bound = size if start is None: start = start_bound if stop is None: stop = stop_bound # For positive steps, clip between -size and size, and compute size + n if n is < 0. # For positive slice parameters, Python slice semantics always clip stop by size, # so it makes sense to do the same in the negative direction. # We end up with start, stop >= 0 after adding size to n after clipping. # This matches NumPy's slice semantics for positive steps because # we simply select a sequence of elements in the forward direction. start = pos_step_slice_to_range(start, start_bound, stop_bound) stop = pos_step_slice_to_range(stop, start_bound, stop_bound) elif step < 0: # For negative steps, the start bound is -1, # and the stop bound is -size-1. The stop bound is offset by -1 # to match a start bound of -1, so that the maximum number of items # touched is at most size == start_bound - stop_bound == -1 - (-size - 1). start_bound = -1 stop_bound = -size - 1 if start is None: start = start_bound if stop is None: stop = stop_bound start = neg_step_slice_to_range(start, start_bound, stop_bound) stop = neg_step_slice_to_range(stop, start_bound, stop_bound) else: raise ValueError("A step of 0 is invalid.") return slice(start, stop, step)
[docs]def slice_to_range(s: slice, size: int): st = trim_slice_bounds(s, size) return np.arange(st.start, st.stop, st.step)
[docs]class AxisSelection:
[docs] def is_empty(self): raise NotImplementedError()
[docs] def selector(self): raise NotImplementedError()
[docs] def shape(self): raise NotImplementedError()
[docs] def order(self): raise NotImplementedError()
[docs]class AxisSlice(AxisSelection):
[docs] @classmethod def from_size(cls, size: int, s: Union[slice, None] = None): if s is None: s = slice(None, None, None) st = trim_slice_bounds(s, size) return cls(st.start, st.stop, st.step)
def __init__( self, start: Union[int, None], stop: Union[int, None], step: Union[int, None] ): self.start = start self.stop = stop self.step = step
[docs] def is_empty(self): if self.order() > 0: return self.stop <= self.start else: return self.start <= self.stop
[docs] def selector(self): return self.to_slice()
[docs] def to_slice(self): return slice(self.start, self.stop, self.step)
[docs] def shape(self): if self.step is None or self.step > 0: step_mag = 1 if self.step is None else abs(self.step) interval = max(0, self.stop - self.start) else: step_mag = abs(self.step) interval = max(0, self.start - self.stop) if interval == 0: return (0,) if step_mag == 1: return (interval,) if step_mag > interval: return (1,) # TODO (hme): Make this faster by not instantiating a range. return slice_to_range(slice(0, interval, step_mag), interval).shape
[docs] def order(self): return np.sign(1 if self.step is None else self.step)
[docs]class AxisIndex(AxisSelection):
[docs] @classmethod def from_index(cls, index: int, size: int): if 0 <= index < size: pass elif -size <= index < 0: index = pos_step_slice_to_range(index, 0, size) else: raise IndexError("Index %d is out of bounds for size %d" % (index, size)) return cls(index)
def __init__(self, index: int): self.index = index
[docs] def to_index(self): return self.index
[docs] def selector(self): return self.to_index()
[docs] def is_empty(self): # Index can never be empty. return False
[docs] def shape(self): return ()
[docs] def order(self): return np.sign(self.index)
[docs]class AxisArray(AxisSelection): # See https://numpy.org/doc/stable/reference/arrays.indexing.html#integer-array-indexing def __init__(self, array_like): if isinstance(array_like, np.ndarray): assert array_like.dtype.kind == "i" or array_like.dtype.kind == "b" arr: np.ndarray = array_like else: # Must infer type. is_bool = True for entry in array_like: if not ( array_utils.is_index_subscript(entry) or array_utils.is_bool(entry) ): raise Exception("Only integer or boolean arrays are valid indices.") if array_utils.is_index_subscript(entry): is_bool = False arr: np.ndarray = np.array(array_like, dtype=(bool if is_bool else np.intp)) self.array = arr
[docs] def is_empty(self): return len(self.array) == 0
[docs] def selector(self): return self.array
[docs] def shape(self): return self.array.shape
[docs] def order(self): return get_array_order(self.array)
[docs]class AxisEmpty(AxisSelection):
[docs] def selector(self): return slice(0, 0, None)
[docs] def is_empty(self): return True
[docs] def shape(self): return (0,)
[docs] def order(self): return 0
[docs]class BasicSelection:
[docs] @classmethod def from_shape(cls, shape: Tuple): return cls.from_subscript(shape, (...,))
[docs] @classmethod def block_selection( cls, shape: Tuple, block_shape: Tuple, order=None, block_order=None ): if order is None: order = tuple([1] * len(shape)) grid = array_utils.OrderedGrid( shape=shape, block_shape=block_shape, order=order, block_order=block_order ) selection_grid = np.empty(grid.grid_shape, dtype=cls) for index in grid.index_iterator(): slices: Tuple[slice] = tuple(grid.slices[index]) # This must be shape, since slices are bounded by shape. selection_grid[index] = cls.from_subscript(shape, slices) return selection_grid
[docs] @classmethod def from_subscript(cls, shape: Tuple, subscript: Tuple): axis_sels: List[AxisSelection] = [] contains_ellipsis = False contains_array = False i = 0 for axis_ss in subscript: if axis_ss is Ellipsis: if contains_ellipsis: raise ValueError("Subscripts may contain at most 1 ellipsis.") contains_ellipsis = True ellipsis_length = len(shape) - (len(subscript) - 1) for j in list(range(i, i + ellipsis_length)): # We instantiate a list, so okay to modify i. axis_sels.append(AxisSlice.from_size(shape[j])) i += 1 elif array_utils.is_index_subscript(axis_ss): axis_sels.append(AxisIndex.from_index(axis_ss, shape[i])) i += 1 elif isinstance(axis_ss, slice): axis_sels.append(AxisSlice.from_size(shape[i], axis_ss)) i += 1 elif isinstance(axis_ss, (list, tuple, np.ndarray)): if contains_array: raise ValueError( "%s may contain at most 1 array." % BasicSelection.__name__ ) contains_array = True axis_array = AxisArray(axis_ss) if len(axis_array.array.shape) > 1: raise ValueError( "%s does not support multi-axis" " arrays." % BasicSelection.__name__ ) if axis_array.array.dtype.kind == "i" and np.any( axis_array.array >= shape[i] ): raise ValueError("Some indices exceed dimension of axis.") if get_array_order(axis_array.array, axis=0) == 0: # The sequence must be monotonic with strict order. raise ValueError( "Input array cannot be interpreted as ordered set." ) axis_sels.append(axis_array) i += 1 else: raise Exception( "Unknown subscript type %s for axis %d." % (type(axis_ss), i) ) while len(axis_sels) < len(shape): size = shape[len(axis_sels)] # Select rest of axes. axis_sels.append(AxisSlice.from_size(size)) assert len(axis_sels) <= len( shape ), "More objects in selection tuple than axes in array." return BasicSelection(tuple(axis_sels), shape)
def __init__(self, axes: Tuple[AxisSelection], shape: tuple): self.axes = axes self.shape = shape # Compute this lazily. self._output_shape = None
[docs] def get_broadcastable_shape(self): oshape = [] for axis in self.axes: axis_shape = axis.shape() if len(axis_shape) == 0: oshape.append(1) elif len(axis_shape) == 1: oshape = oshape + list(axis_shape) else: raise ValueError("Invalid axis shape %s" % str(axis_shape)) return tuple(oshape)
[docs] def get_broadcastable_block_shape(self, partial_block_shape): # Construct a block shape equal in length to this selection's shape. # In what follows, a dimension is "empty" if the axis selector is an integer. # Starting from the last axis, # the resulting block shape is constructed as follows: # - If this dim is non-empty and partial is non-empty, emit partial's dimension. # - If this dim is non-empty and partial is empty, emit 1. # - If this dim is empty and partial is non-empty, then emit 1 if partial is 1. # - If this dim is empty and partial is empty, then emit 1. # All else raises an exception. # Partial may be longer than this shape, so long as the starting axis dims are 1. obs = [] partial_len = len(partial_block_shape) sel_len = len(self.axes) # Maintain separate indices. We don't decrement partial if we encounter an empty dim. j = -1 for i in range(-1, -sel_len - 1, -1): axis_shape = self.axes[i].shape() if len(axis_shape) == 0: obs.insert(0, 1) elif len(axis_shape) == 1: if j < -partial_len: # The rest of the axes have to be empty or of dimension 1. assert axis_shape[0] == 1 obs.insert(0, 1) else: obs.insert(0, partial_block_shape[j]) j -= 1 else: raise ValueError("Invalid axis shape %s" % str(axis_shape)) for k in range(j, -partial_len - 1, -1): assert partial_block_shape[k] == 1 return obs
[docs] def get_output_shape(self, include_indexes=False): if self._output_shape is None or include_indexes: oshape = [] for axis in self.axes: axis_shape = axis.shape() if len(axis_shape) == 0: if include_indexes: oshape.append(1) else: continue elif len(axis_shape) == 1: oshape = oshape + list(axis_shape) else: raise ValueError("Invalid axis shape %s" % str(axis_shape)) if include_indexes: # Never store output shape that includes indexes. return tuple(oshape) self._output_shape = tuple(oshape) return self._output_shape
[docs] def order(self): return np.array([axis.order() for axis in self.axes], dtype=np.intp)
[docs] def is_empty(self): for i in range(len(self.axes)): axis: AxisSelection = self.axes[i] if axis.is_empty(): return True return False
[docs] def selector(self): sel = [] for i in range(len(self.axes)): if isinstance(self[i], AxisEmpty): sel.append(slice(0, 0)) else: sel.append(self.axes[i].selector()) return tuple(sel)
[docs] def position(self, compute_stop=False): return Position.from_selection(self, compute_stop)
[docs] def basic_steps(self): for item in self.axes: if isinstance(item, AxisSlice): if item.step not in (None, 1): return False return True
[docs] def is_aligned(self, block_shape): start_pos: Position = self.position() stop_pos: Position = self.position(compute_stop=True) assert len(start_pos.value) == len(block_shape) aligned = True output_shape = self.get_output_shape(include_indexes=True) for i in range(len(start_pos.value)): start_aligned = (start_pos.value[i] % block_shape[i]) == 0 shape_aligned = (output_shape[i] % block_shape[i]) == 0 stop_aligned = stop_pos.value[i] == self.shape[i] # Check that (self - pos).get_output_shape() % block_shape == 0 # OR self.end() == self.shape, which means the last block # may not be the same shape as block_shape. # In the latter case, when using this method to do a reference # copy it's enough for the block shapes to be equal, # because the source/destination shapes are broadcasted # at the block-level. # If all conditions are met for each axis, then the selection is block-aligned # with the provided block shape. This assumes the provided block shape # is associated with the shape originally provided to this selection. aligned = aligned and start_aligned and (shape_aligned or stop_aligned) if not aligned: return False return True
def __repr__(self): return str(self.selector()) def __getitem__(self, item): assert array_utils.is_index_subscript(item) return self.axes[item] def __or__(self, other): # Union. raise NotImplementedError() def __xor__(self, other): # Union - intersection. raise NotImplementedError() def __and__(self, other): # We think of each axis of a selection as a partially ordered set. # The intersection of axis selectors preserve partial order. # The notion of intersection over selection operations # preserves result_axes: List[AxisSelection] = [] result_shape: List[int] = [] assert len(self.axes) == len(other.axes) for i in range(len(self.axes)): self_axis, other_axis = self.axes[i], other.axes[i] self_size, other_size = self.shape[i], other.shape[i] result_shape.append(min(self_size, other_size)) if isinstance(self_axis, AxisSlice) and isinstance(other_axis, AxisSlice): result_axes.append( self._slice_and_slice(self_axis, other_axis, self_size, other_size) ) elif isinstance(self_axis, AxisIndex) and isinstance(other_axis, AxisIndex): if self_axis.index == other_axis.index: result_axis: AxisIndex = AxisIndex(self_axis.index) else: result_axis: AxisEmpty = AxisEmpty() result_axes.append(result_axis) elif isinstance(self_axis, AxisArray) and isinstance(other_axis, AxisArray): result_axis: AxisArray = self._array_and_array(self_axis, other_axis) result_axes.append(result_axis) if len(result_axis.array.shape) != 1: raise Exception( "Unexpected output shape length " "of %d for array intersection." % len(result_axis.array.shape) ) elif isinstance(self_axis, AxisEmpty) or isinstance(other_axis, AxisEmpty): result_axes.append(AxisEmpty()) elif isinstance(self_axis, AxisIndex) or isinstance(other_axis, AxisIndex): # If one of the operands is an index, then the intersection is simple. # The result must be an index. res: AxisArray = self._array_and_array( self._to_array_axis(self_axis, self_size), self._to_array_axis(other_axis, other_size), ) assert len(res.array) <= 1 if len(res.array) == 1: result_axes.append(AxisIndex(res.array[0])) else: result_axes.append(AxisEmpty()) elif isinstance(self_axis, AxisArray) or isinstance(other_axis, AxisArray): # If one of the operands is an array, # then the resulting intersection must be an array. result_axes.append( self._array_and_array( self._to_array_axis(self_axis, self_size), self._to_array_axis(other_axis, other_size), ) ) else: # This should be impossible, since we've dealt with all potential mixed types. raise NotImplementedError("Operation not supported.") return BasicSelection(tuple(result_axes), tuple(result_shape)) def _to_array_axis(self, anything: AxisSelection, size): if isinstance(anything, AxisSlice): return AxisArray(slice_to_range(anything.to_slice(), size)) elif isinstance(anything, AxisIndex): return AxisArray([anything.index]) elif isinstance(anything, AxisEmpty): return AxisArray([]) else: assert isinstance(anything, AxisArray) return anything def _slice_and_slice(self, a: AxisSlice, b: AxisSlice, a_size, b_size): a_step = 1 if a.step is None else a.step b_step = 1 if b.step is None else b.step if a_step == 1 and b_step == 1: start = b.start if a.start < b.start else a.start stop = a.stop if a.stop < b.stop else b.stop step = None if a.step is None and b.step is None else 1 elif (a_step < 0 < b_step) or (b_step < 0 < a_step): # Steps running in opposite directions. # Based on intersection of partial orders, the order is undefined. start, stop, step = 0, 0, None elif 0 < a_step and 0 < b_step: assert a.start >= 0 and b.start >= 0 and a.stop >= 0 and b.stop >= 0 # TODO(hme): Optimize this by avoiding array instantiation. arr = self._array_and_array( self._to_array_axis(a, a_size), self._to_array_axis(b, b_size) ) if arr.array.size == 0: start, stop, step = 0, 0, None else: steps = np.diff(arr.array) if steps.size == 0: # Occurs only if one item is selected. # This could be from same start value, # or slice(0, 4, 3) # slice(1, 4, 2), # or slice(0, 10, 3) # slice(2, 10, 2). # The safe thing to do is set step large enough # so that the resulting slice only selects # the first value. start = int(arr.array[0]) stop = min(a.stop, b.stop) step = stop else: # Steps should be all eq. assert not np.any(np.diff(steps)) step = int(steps[0]) start = int(arr.array[0]) stop = min(a.stop, b.stop) elif a_step < 0 and b_step < 0: assert a.start < 0 and b.start < 0 and a.stop < 0 and b.stop < 0 arr = self._array_and_array( self._to_array_axis(a, a_size), self._to_array_axis(b, b_size) ) if arr.array.size == 0: start, stop, step = 0, 0, None else: steps = np.diff(arr.array) if steps.size == 0: # Same logic as with positive steps. start = int(arr.array[0]) stop = max(a.stop, b.stop) step = stop else: # Steps should be all eq. assert not np.any(np.diff(steps)) start = int(arr.array[0]) stop = max(a.stop, b.stop) step = int(steps[0]) else: raise Exception("Impossible.") return AxisSlice(start, stop, step) def _array_and_array(self, a: AxisArray, b: AxisArray): return AxisArray(self._np_array_and_array(a.array, b.array)) def _get_order(self, arr: np.ndarray): assert len(arr.shape) == 1 order = get_array_order(arr, axis=0) if order == 0: # Includes diffs containing 0, # which imply repetitions. # This is not allowed for basic selection. raise ValueError("Input array cannot be interpreted as ordered set.") return order def _np_array_and_array(self, a: np.ndarray, b: np.ndarray): # We can only support intersection of single-axis arrays, # which we interpret as ordered sets. # Consider intersection of a matrix. We may have rows of # different shape. a_order, b_order = self._get_order(a), self._get_order(b) if a_order != b_order: # Such an order exists, but the order # is undefined for elements x,y \in S, # where x \neq y. # For our purposes, this is undefined. return np.array([], dtype=np.intp) else: return np.sort(np.array(list(set(a) & set(b)), dtype=np.intp))[ :: np.sign(a_order) ]
[docs]class AdvancedSelection: # Allow mixture of slices and advanced indexing. # Will require some basic intersection operations, # but not as general as BasicSelection. def __init__(self): raise NotImplementedError()
[docs]class Position:
[docs] @classmethod def from_selection(cls, sel: BasicSelection, compute_stop=False): shape_dim = len(sel.shape) value: np.ndarray = np.empty(shape_dim, dtype=np.intp) for i in range(shape_dim): axsel: AxisSelection = sel[i] if isinstance(sel[i], AxisSlice): axsel: AxisSlice = sel[i] value[i] = axsel.stop if compute_stop else axsel.start elif isinstance(sel[i], AxisIndex): axsel: AxisIndex = sel[i] value[i] = axsel.index elif isinstance(sel[i], AxisArray): axsel: AxisArray = sel[i] if compute_stop: value[i] = ( max(axsel.array) if axsel.order() > 0 else min(axsel.array) ) else: value[i] = ( min(axsel.array) if axsel.order() > 0 else max(axsel.array) ) else: ValueError("Unexpected axis type %s" % type(sel[i])) if axsel.order() < 0: value[i] += 1 return cls(value)
[docs] @classmethod def from_dim(cls, dim: int): return cls(np.zeros(dim, dtype=np.intp))
def __init__(self, value: np.ndarray): self.dim = len(value) assert value.dtype == np.intp self.value = value def __repr__(self): return str(self.value.tolist()) def __add__(self, other): return self.bop(other, "add") __radd__ = __add__ def __sub__(self, other): return self.bop(other, "sub") def __rsub__(self, other): return self.bop(other, "rsub")
[docs] def bop(self, other, bop): if isinstance(other, Position): assert self.dim == other.dim result = Position.from_dim(self.dim) if bop == "add": result.value = self.value + other.value elif bop == "sub": result.value = self.value - other.value elif bop == "rsub": result.value = other.value - self.value return result elif isinstance(other, BasicSelection): if bop == "sub": # This is not allowed for selections. # The new subscript bounds are unclear in this case. raise ValueError("Cannot subtract BasicSelection from Position.") elif bop not in ("add", "rsub"): raise ValueError("Unsupported op encountered %s" % bop) assert self.dim == len(other.shape) # TODO (hme): Test shape addition / subtraction under various circumstances. # This should work okay, but does it work because of prior assumptions # on AxisSelection types? This is okay, but we should answer this # question. shape: list = list(other.shape) sel: list = list(other.selector()) for i in range(self.dim): if isinstance(sel[i], slice): if bop == "add": sel[i] = slice( sel[i].start + self.value[i], sel[i].stop + self.value[i], sel[i].step, ) shape[i] += self.value[i] elif bop == "rsub": shape[i] -= self.value[i] sel[i] = slice( sel[i].start - self.value[i], sel[i].stop - self.value[i], sel[i].step, ) elif array_utils.is_index_subscript(sel[i]): if bop == "add": sel[i] = sel[i] + self.value[i] elif bop == "rsub": sel[i] = sel[i] - self.value[i] elif isinstance(sel[i], np.ndarray): if bop == "add": sel[i] = sel[i].copy() + self.value[i] elif bop == "rsub": sel[i] = sel[i].copy() - self.value[i] else: raise ValueError( "Unsupported instance encountered %s" % type(sel[i]) ) shape: tuple = tuple(shape) sel: tuple = tuple(sel) return BasicSelection.from_subscript(shape, sel)