Source code for nums.core.array.base

# Copyright (C) 2020 NumS Development Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import numpy as np

from nums.core.array import utils as array_utils
from nums.core.compute.compute_manager import ComputeManager
from nums.core.grid.grid import ArrayGrid


class Block:
    """Metadata handle for a single block of a blocked array.

    A ``Block`` stores where the block sits in the block grid
    (``grid_entry`` / ``grid_shape``), which slice of the full array it covers
    (``rect``), its ``shape`` and ``dtype``, and ``oid``: the handle returned
    by the compute manager for the block's data.  All computation is delegated
    to the compute manager (``cm``); methods return new ``Block`` objects and
    never modify ``self``.

    ``transposed`` marks a deferred transpose: the metadata is already
    reversed, but the object behind ``oid`` has not been transposed yet.
    """

    # pylint: disable=redefined-builtin, global-statement
    # TODO(hme): Create a base class, and move this concrete class into blockarray.py.
    #  Do this when we implement a SparseBlock object.

    # Class-wide counter used to hand out default block ids.
    block_id_counter = -1

    def __init__(
        self,
        grid_entry,
        grid_shape,
        rect,
        shape,
        dtype,
        transposed,
        cm: "ComputeManager",
        id=None,
    ):
        """Create a block handle; ``oid`` starts out as ``None``.

        Args:
            grid_entry: Index of this block within the block grid.
            grid_shape: Shape of the block grid.
            rect: One entry per axis describing the covered slice
                (``[(0, 1)]`` is used for scalar literals).
            shape: Shape of the block.
            dtype: Element type of the block.
            transposed: Whether a transpose of the remote object is pending.
            cm: Compute manager used to execute operations.
            id: Optional explicit id; when ``None`` the next value of
                ``Block.block_id_counter`` is used.
        """
        self._cm = cm
        self.grid_entry: tuple = grid_entry
        self.grid_shape: tuple = grid_shape
        self.rect: list = rect
        self.oid: object = None
        self.shape: tuple = shape
        self.dtype = dtype
        self.num_dims = len(self.rect)
        self.transposed = transposed
        self.id = id
        if self.id is None:
            Block.block_id_counter += 1
            self.id = Block.block_id_counter
        # Set if a device id was used to compute this block.
        self.device_id = None

    def __repr__(self):
        return "Block(" + str(self.oid) + ")"

    def size(self):
        """Return the number of elements in the block (product of ``shape``)."""
        # np.product was removed in NumPy 2.0; np.prod is the supported name.
        return np.prod(self.shape)

    def copy(self, shallow=True):
        """Return a new ``Block`` with the same metadata and the same ``oid``.

        Only shallow copies are supported: the remote object is shared and the
        ``rect`` list is the same list object as on ``self``.  The copy gets a
        fresh ``id`` and ``device_id`` is reset to ``None``.
        """
        assert shallow, "Only shallow copies are currently supported."
        block = Block(
            self.grid_entry,
            self.grid_shape,
            self.rect,
            self.shape,
            self.dtype,
            self.transposed,
            self._cm,
        )
        block.oid = self.oid
        return block

    def true_grid_entry(self):
        """Return ``grid_entry`` with a pending transpose undone."""
        if self.transposed:
            return tuple(reversed(self.grid_entry))
        return self.grid_entry

    def true_grid_shape(self):
        """Return ``grid_shape`` with a pending transpose undone."""
        if self.transposed:
            return tuple(reversed(self.grid_shape))
        return self.grid_shape

    def transpose(self, defer=False, redistribute=False):
        """Return the transpose of this block.

        Args:
            defer: If True, this operation does not modify the remote object;
                only the metadata is reversed and ``transposed`` is flipped.
            redistribute: Only used when ``defer`` is False.  If True the
                transposed grid position is passed as the placement hint,
                otherwise the current position is used.  (If ``defer`` is True
                and ``redistribute`` is False, this operation does not move
                the remote object.)
        """
        grid_entryT = tuple(reversed(self.grid_entry))
        grid_shapeT = tuple(reversed(self.grid_shape))
        rectT = list(reversed(self.rect))
        blockT = Block(
            grid_entry=grid_entryT,
            grid_shape=grid_shapeT,
            rect=rectT,
            shape=tuple(reversed(self.shape)),
            dtype=self.dtype,
            transposed=not self.transposed,
            cm=self._cm,
        )
        blockT.oid = self.oid
        if not defer:
            # NOTE(review): the flag is cleared regardless of self.transposed;
            # confirm callers never eagerly transpose an already-deferred block.
            blockT.transposed = False
            if redistribute:
                syskwargs = {"grid_entry": grid_entryT, "grid_shape": grid_shapeT}
            else:
                syskwargs = {
                    "grid_entry": self.grid_entry,
                    "grid_shape": self.grid_shape,
                }
            blockT.oid = self._cm.transpose(self.oid, syskwargs=syskwargs)
        return blockT

    def swapaxes(self, axis1, axis2):
        """Return a new block with ``axis1`` and ``axis2`` interchanged.

        The metadata of the result is swapped and the remote object is
        produced by the compute manager's ``swapaxes``.
        """
        block = self.copy()
        grid_entry = list(block.grid_entry)
        grid_shape = list(block.grid_shape)
        shape = list(block.shape)
        # copy() shares the rect list with self, so swap on a private copy;
        # swapping in place would silently corrupt the original block.
        rect = list(block.rect)

        grid_entry[axis1], grid_entry[axis2] = grid_entry[axis2], grid_entry[axis1]
        grid_shape[axis1], grid_shape[axis2] = grid_shape[axis2], grid_shape[axis1]
        shape[axis1], shape[axis2] = shape[axis2], shape[axis1]
        rect[axis1], rect[axis2] = rect[axis2], rect[axis1]

        block.grid_entry = tuple(grid_entry)
        block.grid_shape = tuple(grid_shape)
        block.shape = tuple(shape)
        block.rect = rect

        block.oid = self._cm.swapaxes(
            block.oid,
            axis1,
            axis2,
            syskwargs={"grid_entry": block.grid_entry, "grid_shape": block.grid_shape},
        )
        return block

    def ufunc(self, op_name, device_id=None):
        """Apply the unary operation ``op_name``; shorthand for ``uop_map``."""
        return self.uop_map(op_name, device_id=device_id)

    def uop_map(self, op_name, args=None, kwargs=None, device_id=None):
        """Apply a unary operation through the compute manager.

        Args:
            op_name: Name of the unary operation.
            args: Extra positional arguments for the operation (default ``()``).
            kwargs: Extra keyword arguments for the operation (default ``{}``).
            device_id: If given, it is passed as the placement hint instead of
                the grid position and recorded on the result.

        Returns:
            A new block whose dtype is given by
            ``array_utils.get_uop_output_type``.
        """
        # This retains transpose.
        block = self.copy()
        block.dtype = array_utils.get_uop_output_type(op_name, self.dtype)
        args = () if args is None else args
        kwargs = {} if kwargs is None else kwargs
        if device_id is None:
            syskwargs = {"grid_entry": block.grid_entry, "grid_shape": block.grid_shape}
        else:
            syskwargs = {"device_id": device_id}
            block.device_id = device_id
        block.oid = self._cm.map_uop(
            op_name, self.oid, args, kwargs, syskwargs=syskwargs
        )
        return block

    def _block_from_other(self, other):
        """Wrap a numeric literal in a one-element block placed next to ``self``."""
        # Assume other is numeric.
        # This only occurs during some numpy operations (e.g. np.mean),
        # where a literal is used in the operation.
        # np.int / np.float were removed in NumPy 1.24; the abstract NumPy
        # scalar types cover NumPy scalars in addition to Python numbers.
        assert isinstance(other, (int, float, np.integer, np.floating))
        block = Block(
            self.grid_entry,
            self.grid_shape,
            [(0, 1)],
            (1,),
            self.dtype,
            False,
            self._cm,
        )
        # We pass syskwargs here for correct node placement for `other`,
        # which should be local to self.
        block.oid = self._cm.put(
            np.array(other, dtype=self.dtype),
            syskwargs={
                "grid_entry": self.grid_entry,
                "grid_shape": self.grid_shape,
            },
        )
        return block

    def bop(self, op, other, args: dict, device_id=None):
        """Apply the binary operation ``op`` to ``self`` and ``other``.

        Args:
            op: Name of the operation; ``"tensordot"`` is handled specially.
            other: A ``Block`` or a numeric literal (wrapped via
                ``_block_from_other``).
            args: Operation arguments; ``args["axes"]`` is required for
                ``"tensordot"``.
            device_id: If given, it is passed as the placement hint instead of
                the result's grid position and recorded on the result.

        Returns:
            A new, non-transposed block holding the result.
        """
        if not isinstance(other, Block):
            other = self._block_from_other(other)
        if op == "tensordot":
            axes = args["axes"]

            def leading(seq):
                # Everything except the last `axes` entries.  A plain
                # seq[:-axes] is empty for axes == 0, which would drop all of
                # self's dimensions from the result.
                return list(seq[: max(len(seq) - axes, 0)])

            result_grid_entry = tuple(
                leading(self.grid_entry) + list(other.grid_entry[axes:])
            )
            result_grid_shape = tuple(
                leading(self.grid_shape) + list(other.grid_shape[axes:])
            )
            result_rect = leading(self.rect) + list(other.rect[axes:])
            result_shape = tuple(leading(self.shape) + list(other.shape[axes:]))
        else:
            # Broadcasting starts from trailing dimensions.
            # Resulting shape is max of trailing shapes.
            result_grid_entry = []
            result_grid_shape = []
            result_rect = []
            result_shape = []
            for i in range(1, max(self.num_dims, other.num_dims) + 1):
                other_i = other.num_dims - i
                self_i = self.num_dims - i
                if other_i < 0:
                    is_self = True
                elif self_i < 0:
                    is_self = False
                else:
                    is_self = other.shape[other_i] < self.shape[self_i]
                source, axis = (self, self_i) if is_self else (other, other_i)
                result_grid_entry.append(source.grid_entry[axis])
                result_grid_shape.append(source.grid_shape[axis])
                result_rect.append(source.rect[axis])
                result_shape.append(source.shape[axis])
            result_grid_entry = tuple(reversed(result_grid_entry))
            result_grid_shape = tuple(reversed(result_grid_shape))
            result_rect = list(reversed(result_rect))
            result_shape = tuple(reversed(result_shape))

        dtype = array_utils.get_bop_output_type(op, self.dtype, other.dtype)
        block = Block(
            grid_entry=result_grid_entry,
            grid_shape=result_grid_shape,
            rect=result_rect,
            shape=result_shape,
            dtype=dtype,
            transposed=False,
            cm=self._cm,
        )
        if device_id is None:
            syskwargs = {"grid_entry": block.grid_entry, "grid_shape": block.grid_shape}
        else:
            syskwargs = {"device_id": device_id}
            block.device_id = device_id
        block.oid = self._cm.bop(
            op,
            self.oid,
            other.oid,
            self.transposed,
            other.transposed,
            axes=args.get("axes"),
            syskwargs=syskwargs,
        )
        return block

    def tensordot(self, other, axes):
        """Return the tensordot of ``self`` and ``other`` over ``axes`` axes."""
        return self.bop("tensordot", other, args={"axes": axes})

    def __add__(self, other):
        return self.bop("add", other, args={})

    def __sub__(self, other):
        return self.bop("sub", other, args={})

    def __mul__(self, other):
        return self.bop("mul", other, args={})

    def __matmul__(self, other):
        return self.tensordot(other, axes=1)

    def __truediv__(self, other):
        return self.bop("truediv", other, args={})

    def __pow__(self, other):
        return self.bop("pow", other, args={})

    def __ge__(self, other):
        return self.bop("ge", other, args={})

    def __gt__(self, other):
        return self.bop("gt", other, args={})

    def __le__(self, other):
        return self.bop("le", other, args={})

    def __lt__(self, other):
        return self.bop("lt", other, args={})

    # Comparisons return result blocks, not booleans.
    def __eq__(self, other):
        return self.bop("eq", other, args={})

    def __ne__(self, other):
        return self.bop("ne", other, args={})

    # The in-place operators are aliases: they return a new block as well.
    __iadd__ = __add__
    __isub__ = __sub__
    __imul__ = __mul__
    __imatmul__ = __matmul__
    __itruediv__ = __truediv__
    __ipow__ = __pow__

    def astype(self, dtype):
        """Return a new block converted to ``dtype``.

        ``dtype.__name__`` is what is passed to the compute manager, so
        ``dtype`` must be a type object with a ``__name__``.
        """
        block = self.copy()
        block.dtype = dtype
        block.oid = self._cm.astype(
            self.oid,
            dtype.__name__,
            syskwargs={"grid_entry": block.grid_entry, "grid_shape": block.grid_shape},
        )
        return block

    def conjugate(self):
        """Return the result of the ``"conjugate"`` unary operation."""
        return self.ufunc("conjugate")

    def sqrt(self):
        """Return the result of the ``"sqrt"`` unary operation."""
        return self.ufunc("sqrt")

    def get(self):
        """Fetch the object behind ``oid`` from the compute manager."""
        return self._cm.get(self.oid)
