Source code for nums.api

# Copyright (C) 2020 NumS Development Team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from typing import Optional

from nums.core.application_manager import instance as _instance
from nums.core.array.blockarray import BlockArray
import nums.core.array.utils as array_utils
from nums.core.array.application import ArrayApplication


def init(
    address: Optional[str] = None,
    num_cpus: Optional[int] = None,
    cluster_shape: Optional[tuple] = None,
):
    # pylint: disable = import-outside-toplevel
    import nums.core.settings as settings

    if cluster_shape is not None:
        settings.cluster_shape = cluster_shape
    settings.num_cpus = num_cpus
    if address is not None:
        settings.address = address
    _instance()
    return None
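# Example (a hedged sketch, not from the NumS docs): initialize NumS before the
# first array operation. The address value below is illustrative; pass whatever
# address your Ray cluster accepts, or omit it to run locally.
#
#     import nums
#     nums.init(num_cpus=4)                  # local execution on 4 CPUs
#     # nums.init(address="10.0.0.1:6379")   # assumed format for an existing Ray cluster
#     # nums.init(cluster_shape=(4, 1))      # optional cluster grid shape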
def read(filename: str) -> BlockArray:
    """
    Args:
        filename: The name of the file to read. This must be the name of an array
            that was previously written using the nums.write command.

    Returns:
        A BlockArray instance.
    """
    if filename.lower().startswith("s3://"):
        filename = filename.lower().split("s3://")[-1]
        return _instance().read_s3(filename)
    else:
        return _instance().read_fs(filename)
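# Example (a hedged sketch): read back an array previously written with nums.write.
# The file name and the S3 bucket below are illustrative, and the S3 case assumes
# credentials for the bucket are already configured.
#
#     x = nums.read("x.nps")                  # from the filesystem
#     y = nums.read("s3://my-bucket/x.nps")   # from S3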
def write(filename: str, ba: BlockArray) -> BlockArray:
    """
    Args:
        filename: The name of the file to write. Supports the s3 protocol.
        ba: The BlockArray instance to write.

    Returns:
        A BlockArray indicating the outcome of this operation.
    """
    if filename.lower().startswith("s3://"):
        filename = filename.lower().split("s3://")[-1]
        return _instance().write_s3(ba, filename)
    else:
        return _instance().write_fs(ba, filename)
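# Example (a hedged sketch): create an array with the nums.numpy API and persist it.
# nps.random.rand and the "x.nps" file name are assumptions for illustration.
#
#     import nums
#     import nums.numpy as nps
#
#     x = nps.random.rand(1000)
#     nums.write("x.nps", x)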
def delete(filename: str) -> bool:
    """
    Args:
        filename: The name of the file to delete. This must be a file
            previously written to disk.

    Returns:
        A bool indicating the outcome of this operation.
    """
    if filename.lower().startswith("s3://"):
        filename = filename.lower().split("s3://")[-1]
        return _instance().delete_s3(filename)
    else:
        return _instance().delete_fs(filename)
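# Example (a hedged sketch; the file names are illustrative, as above):
#
#     deleted = nums.delete("x.nps")
#     # deleted = nums.delete("s3://my-bucket/x.nps")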
def read_csv(filename, dtype=float, delimiter=",", has_header=False) -> BlockArray:
    """Read a csv text file.

    Args:
        filename: The filename of the csv.
        dtype: The data type of the csv file's entries.
        delimiter: The value delimiter for each row; usually a comma.
        has_header: Whether the csv file has a header. The header is discarded.

    Returns:
        A BlockArray instance.
    """
    return _instance().read_csv(filename, dtype, delimiter, has_header)
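# Example (a hedged sketch; "data.csv" is an illustrative path to a numeric csv
# file with a header row):
#
#     ba = nums.read_csv("data.csv", dtype=float, delimiter=",", has_header=True)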
def from_modin(df):
    # pylint: disable = import-outside-toplevel, protected-access, unidiomatic-typecheck
    import numpy as np

    try:
        from modin.pandas.dataframe import DataFrame
        from modin.engines.ray.pandas_on_ray.frame.data import PandasOnRayFrame
        from modin.engines.ray.pandas_on_ray.frame.partition import (
            PandasOnRayFramePartition,
        )
    except Exception as e:
        raise Exception(
            "Unable to import modin. Install modin with command 'pip install modin'"
        ) from e

    assert isinstance(df, DataFrame), "Unexpected dataframe type %s" % str(type(df))
    assert isinstance(
        df._query_compiler._modin_frame, PandasOnRayFrame
    ), "Unexpected dataframe type %s" % str(type(df._query_compiler._modin_frame))
    frame: PandasOnRayFrame = df._query_compiler._modin_frame

    app: ArrayApplication = _instance()
    system = app.cm

    # Make sure the partitions are numeric.
    dtype = frame.dtypes[0]
    if not array_utils.is_supported(dtype, type_test=True):
        raise TypeError("%s is not supported." % str(dtype))
    for dt in frame.dtypes:
        if dt != dtype:
            raise TypeError(
                "Mixed types are not supported (%s != %s)." % (str(dt), str(dtype))
            )
    dtype = np.__getattribute__(str(dtype))

    # Convert from Pandas to NumPy.
    pd_parts = frame._partition_mgr_cls.map_partitions(
        frame._partitions, lambda df: np.array(df)
    )

    grid_shape = len(frame._row_lengths), len(frame._column_widths)
    shape = (np.sum(frame._row_lengths), np.sum(frame._column_widths))
    block_shape = app.get_block_shape(shape, dtype)
    rows = []
    for i in range(grid_shape[0]):
        cols = []
        for j in range(grid_shape[1]):
            curr_block_shape = (frame._row_lengths[i], frame._column_widths[j])
            part: PandasOnRayFramePartition = pd_parts[(i, j)]
            part.drain_call_queue()
            ba: BlockArray = BlockArray.from_oid(
                part.oid, curr_block_shape, dtype, system
            )
            cols.append(ba)
        if grid_shape[1] == 1:
            row_ba: BlockArray = cols[0]
        else:
            row_ba: BlockArray = app.concatenate(
                cols, axis=1, axis_block_size=block_shape[1]
            )
        rows.append(row_ba)
    result = app.concatenate(rows, axis=0, axis_block_size=block_shape[0])
    return result
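# Example (a hedged sketch): convert a Modin DataFrame on the Ray engine into a
# NumS BlockArray. This assumes from_modin is re-exported at the nums package
# level, and that "data.csv" contains only numeric columns of a single dtype.
#
#     import modin.pandas as mpd
#
#     df = mpd.read_csv("data.csv")
#     ba = nums.from_modin(df)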