Source code for nums.core.array.blockarray

# Copyright (C) 2020 NumS Development Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import warnings
import itertools

import numpy as np

from nums.core.array import utils as array_utils
from nums.core.array.base import BlockArrayBase, Block
from nums.core.array.view import ArrayView
from nums.core.grid.grid import ArrayGrid
from nums.core.compute.compute_manager import ComputeManager


# pylint: disable=too-many-lines


class BlockArray(BlockArrayBase):
    @classmethod
    def empty(cls, shape, block_shape, dtype, cm: ComputeManager):
        return BlockArray.create("empty", shape, block_shape, dtype, cm)

    @classmethod
    def create(cls, create_op_name, shape, block_shape, dtype, cm: ComputeManager):
        grid = ArrayGrid(shape=shape, block_shape=block_shape, dtype=dtype.__name__)
        grid_meta = grid.to_meta()
        arr = BlockArray(grid, cm)
        for grid_entry in grid.get_entry_iterator():
            arr.blocks[grid_entry].oid = cm.new_block(
                create_op_name,
                grid_entry,
                grid_meta,
                syskwargs={"grid_entry": grid_entry, "grid_shape": grid.grid_shape},
            )
        return arr

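    # Usage sketch (illustrative, not part of the original source; assumes a
    # ComputeManager instance `cm`):
    #
    #   z = BlockArray.create("zeros", shape=(4, 4), block_shape=(2, 2),
    #                         dtype=float, cm=cm)
    #   # One "zeros" kernel invocation is scheduled per grid entry, so
    #   # z.grid.grid_shape == (2, 2) yields four block-creation tasks.
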
    @classmethod
    def from_scalar(cls, val, cm):
        if not array_utils.is_scalar(val):
            raise ValueError("%s is not a scalar." % val)
        return BlockArray.from_np(np.array(val), block_shape=(), copy=False, cm=cm)

    @classmethod
    def from_oid(cls, oid, shape, dtype, cm):
        block_shape = shape
        dtype = array_utils.to_dtype_cls(dtype)
        grid = ArrayGrid(shape, block_shape, dtype.__name__)
        ba = BlockArray(grid, cm)
        for i, grid_entry in enumerate(grid.get_entry_iterator()):
            assert i == 0
            ba.blocks[grid_entry].oid = oid
        return ba

    @classmethod
    def from_np(cls, arr, block_shape, copy, cm):
        dtype_str = str(arr.dtype)
        grid = ArrayGrid(arr.shape, block_shape, dtype_str)
        rarr = BlockArray(grid, cm)
        grid_entry_iterator = grid.get_entry_iterator()
        for grid_entry in grid_entry_iterator:
            grid_slice = grid.get_slice(grid_entry)
            block = arr[grid_slice]
            if copy:
                block = np.copy(block)
            rarr.blocks[grid_entry].oid = cm.put(
                block,
                syskwargs={"grid_entry": grid_entry, "grid_shape": grid.grid_shape},
            )
            rarr.blocks[grid_entry].dtype = getattr(np, dtype_str)
        return rarr

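    # Usage sketch (illustrative; assumes a ComputeManager instance `cm`):
    #
    #   x = np.arange(16).reshape(4, 4)
    #   ba = BlockArray.from_np(x, block_shape=(2, 2), copy=True, cm=cm)
    #   # Each of the four 2x2 views of x is put into the object store
    #   # as a separate block.
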
    @classmethod
    def from_blocks(cls, arr: np.ndarray, result_shape, cm):
        sample_idx = tuple(0 for dim in arr.shape)
        if isinstance(arr, Block):
            sample_block = arr
            result_shape = ()
        else:
            sample_block = arr[sample_idx]
            if result_shape is None:
                result_shape = array_utils.shape_from_block_array(arr)
        result_block_shape = sample_block.shape
        result_dtype_str = sample_block.dtype.__name__
        result_grid = ArrayGrid(
            shape=result_shape, block_shape=result_block_shape, dtype=result_dtype_str
        )
        assert arr.shape == result_grid.grid_shape
        result = BlockArray(result_grid, cm)
        for grid_entry in result_grid.get_entry_iterator():
            if isinstance(arr, Block):
                block: Block = arr
            else:
                block: Block = arr[grid_entry]
            result.blocks[grid_entry] = block
        return result

    def copy(self):
        grid_copy = self.grid.from_meta(self.grid.to_meta())
        rarr_copy = BlockArray(grid_copy, self.cm)
        for grid_entry in grid_copy.get_entry_iterator():
            rarr_copy.blocks[grid_entry] = self.blocks[grid_entry].copy()
        return rarr_copy

    def touch(self):
        """
        "Touch" an array. This is an efficient distributed "wait" operation.
        """
        oids = []
        for grid_entry in self.grid.get_entry_iterator():
            block: Block = self.blocks[grid_entry]
            oids.append(
                self.cm.touch(
                    block.oid,
                    syskwargs={
                        "grid_entry": block.grid_entry,
                        "grid_shape": block.grid_shape,
                    },
                )
            )
        self.cm.get(oids)
        return self

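    # Usage sketch (illustrative): because block computations are scheduled
    # asynchronously, touch() serves as a barrier, e.g. for timing
    # (assumes `import time`):
    #
    #   start = time.time()
    #   c = (a @ b).touch()   # blocks until every block of a @ b is computed
    #   elapsed = time.time() - start
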
    def is_single_block(self):
        return self.blocks.size == 1

    def to_single_block(self, replicate=False):
        res: BlockArray = self.reshape(*self.shape, block_shape=self.shape)
        if replicate:
            block: Block = res.blocks.item()
            num_devices: int = len(self.cm.devices())
            for i in range(num_devices):
                self.cm.touch(
                    block.oid,
                    syskwargs={
                        "grid_entry": (i,),
                        "grid_shape": (num_devices,),
                    },
                )
        return res

    def reshape(self, *shape, **kwargs):
        block_shape = kwargs.get("block_shape", None)
        if array_utils.is_int(shape):
            shape = (shape,)
        elif len(shape) == 0:
            shape = self.shape
        elif isinstance(shape[0], (tuple, list)):
            assert len(shape) == 1
            shape = shape[0]
        else:
            assert all(np.issubdtype(type(n), int) for n in shape)
        shape = Reshape.compute_shape(self.shape, shape)
        if block_shape is None:
            if shape == self.shape:
                # This is a no-op.
                block_shape = self.block_shape
            else:
                block_shape = self.cm.get_block_shape(shape, self.dtype)
        return Reshape()(self, shape, block_shape)

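    # Usage sketch (illustrative): shape may be given as varargs or a tuple,
    # with at most one -1; block_shape is chosen by the compute manager
    # when omitted:
    #
    #   y = x.reshape(8, 2)                        # block shape inferred
    #   y = x.reshape((8, 2), block_shape=(4, 2))  # block shape explicit
    #   y = x.reshape(-1, 2)                       # -1 resolved to 8 for a 4x4 x
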
    def expand_dims(self, axis):
        """
        This function follows the NumPy implementation of expand_dims.
        """
        if type(axis) not in (tuple, list):
            axis = (axis,)
        out_ndim = len(axis) + self.ndim
        axis = np.core.numeric.normalize_axis_tuple(axis, out_ndim)
        shape_it = iter(self.shape)
        block_shape_it = iter(self.block_shape)
        shape = [1 if ax in axis else next(shape_it) for ax in range(out_ndim)]
        block_shape = [
            1 if ax in axis else next(block_shape_it) for ax in range(out_ndim)
        ]
        return self.reshape(shape, block_shape=block_shape)

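    # Illustration: expand_dims inserts size-1 axes into both shape and
    # block_shape, e.g. for x with shape (4, 4) and block_shape (2, 2):
    #
    #   y = x.expand_dims(0)
    #   # y.shape == (1, 4, 4); y.block_shape == (1, 2, 2)
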
    def squeeze(self):
        shape = self.shape
        block_shape = self.block_shape
        new_shape = []
        new_block_shape = []
        for s, b in zip(shape, block_shape):
            if s == 1:
                assert b == 1
                continue
            new_shape.append(s)
            new_block_shape.append(b)
        return self.reshape(new_shape, block_shape=new_block_shape)

    def swapaxes(self, axis1, axis2):
        meta_swap = self.grid.to_meta()
        shape = list(meta_swap["shape"])
        block_shape = list(meta_swap["block_shape"])
        dim = len(shape)
        if axis1 >= dim or axis2 >= dim:
            raise ValueError("axis is larger than the array dimension")
        shape[axis1], shape[axis2] = shape[axis2], shape[axis1]
        block_shape[axis1], block_shape[axis2] = block_shape[axis2], block_shape[axis1]
        meta_swap["shape"] = tuple(shape)
        meta_swap["block_shape"] = tuple(block_shape)
        grid_swap = ArrayGrid.from_meta(meta_swap)
        rarr_src = np.ndarray(self.blocks.shape, dtype="O")
        for grid_entry in self.grid.get_entry_iterator():
            rarr_src[grid_entry] = self.blocks[grid_entry].swapaxes(axis1, axis2)
        rarr_src = rarr_src.swapaxes(axis1, axis2)
        rarr_swap = BlockArray(grid_swap, self.cm, rarr_src)
        return rarr_swap

    def transpose(self, defer=False, redistribute=False):
        """
        Transpose this matrix. Only use defer with arithmetic operations.
        Setting redistribute to True may significantly impact performance.

        :param defer: When true, the transpose operation will be applied
            with the next arithmetic operation.
        :param redistribute: If defer is false, setting this to true will
            redistribute the data according to the device grid (data placement
            policy). This parameter has no effect when defer is true.
        :return: The transposed matrix.
        """
        if defer and redistribute:
            warnings.warn("defer is True, redistribute=True will be ignored.")
        metaT = self.grid.to_meta()
        metaT["shape"] = tuple(reversed(metaT["shape"]))
        metaT["block_shape"] = tuple(reversed(metaT["block_shape"]))
        gridT = ArrayGrid.from_meta(metaT)
        rarrT = BlockArray(gridT, self.cm)
        rarrT.blocks = np.copy(self.blocks.T)
        for grid_entry in rarrT.grid.get_entry_iterator():
            rarrT.blocks[grid_entry] = rarrT.blocks[grid_entry].transpose(
                defer, redistribute
            )
        return rarrT

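    # Usage sketch (illustrative): a deferred transpose only tags blocks as
    # transposed, so no block data moves until the next arithmetic op
    # consumes the result:
    #
    #   gram = x.transpose(defer=True) @ x   # x.T is never materialized
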
    def __getattr__(self, item):
        if item == "__array_priority__" or item == "__array_struct__":
            # This is triggered by a numpy array on the LHS.
            raise TypeError("Unexpected conversion attempt from BlockArray to ndarray.")
        elif item == "ndim":
            return len(self.shape)
        elif item == "T":
            return self.transpose()
        else:
            raise NotImplementedError(item)

    def _preprocess_subscript(self, item):
        if not isinstance(item, tuple):
            ss = (item,)
        else:
            ss = item
        # We need to fetch any block arrays.
        tmp = []
        for entry in ss:
            if isinstance(entry, BlockArray):
                val = entry.get()
            else:
                val = entry
            if isinstance(val, list):
                val = np.array(val)
            if isinstance(val, np.ndarray):
                # If this is a Boolean mask, convert it to integers.
                if array_utils.is_bool(val.dtype, type_test=True):
                    val = np.arange(len(val))[val]
                if val.shape == ():
                    val = val.item()
            tmp.append(val)
        ss = tuple(tmp)
        is_handled_advanced = False
        array_encountered = False
        axis = None
        # Check if this is a supported advanced indexing operation.
        for i, entry in enumerate(ss):
            if isinstance(entry, slice) and entry.start is None and entry.stop is None:
                continue
            elif array_utils.is_int(entry) or array_utils.is_uint(entry):
                continue
            elif array_utils.is_array_like(entry):
                if array_encountered:
                    raise NotImplementedError(
                        "Advanced indexing is only supported along a single axis."
                    )
                is_handled_advanced = True
                array_encountered = True
                axis = i
                if not (np.all(0 <= entry) and np.all(entry < self.shape[axis])):
                    raise IndexError(
                        "Advanced indexing array along axis %s is out of bounds." % axis
                    )
            else:
                if array_encountered:
                    raise NotImplementedError(
                        "Advanced indexing is only supported "
                        "with full slices and integers along other axes."
                    )
                is_handled_advanced = False
                break
        return ss, is_handled_advanced, axis

    def __getitem__(self, item):
        ss, is_handled_advanced, axis = self._preprocess_subscript(item)
        if is_handled_advanced:
            # Treat this as a shuffle.
            return self._advanced_single_array_select(ss, axis=axis)
        av: ArrayView = ArrayView.from_block_array(self)
        # TODO (hme): We don't have to create, but do so for now until we need to optimize.
        return av[ss].create(BlockArray)

    def _advanced_single_array_select(self, ss: tuple, axis: int = 0):
        # Create output array along the axis of the selection operation.
        # We don't allocate zeros for the output array. Instead, we let the
        # update kernel create the initial set of zeros to save some memory.
        array = ss[axis]
        assert len(array.shape) == 1
        # TODO: We may encounter block shape incompatibility due to this.
        block_size = self.block_shape[axis]
        self.cm.update_block_shape_map(array.shape[0], block_size)
        dst_axis = None
        shape = []
        block_shape = []
        for i in range(len(self.shape)):
            if i == axis:
                dst_axis = len(shape)
                shape.append(array.shape[0])
                block_shape.append(block_size)
            elif i < len(ss):
                if isinstance(ss[i], slice):
                    shape.append(self.shape[i])
                    block_shape.append(self.block_shape[i])
                else:
                    # It's an index. We drop the indices.
                    continue
            else:
                shape.append(self.shape[i])
                block_shape.append(self.block_shape[i])
        dst_arr = BlockArray(
            ArrayGrid(
                shape=tuple(shape),
                block_shape=tuple(block_shape),
                dtype=self.dtype.__name__,
            ),
            cm=self.cm,
        )
        src_arr = self
        np_ss = ss
        ss = self.cm.put(
            ss,
            syskwargs={
                "grid_entry": (0,),
                "grid_shape": (1,),
            },
        )
        for src_grid_entry in src_arr.grid.get_entry_iterator():
            src_coord: tuple = src_arr.grid.get_entry_coordinates(src_grid_entry)
            src_block: Block = src_arr.blocks[src_grid_entry]
            # Make sure index values in subscript are within bounds of src_arr.
            # We also prepare dst_grid_entry here.
            dst_grid_entry_list = []
            skip = False
            for curr_axis in range(len(np_ss)):
                if curr_axis == axis:
                    dst_grid_entry_list.append(None)
                elif isinstance(np_ss[curr_axis], slice):
                    dst_grid_entry_list.append(src_grid_entry[curr_axis])
                elif not (
                    src_coord[curr_axis]
                    <= np_ss[curr_axis]
                    < src_coord[curr_axis] + src_block.shape[curr_axis]
                ):
                    skip = True
                    break
            if skip:
                continue
            for curr_axis in range(len(np_ss), len(src_grid_entry)):
                dst_grid_entry_list.append(src_grid_entry[curr_axis])
            for j in range(dst_arr.grid.grid_shape[dst_axis]):
                dst_grid_entry_list[dst_axis] = j
                dst_grid_entry = tuple(dst_grid_entry_list)
                dst_block: Block = dst_arr.blocks[dst_grid_entry]
                dst_coord: tuple = dst_arr.grid.get_entry_coordinates(dst_grid_entry)
                if dst_block.oid is None:
                    dst_arg = (dst_block.shape, dst_block.dtype)
                else:
                    dst_arg = dst_block.oid
                dst_block.oid = self.cm.advanced_select_block_along_axis(
                    dst_arg,
                    src_block.oid,
                    ss,
                    dst_axis,
                    axis,
                    dst_coord,
                    src_coord,
                    syskwargs={
                        "grid_entry": dst_grid_entry,
                        "grid_shape": dst_arr.grid.grid_shape,
                    },
                )
        return dst_arr

    def __setitem__(self, key, value):
        value: BlockArray = BlockArray.to_block_array(value, self.cm)
        ss, is_handled_advanced, axis = self._preprocess_subscript(key)
        if is_handled_advanced:
            return self._advanced_single_array_assign(ss, value, axis)
        av: ArrayView = ArrayView.from_block_array(self)
        av[key] = value

    def _advanced_single_array_assign(self, ss: tuple, value, axis: int):
        array = ss[axis]
        assert len(array.shape) == 1
        # The subscript contains a single array. We therefore know one of two things is true:
        # 1. value is the same shape as self along axes != axis.
        # 2. value is scalar or 1-dimensional.
        # We currently don't support the case where value may be broadcast
        # if it has more dims. This should be a straightforward future task.
        value: BlockArray = value
        mode = None
        if len(value.shape) == 0:
            # The subscripted value per block will broadcast to other dimensions.
            mode = "scalar"
        elif len(value.shape) == 1:
            # assert len(value.shape) == len(ss)
            mode = "single-dim"
            # Can broadcast if trailing dim matches.
            assert len(ss[axis]) == value.shape[0]
            for i in range(len(self.shape)):
                if i == axis:
                    assert len(ss[i]) == value.shape[0]
                elif i < axis:
                    # Nothing to check here.
                    # These entries are : or integer.
                    pass
                else:
                    if i < len(ss):
                        if not isinstance(ss[i], slice):
                            # ss[i] is an integer.
                            continue
                    # If we're here, then the rest of the subscript operator
                    # will resolve to :, which is not broadcastable.
                    raise ValueError(
                        "Cannot broadcast input array "
                        "from shape %s into shape %s"
                        % (value.shape, tuple([value.shape[0]] + list(self.shape[i:])))
                    )
        elif len(value.shape) == len(self.shape):
            mode = "multi-dim"
            new_block_shape = []
            for i in range(len(self.shape)):
                if i == axis:
                    new_block_shape.append(value.block_shape[i])
                elif i < len(ss) and (
                    array_utils.is_int(ss[i]) or array_utils.is_uint(ss[i])
                ):
                    # These entries are : or integer.
                    # assert array_utils.is_int(ss[i]) or array_utils.is_uint(ss[i])
                    assert value.shape[i] == 1
                    new_block_shape.append(1)
                else:
                    assert value.shape[i] == self.shape[i], "Shape mismatch."
                    new_block_shape.append(self.block_shape[i])
            new_block_shape = tuple(new_block_shape)
            if new_block_shape != value.block_shape:
                # TODO: This message occurs on X[idx[:n]] = X[idx[n:]] + 0.5,
                #  even when n is a multiple of block_shape[0].
                warnings.warn(
                    ("Assigned value block shape %s " % str(value.block_shape))
                    + (
                        "does not match block shape %s of assignee. "
                        % str(new_block_shape)
                    )
                    + "Applying reshape to assigned value."
                )
                value = value.reshape(block_shape=new_block_shape)
        # Like select, iterate over destination blocks along the axis being updated.
        # e.g. if self is 2-dim and axis=0, then fix the row and iterate over the columns.
        # If value has the same shape as self, then for each destination block,
        # iterate over the blocks in value along axis.
        # e.g. if self is 2-dim and axis=0, then for the given column, iterate over
        # the rows of value.
        # If value is scalar, then attempt to assign it to every destination block.
        # If value is 1-dim, then just iterate over the dim and assign accordingly.
        dst_arr = self
        src_arr = value
        src_grid_shape = src_arr.grid.grid_shape
        np_ss = ss
        ss = self.cm.put(
            ss,
            syskwargs={
                "grid_entry": (0,),
                "grid_shape": (1,),
            },
        )
        for dst_grid_entry in dst_arr.grid.get_entry_iterator():
            dst_block: Block = dst_arr.blocks[dst_grid_entry]
            dst_coord: tuple = dst_arr.grid.get_entry_coordinates(dst_grid_entry)
            # Make sure index values in subscript are within bounds of dst_arr.
            # We don't need to check src_arr:
            # 1) The block shapes of dst_arr and src_arr are the same except along axis
            #    and indices in ss. We are not concerned with the axes the indices in ss
            #    correspond to, because they are of size 1 in src_arr => we only need to
            #    check that indices fall within bounds of dst_arr.
            # 2) For each dst_arr, we test the values to assign to dst_arr by traversing
            #    src_arr along axis. Thus, sizes along all other axes are equal or
            #    broadcast.
            skip = False
            for curr_axis in range(len(np_ss)):
                if curr_axis == axis or isinstance(np_ss[curr_axis], slice):
                    continue
                if not (
                    dst_coord[curr_axis]
                    <= np_ss[curr_axis]
                    < dst_coord[curr_axis] + dst_block.shape[curr_axis]
                ):
                    skip = True
                    break
            if skip:
                continue
            if mode == "scalar":
                src_block: Block = src_arr.blocks.item()
                src_coord: tuple = src_arr.grid.get_entry_coordinates(
                    src_block.grid_entry
                )
                dst_block.oid = self.cm.advanced_assign_block_along_axis(
                    dst_block.oid,
                    src_block.oid,
                    ss,
                    axis,
                    dst_coord,
                    src_coord,
                    syskwargs={
                        "grid_entry": dst_grid_entry,
                        "grid_shape": dst_arr.grid.grid_shape,
                    },
                )
            elif mode == "single-dim":
                for src_grid_entry in src_arr.grid.get_entry_iterator():
                    src_block: Block = src_arr.blocks[src_grid_entry]
                    src_coord: tuple = src_arr.grid.get_entry_coordinates(
                        src_grid_entry
                    )
                    dst_block.oid = self.cm.advanced_assign_block_along_axis(
                        dst_block.oid,
                        src_block.oid,
                        ss,
                        axis,
                        dst_coord,
                        src_coord,
                        syskwargs={
                            "grid_entry": dst_grid_entry,
                            "grid_shape": dst_arr.grid.grid_shape,
                        },
                    )
            elif mode == "multi-dim":
                for j in range(src_grid_shape[axis]):
                    # Apply sel from each block along axis of src_arr.
                    # e.g. for a 2-dim array, we fix the column blocks
                    # given by dst_grid_entry, and iterate over the rows.
                    src_grid_entry = tuple(
                        list(dst_grid_entry[:axis])
                        + [j]
                        + list(dst_grid_entry[axis + 1 :])
                    )
                    src_block: Block = src_arr.blocks[src_grid_entry]
                    src_coord: tuple = src_arr.grid.get_entry_coordinates(
                        src_grid_entry
                    )
                    dst_block.oid = self.cm.advanced_assign_block_along_axis(
                        dst_block.oid,
                        src_block.oid,
                        ss,
                        axis,
                        dst_coord,
                        src_coord,
                        syskwargs={
                            "grid_entry": dst_grid_entry,
                            "grid_shape": dst_arr.grid.grid_shape,
                        },
                    )
        return dst_arr

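    # Usage sketch (illustrative): advanced indexing is supported along a
    # single axis, with full slices or integers along the remaining axes:
    #
    #   idx = np.array([3, 0, 2])
    #   rows = x[idx]          # shuffle-based select along axis 0
    #   x[idx] = rows + 1.0    # shuffle-based assign along axis 0
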
    @staticmethod
    def to_block_array(obj, cm: ComputeManager, block_shape=None):
        if isinstance(obj, BlockArray):
            return obj
        if isinstance(obj, np.ndarray):
            np_array = obj
        elif isinstance(obj, list):
            np_array = np.array(obj)
        elif array_utils.is_scalar(obj):
            return BlockArray.from_scalar(obj, cm)
        else:
            raise Exception("Unsupported type %s" % type(obj))
        if block_shape is None:
            block_shape = cm.get_block_shape(np_array.shape, np_array.dtype)
        return BlockArray.from_np(np_array, block_shape, False, cm)

    def check_or_convert_other(self, other, compute_block_shape=False):
        block_shape = None if compute_block_shape else self.block_shape
        return BlockArray.to_block_array(other, self.cm, block_shape=block_shape)

    def ufunc(self, op_name):
        result = self.copy()
        for grid_entry in self.grid.get_entry_iterator():
            result.blocks[grid_entry] = self.blocks[grid_entry].ufunc(op_name)
        return result

    def _tree_reduce(
        self, op_name, blocks_or_oids, result_grid_entry, result_grid_shape
    ):
        """
        Basic tree reduce implementation.
        Schedules op on same node as left operand.

        :param op_name: The reduction op.
        :param blocks_or_oids: A list of type Block or a list of tuples.
            Tuples must be of the form
            (oid, grid_entry, grid_shape, transposed).
        :param result_grid_entry: The grid entry of the result block. This will be
            used to compute the final reduction step.
        :param result_grid_shape: The grid shape of the result block. This will be
            used to compute the final reduction step.
        :return: The oid of the result.
        """
        oid_list = blocks_or_oids
        if isinstance(blocks_or_oids[0], Block):
            oid_list = [
                (b.oid, b.grid_entry, b.grid_shape, b.transposed)
                for b in blocks_or_oids
            ]
        if len(oid_list) == 1:
            return oid_list[0][0]
        q = oid_list
        while len(q) > 1:
            a_oid, a_ge, a_gs, a_T = q.pop(0)
            b_oid, _, _, b_T = q.pop(0)
            ge, gs = (
                (result_grid_entry, result_grid_shape) if len(q) == 0 else (a_ge, a_gs)
            )
            c_oid = self.cm.bop_reduce(
                op_name,
                a_oid,
                b_oid,
                a_T,
                b_T,
                syskwargs={
                    "grid_entry": ge,
                    "grid_shape": gs,
                },
            )
            q.append((c_oid, ge, gs, False))
        r_oid, r_ge, r_gs, _ = q.pop(0)
        assert r_ge == result_grid_entry
        assert r_gs == result_grid_shape
        return r_oid

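    # Illustration (not from the original source): for reduced block oids
    # [b0, b1, b2, b3], the queue above performs pairwise reductions:
    #
    #   (b0, b1) -> c0;  (b2, b3) -> c1;  (c0, c1) -> result
    #
    # Intermediates are scheduled on the node of the left operand; only the
    # final step is pinned to result_grid_entry / result_grid_shape.
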
    def reduce_axis(self, op_name, axis, keepdims=False):
        if not (axis is None or isinstance(axis, (int, np.int32, np.int64))):
            raise NotImplementedError("Only integer axis is currently supported.")
        if 0 in self.shape:
            return BlockArray.create("zeros", (), (), float, self.cm)
        block_reduced_oids = np.empty_like(self.blocks, dtype=tuple)
        for grid_entry in self.grid.get_entry_iterator():
            block = self.blocks[grid_entry]
            block_oid = self.cm.reduce_axis(
                op_name=op_name,
                arr=block.oid,
                axis=axis,
                keepdims=keepdims,
                transposed=block.transposed,
                syskwargs={
                    "grid_entry": block.grid_entry,
                    "grid_shape": block.grid_shape,
                },
            )
            block_reduced_oids[grid_entry] = (
                block_oid,
                block.grid_entry,
                block.grid_shape,
                False,
            )
        result_shape = []
        result_block_shape = []
        for curr_axis in range(len(self.shape)):
            axis_size, axis_block_size = (
                self.shape[curr_axis],
                self.block_shape[curr_axis],
            )
            if curr_axis == axis or axis is None:
                if keepdims:
                    axis_size, axis_block_size = 1, 1
                else:
                    continue
            result_shape.append(axis_size)
            result_block_shape.append(axis_block_size)
        result_shape = tuple(result_shape)
        result_block_shape = tuple(result_block_shape)
        result_dtype = array_utils.get_reduce_output_type(op_name, self.dtype)
        result_grid = ArrayGrid(
            shape=result_shape,
            block_shape=result_block_shape,
            dtype=result_dtype.__name__,
        )
        result = BlockArray(result_grid, self.cm)
        if axis is None:
            if result.shape == ():
                result_block: Block = result.blocks[()]
            else:
                result_block: Block = result.blocks[:].item()
            result_block.oid = self._tree_reduce(
                op_name,
                block_reduced_oids.flatten().tolist(),
                result_block.grid_entry,
                result_block.grid_shape,
            )
        else:
            for result_grid_entry in result_grid.get_entry_iterator():
                block_reduced_oids_axis = []
                for sum_dim in range(self.grid.grid_shape[axis]):
                    grid_entry = list(result_grid_entry)
                    if keepdims:
                        grid_entry[axis] = sum_dim
                    else:
                        grid_entry = grid_entry[:axis] + [sum_dim] + grid_entry[axis:]
                    grid_entry = tuple(grid_entry)
                    block_reduced_oids_axis.append(block_reduced_oids[grid_entry])
                result_block: Block = result.blocks[result_grid_entry]
                result_block.oid = self._tree_reduce(
                    op_name,
                    block_reduced_oids_axis,
                    result_block.grid_entry,
                    result_block.grid_shape,
                )
        return result

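    # Usage sketch (illustrative): reduce_axis backs reductions such as sum;
    # for a 2-dim x:
    #
    #   col_sums = x.reduce_axis("sum", axis=0)    # shape (ncols,)
    #   total = x.reduce_axis("sum", axis=None)    # 0-dim result
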
    #################
    # Linear Algebra
    #################

    def _compute_tensordot_syskwargs(self, self_block: Block, other_block: Block):
        # Schedule on larger block.
        if np.product(self_block.shape) >= np.product(other_block.shape):
            return self_block.true_grid_entry(), self_block.true_grid_shape()
        else:
            return other_block.true_grid_entry(), other_block.true_grid_shape()

    def tensordot(self, other, axes=2):
        if isinstance(axes, int):
            pass
        elif array_utils.is_array_like(axes):
            raise NotImplementedError("Non-integer axes is currently not supported.")
        else:
            raise TypeError(f"Unexpected axes type '{type(axes).__name__}'")
        other = self.check_or_convert_other(other, compute_block_shape=True)
        if array_utils.np_tensordot_param_test(
            self.shape, self.ndim, other.shape, other.ndim, axes
        ):
            raise ValueError("shape-mismatch for sum")
        this_axes = self.grid.grid_shape[:-axes]
        this_sum_axes = self.grid.grid_shape[-axes:]
        other_axes = other.grid.grid_shape[axes:]
        other_sum_axes = other.grid.grid_shape[:axes]
        assert this_sum_axes == other_sum_axes
        result_shape = tuple(self.shape[:-axes] + other.shape[axes:])
        result_block_shape = tuple(self.block_shape[:-axes] + other.block_shape[axes:])
        result_grid = ArrayGrid(
            shape=result_shape,
            block_shape=result_block_shape,
            dtype=array_utils.get_bop_output_type(
                "tensordot", self.dtype, other.dtype
            ).__name__,
        )
        assert result_grid.grid_shape == tuple(this_axes + other_axes)
        result = BlockArray(result_grid, self.cm)
        this_dims = list(itertools.product(*map(range, this_axes)))
        other_dims = list(itertools.product(*map(range, other_axes)))
        sum_dims = list(itertools.product(*map(range, this_sum_axes)))
        for i in this_dims:
            for j in other_dims:
                grid_entry = tuple(i + j)
                result_block: Block = result.blocks[grid_entry]
                sum_oids = []
                for k in sum_dims:
                    self_block: Block = self.blocks[tuple(i + k)]
                    other_block: Block = other.blocks[tuple(k + j)]
                    dot_grid_args = self._compute_tensordot_syskwargs(
                        self_block, other_block
                    )
                    dotted_oid = self.cm.bop(
                        "tensordot",
                        self_block.oid,
                        other_block.oid,
                        self_block.transposed,
                        other_block.transposed,
                        axes=axes,
                        syskwargs={
                            "grid_entry": dot_grid_args[0],
                            "grid_shape": dot_grid_args[1],
                        },
                    )
                    sum_oids.append(
                        (dotted_oid, dot_grid_args[0], dot_grid_args[1], False)
                    )
                result_block.oid = self._tree_reduce(
                    "sum", sum_oids, result_block.grid_entry, result_block.grid_shape
                )
        return result

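    # Usage sketch (illustrative): with axes=1 on 2-dim arrays, tensordot is
    # an ordinary matrix multiply; each output block tree-reduces a "sum"
    # over per-block "tensordot" kernel results:
    #
    #   c = a.tensordot(b, axes=1)   # same as a @ b for 2-dim a, b
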
    def __matmul__(self, other):
        if len(self.shape) > 2:
            # TODO (bcp): NumPy's implementation does a stacked matmul,
            #  which is not supported yet.
            raise NotImplementedError(
                "Matrix multiply for tensors of rank > 2 not supported yet."
            )
        else:
            return self.tensordot(other, 1)

    def __rmatmul__(self, other):
        other = self.check_or_convert_other(other)
        return other @ self

    __imatmul__ = __matmul__

    #################
    # Arithmetic
    #################

    def _fast_element_wise(self, op_name, other):
        """
        Implements fast scheduling for basic element-wise operations.
        """
        dtype = array_utils.get_bop_output_type(op_name, self.dtype, other.dtype)
        # Schedule the op first.
        blocks = np.empty(shape=self.grid.grid_shape, dtype=Block)
        for grid_entry in self.grid.get_entry_iterator():
            self_block: Block = self.blocks[grid_entry]
            other_block: Block = other.blocks[grid_entry]
            blocks[grid_entry] = block = Block(
                grid_entry=grid_entry,
                grid_shape=self_block.grid_shape,
                rect=self_block.rect,
                shape=self_block.shape,
                dtype=dtype,
                transposed=False,
                cm=self.cm,
            )
            block.oid = self.cm.bop(
                op_name,
                self_block.oid,
                other_block.oid,
                self_block.transposed,
                other_block.transposed,
                axes={},
                syskwargs={
                    "grid_entry": grid_entry,
                    "grid_shape": self.grid.grid_shape,
                },
            )
        return BlockArray(
            ArrayGrid(self.shape, self.block_shape, dtype.__name__),
            self.cm,
            blocks=blocks,
        )

    def __elementwise__(self, op_name, other):
        other = self.check_or_convert_other(other)
        if self.shape == other.shape and self.block_shape == other.block_shape:
            return self._fast_element_wise(op_name, other)
        blocks_op = self.blocks.__getattribute__("__%s__" % op_name)
        return BlockArray.from_blocks(
            blocks_op(other.blocks), result_shape=None, cm=self.cm
        )

    def __neg__(self):
        return self.ufunc("negative")

    def __pos__(self):
        return self

    def __abs__(self):
        return self.ufunc("abs")

    def __mod__(self, other):
        return self.__elementwise__("mod", other)

    def __rmod__(self, other):
        other = self.check_or_convert_other(other)
        return other.__elementwise__("mod", self)

    __imod__ = __mod__

    def __add__(self, other):
        return self.__elementwise__("add", other)

    def __radd__(self, other):
        other = self.check_or_convert_other(other)
        return other.__elementwise__("add", self)

    __iadd__ = __add__

    def __sub__(self, other):
        return self.__elementwise__("sub", other)

    def __rsub__(self, other):
        other = self.check_or_convert_other(other)
        return other.__elementwise__("sub", self)

    __isub__ = __sub__

    def __mul__(self, other):
        return self.__elementwise__("mul", other)

    def __rmul__(self, other):
        other = self.check_or_convert_other(other)
        return other.__elementwise__("mul", self)

    __imul__ = __mul__

    def __truediv__(self, other):
        return self.__elementwise__("truediv", other)

    def __rtruediv__(self, other):
        other = self.check_or_convert_other(other)
        return other / self

    __itruediv__ = __truediv__

    def __floordiv__(self, other):
        return self.__elementwise__("floor_divide", other)

    def __rfloordiv__(self, other):
        other = self.check_or_convert_other(other)
        return other.__elementwise__("floor_divide", self)

    __ifloordiv__ = __floordiv__

    def __pow__(self, other):
        return self.__elementwise__("pow", other)

    def __rpow__(self, other):
        other = self.check_or_convert_other(other)
        return other ** self

    __ipow__ = __pow__

    #################
    # Inequalities
    #################

    def __inequality__(self, op, other):
        other = self.check_or_convert_other(other)
        assert (
            other.shape == () or other.shape == self.shape
        ), "Currently supports comparison with scalars and same-shape arrays only."
        shape = array_utils.broadcast(self.shape, other.shape).shape
        block_shape = array_utils.broadcast_block_shape(
            self.shape, other.shape, self.block_shape
        )
        dtype = bool.__name__
        grid = ArrayGrid(shape, block_shape, dtype)
        result = BlockArray(grid, self.cm)
        for grid_entry in result.grid.get_entry_iterator():
            if other.shape == ():
                other_block: Block = other.blocks.item()
            else:
                other_block: Block = other.blocks[grid_entry]
            result.blocks[grid_entry] = self.blocks[grid_entry].bop(
                op, other_block, args={}
            )
        return result

    def __ge__(self, other):
        return self.__inequality__("ge", other)

    def __rge__(self, other):
        other = self.check_or_convert_other(other)
        return other.__inequality__("ge", self)

    def __gt__(self, other):
        return self.__inequality__("gt", other)

    def __rgt__(self, other):
        other = self.check_or_convert_other(other)
        return other.__inequality__("gt", self)

    def __le__(self, other):
        return self.__inequality__("le", other)

    def __rle__(self, other):
        other = self.check_or_convert_other(other)
        return other.__inequality__("le", self)

    def __lt__(self, other):
        return self.__inequality__("lt", other)

    def __rlt__(self, other):
        other = self.check_or_convert_other(other)
        return other.__inequality__("lt", self)

    def __eq__(self, other):
        return self.__inequality__("eq", other)

    def __req__(self, other):
        other = self.check_or_convert_other(other)
        return other.__inequality__("eq", self)

    def __ne__(self, other):
        return self.__inequality__("ne", other)

    def __rne__(self, other):
        other = self.check_or_convert_other(other)
        return other.__inequality__("ne", self)

    ##################
    # Boolean
    ##################

    # TODO (hme): Type check bool ops.
    def __bool__(self):
        # pylint: disable=no-member
        if np.sum(self.shape) == len(self.shape):
            # If all ones or scalar, then this is defined.
            return self.get().__bool__()
        return True

    def __invert__(self):
        return self.ufunc("invert")

    def __or__(self, other):
        return self.__elementwise__("bitwise_or", other)

    def __ror__(self, other):
        other = self.check_or_convert_other(other)
        return other.__elementwise__("bitwise_or", self)

    __ior__ = __or__

    def __and__(self, other):
        return self.__elementwise__("bitwise_and", other)

    def __rand__(self, other):
        other = self.check_or_convert_other(other)
        return other.__elementwise__("bitwise_and", self)

    __iand__ = __and__

    def __xor__(self, other):
        return self.__elementwise__("bitwise_xor", other)

    def __rxor__(self, other):
        other = self.check_or_convert_other(other)
        return other.__elementwise__("bitwise_xor", self)

    __ixor__ = __xor__

    def __lshift__(self, other):
        return self.__elementwise__("left_shift", other)

    def __rlshift__(self, other):
        other = self.check_or_convert_other(other)
        return other.__elementwise__("left_shift", self)

    __ilshift__ = __lshift__

    def __rshift__(self, other):
        return self.__elementwise__("right_shift", other)

    def __rrshift__(self, other):
        other = self.check_or_convert_other(other)
        return other.__elementwise__("right_shift", self)

    __irshift__ = __rshift__

    # All operators: https://docs.python.org/3/library/operator.html

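    # Usage sketch (illustrative): operands of elementwise and comparison
    # operators are lifted to BlockArray via check_or_convert_other:
    #
    #   z = x + 1.0    # scalar converted with BlockArray.from_scalar
    #   m = x > 0      # blockwise comparison; result dtype is bool
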
    def astype(self, dtype):
        grid = ArrayGrid(self.shape, self.block_shape, dtype.__name__)
        result = BlockArray(grid, self.cm)
        for grid_entry in result.grid.get_entry_iterator():
            result.blocks[grid_entry] = self.blocks[grid_entry].astype(dtype)
        return result

    def flattened_oids(self):
        oids = []
        for grid_entry in self.grid.get_entry_iterator():
            oid = self.blocks[grid_entry].oid
            oids.append(oid)
        return oids

class Reshape:
    @staticmethod
    def compute_shape(shape, input_shape):
        size = np.product(shape)
        if -1 in input_shape:
            new_shape = []
            other_dim_prod = 1
            negative_one_seen = False
            for dim in input_shape:
                if dim == -1:
                    if negative_one_seen:
                        raise Exception("Only one -1 permitted in reshape.")
                    negative_one_seen = True
                    continue
                other_dim_prod *= dim
            if size % other_dim_prod != 0:
                raise Exception("Invalid shape.")
            for dim in input_shape:
                if dim == -1:
                    new_shape.append(size // other_dim_prod)
                else:
                    new_shape.append(dim)
        else:
            new_shape = input_shape
        assert size == np.product(new_shape)
        return new_shape

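    # Worked example (illustrative): for shape=(4, 4) and input_shape=(-1, 2),
    # size = 16 and other_dim_prod = 2, so -1 resolves to 16 // 2 = 8 and
    # new_shape == [8, 2].
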
    def _group_index_lists_by_block(
        self, dst_slice_tuples, src_grid: ArrayGrid, dst_index_list, src_index_list
    ):
        # TODO(hme): Keep this function here until it's needed for greater support of
        #  selection/assignment operations.
        # Block grid entries needed to write to given dst_slice_selection.
        src_blocks = {}
        dst_slice_np = np.array(dst_slice_tuples).T
        dst_index_arr = np.array(dst_index_list)
        src_index_arr = np.array(src_index_list)
        # Pick the smallest type to represent indices.
        # A set of these indices may be transmitted over the network,
        # so we want to pick the smallest encoding possible.
        index_types = [
            (2 ** 8, np.uint8),
            (2 ** 16, np.uint16),
            (2 ** 32, np.uint32),
            (2 ** 64, np.uint64),
        ]
        index_type = None
        for bound, curr_index_type in index_types:
            if np.all(np.array(src_grid.block_shape) < bound) and np.all(
                dst_slice_np[1] < bound
            ):
                index_type = curr_index_type
                break
        if index_type is None:
            raise Exception("Unable to encode block indices, blocks are too large.")
        for grid_entry in src_grid.get_entry_iterator():
            src_slice_np = np.array(src_grid.get_slice_tuples(grid_entry)).T
            index_pairs = []
            for i in range(src_index_arr.shape[0]):
                src_index = src_index_arr[i]
                dst_index = dst_index_arr[i]
                if np.all(
                    (src_slice_np[0] <= src_index) & (src_index < src_slice_np[1])
                ):
                    index_pair = (
                        (dst_index - dst_slice_np[0]).astype(index_type),
                        (src_index - src_slice_np[0]).astype(index_type),
                    )
                    index_pairs.append(index_pair)
            if len(index_pairs) > 0:
                src_blocks[grid_entry] = index_pairs
        return src_blocks

    def _arbitrary_reshape(self, arr: BlockArray, shape, block_shape) -> BlockArray:
        # This is the worst-case scenario.
        # Generate index mappings per block, and group source indices to minimize
        # RPCs and generation of new objects.
        cm = arr.cm
        dst_arr = BlockArray.empty(
            shape=shape, block_shape=block_shape, dtype=arr.dtype, cm=cm
        )
        for dst_grid_entry in dst_arr.grid.get_entry_iterator():
            dst_block: Block = dst_arr.blocks[dst_grid_entry]
            dst_slice_selection = dst_arr.grid.get_slice(dst_grid_entry)
            dst_index_list = array_utils.slice_sel_to_index_list(dst_slice_selection)
            src_index_list = array_utils.translate_index_list(
                dst_index_list, shape, arr.shape
            )
            src_blocks = self._group_index_lists_by_block(
                dst_arr.grid.get_slice_tuples(dst_grid_entry),
                arr.grid,
                dst_index_list,
                src_index_list,
            )
            for src_grid_entry in src_blocks:
                src_block: Block = arr.blocks[src_grid_entry]
                index_pairs = src_blocks[src_grid_entry]
                syskwargs = {
                    "grid_entry": dst_grid_entry,
                    "grid_shape": dst_arr.grid.grid_shape,
                }
                dst_block.oid = cm.update_block_by_index(
                    dst_block.oid, src_block.oid, index_pairs, syskwargs=syskwargs
                )
        return dst_arr

    def _block_shape_reshape(self, arr, block_shape):
        rarr: BlockArray = BlockArray.empty(arr.shape, block_shape, arr.dtype, arr.cm)
        for grid_entry in rarr.grid.get_entry_iterator():
            grid_entry_slice = rarr.grid.get_slice(grid_entry)
            # TODO (hme): This could be less costly.
            rarr[grid_entry_slice] = arr[grid_entry_slice]
        return rarr

    def _strip_ones(self, shape):
        return tuple(filter(lambda x: x != 1, shape))

    def _check_positions_ones(self, shape, block_shape):
        # If a position in the shape is 1, then the corresponding
        # position in block_shape should also be 1.
        for i in range(len(shape)):
            if shape[i] == 1:
                if shape[i] != block_shape[i]:
                    return False
        return True

    def _is_simple_reshape(self, arr: BlockArray, shape, block_shape):
        # Is the reshape a difference of factors of 1?
        # Strip out 1s and compare.
        # If a position in the shape is 1, then the corresponding
        # position in block_shape should also be 1.
        # If the source shape and dest shape are the same, or the source
        # block_shape and dest block_shape are the same, this is not a
        # simple reshape.
        if shape == arr.shape or block_shape == arr.block_shape:
            return False
        # Check whether the source shape and dest shape are the same, and the
        # source block_shape and dest block_shape are the same, after
        # stripping ones.
        if not (
            self._strip_ones(shape) == self._strip_ones(arr.shape)
            and self._strip_ones(block_shape) == self._strip_ones(arr.block_shape)
        ):
            return False
        if not self._check_positions_ones(shape, block_shape):
            return False
        return True

    def _simple_reshape(self, arr, shape, block_shape):
        # Reshape the array of blocks only.
        # This is only used when the differences in shape are factors of 1,
        # and the ordering of the other factors is maintained.

        # Check assumptions.
        assert len(self._strip_ones(arr.shape)) == len(self._strip_ones(shape))

        # Create new grid, and perform reshape on blocks
        # to simplify access to source blocks.
        grid = ArrayGrid(shape, block_shape, dtype=arr.dtype.__name__)
        src_blocks = arr.blocks.reshape(grid.grid_shape)
        rarr = BlockArray(grid, arr.cm)
        for grid_entry in grid.get_entry_iterator():
            src_block: Block = src_blocks[grid_entry]
            dst_block: Block = rarr.blocks[grid_entry]
            syskwargs = {"grid_entry": grid_entry, "grid_shape": grid.grid_shape}
            dst_block.oid = arr.cm.reshape(
                src_block.oid, dst_block.shape, syskwargs=syskwargs
            )
        return rarr

    def _validate(self, arr, shape, block_shape):
        assert -1 not in shape
        assert -1 not in block_shape
        assert len(shape) == len(block_shape)
        assert np.product(arr.shape) == np.product(shape)

    def __call__(self, arr: BlockArray, shape, block_shape):
        self._validate(arr, shape, block_shape)
        if arr.shape == shape and arr.block_shape == block_shape:
            return arr
        elif self._is_simple_reshape(arr, shape, block_shape):
            return self._simple_reshape(arr, shape, block_shape)
        elif arr.shape == shape and arr.block_shape != block_shape:
            return self._block_shape_reshape(arr, block_shape)
        elif arr.shape != shape and arr.block_shape == block_shape:
            # Just do a full reshape for this case as well.
            # Though there may be a better solution, we generally expect
            # the block shape to change with the array shape.
            return self._arbitrary_reshape(arr, shape, block_shape)
        else:
            assert arr.shape != shape and arr.block_shape != block_shape
            return self._arbitrary_reshape(arr, shape, block_shape)