class BlockArrayBase:
    """Base container for an array stored as a grid of ``Block`` objects.

    The layout (``shape``, ``block_shape``, ``grid_shape``, ``dtype``) comes
    from ``grid``; ``blocks`` is an object array with one ``Block`` per grid
    entry.
    """

    def __init__(
        self, grid: "ArrayGrid", cm: "ComputeManager", blocks: np.ndarray = None
    ):
        """Set up layout attributes and, if needed, the block handles.

        Args:
            grid: Describes the array's shape, block shape, grid shape and dtype.
            cm: Compute manager used to fetch and compute blocks.
            blocks: Existing array of blocks.  When ``None``, one ``Block`` is
                created for every entry produced by
                ``grid.get_entry_iterator()``.
        """
        self.grid = grid
        self.cm = cm
        self.shape = self.grid.shape
        self.block_shape = self.grid.block_shape
        self.grid_shape = self.grid.grid_shape
        # np.product was removed in NumPy 2.0; np.prod is the supported name.
        self.size = np.prod(self.shape)
        self.ndim = len(self.shape)
        self.dtype = self.grid.dtype
        try:
            self.nbytes = self.grid.nbytes()
        except ValueError:
            # The grid could not report a byte size; leave it unknown.
            self.nbytes = None
        self.blocks = blocks
        if self.blocks is None:
            # TODO (hme): Subclass np.ndarray for self.blocks instances,
            #  and override key methods to better integrate with NumPy's ufuncs.
            self.blocks = np.empty(shape=self.grid.grid_shape, dtype=Block)
            for grid_entry in self.grid.get_entry_iterator():
                self.blocks[grid_entry] = Block(
                    grid_entry=grid_entry,
                    grid_shape=self.grid.grid_shape,
                    rect=self.grid.get_slice_tuples(grid_entry),
                    shape=self.grid.get_block_shape(grid_entry),
                    dtype=self.dtype,
                    transposed=False,
                    cm=self.cm,
                )

    def __repr__(self):
        return "BlockArray(" + str(self.blocks) + ")"

    def get(self) -> np.ndarray:
        """Fetch every block and assemble the full array as a NumPy array.

        All block handles are requested from the compute manager in a single
        call.  Blocks whose transpose is still deferred are transposed locally
        before being written into the result.
        """
        result: np.ndarray = np.zeros(shape=self.grid.shape, dtype=self.grid.dtype)
        # np.int was removed in NumPy 1.24; the builtin int is what it aliased.
        block_shape: np.ndarray = np.array(self.grid.block_shape, dtype=int)
        # Materialise once so the fetch order and the placement order match.
        grid_entries = list(self.grid.get_entry_iterator())
        arrays: list = self.cm.get(
            [self.blocks[grid_entry].oid for grid_entry in grid_entries]
        )
        for block_index, grid_entry in enumerate(grid_entries):
            # Each block starts at (grid index * nominal block shape); its
            # actual extent comes from the grid.
            start = block_shape * grid_entry
            entry_shape = np.array(self.grid.get_block_shape(grid_entry), dtype=int)
            end = start + entry_shape
            slices = tuple(slice(lo, hi) for lo, hi in zip(start, end))
            block = self.blocks[grid_entry]
            arr: np.ndarray = arrays[block_index]
            if block.transposed:
                arr = arr.T
            result[slices] = arr.reshape(block.shape)
        return result

    def broadcast_to(self, shape):
        """Return a ``BlockArrayBase`` of ``shape`` whose blocks view this array's.

        The result's ``blocks`` is a broadcast view over ``self.blocks``
        obtained from ``np.nditer``, so no new block objects are kept.
        """
        b = array_utils.broadcast(self.shape, shape)
        result_block_shape = array_utils.broadcast_block_shape(
            self.shape, shape, self.block_shape
        )
        result: BlockArrayBase = BlockArrayBase(
            ArrayGrid(b.shape, result_block_shape, self.grid.dtype.__name__), self.cm
        )
        # Below taken directly from _broadcast_to in numpy's stride_tricks.py.
        it = np.nditer(
            (self.blocks,),
            flags=["multi_index", "refs_ok", "zerosize_ok"],
            op_flags=["readonly"],
            itershape=result.grid.grid_shape,
            order="C",
        )
        with it:
            # never really has writebackifcopy semantics
            broadcast = it.itviews[0]
        result.blocks = broadcast
        return result