Source code for calcpy._nppd

"""Extensions to numpy and pandas."""
import datetime
from math import inf
from numbers import Number
import sys

import numpy as np
import pandas as pd

from .typing import ListLike, NDFrame


def overall_equal(loper, roper):
    """Tell whether two operands are exactly equal when taken as a whole.

    Operands are compared only when ``loper`` is an instance of the type of
    ``roper``; otherwise they are reported as unequal. The comparison itself
    is delegated to ``loper.equals(roper)`` for ``NDFrame`` instances
    (documented as ``pd.Series`` and ``pd.DataFrame``), to ``np.array_equal``
    for ``np.ndarray``, and to the ``==`` operator for everything else.

    Parameters:
        loper (number | list | tuple | np.ndarray | pd.Series | pd.DataFrame):
            Left operand.
        roper (number | list | tuple | np.ndarray | pd.Series | pd.DataFrame):
            Right operand.

    Returns:
        bool: Whether the two operands are equal as a whole.

    Examples:
        Compare lists.

        >>> overall_equal([1, 2, 3], [1, 2, 3])
        True

        Compare ``pd.DataFrame``.

        >>> import pandas as pd
        >>> df = pd.DataFrame({"A": 1, "B": 2}, index=[0])
        >>> overall_equal(df, df+0)
        True
        >>> overall_equal(df, df+1)
        False
        >>> overall_equal(df, df.iloc[:, 0])
        False
        >>> overall_equal(df["A"], df["B"]-1)
        True
    """
    if isinstance(loper, type(roper)):
        if isinstance(loper, NDFrame):
            return loper.equals(roper)
        if isinstance(loper, np.ndarray):
            return np.array_equal(loper, roper)
        return loper == roper
    # Operands of incompatible types are never considered equal.
    return False
def shape(arg):
    """Return the shape of an argument as a tuple.

    An object exposing a ``shape`` attribute reports that attribute. Any other
    object with a length is treated as one-dimensional, and everything else
    is treated as a scalar with the empty shape.

    Parameters:
        arg: Object to inspect.

    Returns:
        tuple: Shape of the argument.

    Examples:
        >>> shape(1)
        ()
        >>> shape([1, 2, 3])
        (3,)
        >>> shape(np.array([1, 2, 3]))
        (3,)
        >>> shape(pd.Series([1, 2, 3]))
        (3,)
        >>> shape(pd.DataFrame({"A": 1, "B": 2}, index=[0]))
        (1, 2)
    """
    if not hasattr(arg, "shape"):
        # No native shape: sized objects are 1-D, the rest are scalars.
        return (len(arg),) if hasattr(arg, "__len__") else ()
    return arg.shape
def ndim(arg):
    """Return the number of dimensions of an argument.

    An object exposing an ``ndim`` attribute reports that attribute. Any other
    object with a length counts as one-dimensional, and everything else
    counts as zero-dimensional.

    Parameters:
        arg: Object to inspect.

    Returns:
        int: Number of dimensions.

    Examples:
        >>> ndim(1)
        0
        >>> ndim([1, 2, 3])
        1
        >>> ndim(np.array([1, 2, 3]))
        1
        >>> ndim(pd.Series([1, 2, 3]))
        1
        >>> ndim(pd.DataFrame({"A": 1, "B": 2}, index=[0]))
        2
    """
    if not hasattr(arg, "ndim"):
        # No native ndim: sized objects are 1-D, the rest are scalars.
        return 1 if hasattr(arg, "__len__") else 0
    return arg.ndim
def size(arg):
    """Return the number of elements of an argument.

    An object exposing a ``size`` attribute reports that attribute. Any other
    object with a length reports its length, and everything else counts as a
    single element.

    Parameters:
        arg: Object to inspect.

    Returns:
        int: Number of elements.

    Examples:
        >>> size(1)
        1
        >>> size([1, 2, 3])
        3
        >>> size(np.array([1, 2, 3]))
        3
        >>> size(pd.Series([1, 2, 3]))
        3
        >>> size(pd.DataFrame({"A": 1, "B": 2}, index=[0]))
        2
    """
    if not hasattr(arg, "size"):
        # No native size: fall back to the length, scalars count as one.
        return len(arg) if hasattr(arg, "__len__") else 1
    return arg.size
def empty(arg):
    """Tell whether an argument holds no elements.

    The argument is empty when ``size(arg)`` is zero.

    Parameters:
        arg: Object to inspect.

    Returns:
        bool: ``True`` when the argument has no elements.

    Examples:
        >>> empty(1)
        False
        >>> empty([])
        True
        >>> empty([1, 2, 3])
        False
        >>> empty(np.array([1, 2, 3]))
        False
        >>> empty(pd.Series([1, 2, 3]))
        False
        >>> empty(pd.DataFrame({"A": 1, "B": 2}, index=[0]))
        False
    """
    element_count = size(arg)
    return element_count == 0
def full_like(template, fill_value, **kwargs):
    """Create a container shaped like ``template`` and filled with ``fill_value``.

    The result has the same container type as ``template``: ``list``,
    ``tuple``, ``np.ndarray``, ``pd.Series`` (same index and name), or
    ``pd.DataFrame`` (same index and columns).

    Parameters:
        template (list | tuple | np.ndarray | pd.Series | pd.DataFrame):
            Object whose type and shape are mimicked.
        fill_value: Value to populate. For a ``list`` or ``tuple`` template, a
            fill value whose ``size`` is 1 is repeated once per template
            element; any other fill value is used as the element sequence
            itself.
        **kwargs: Keyword arguments for ``np.full_like()``, ``pd.Series()``,
            or ``pd.DataFrame()``. They are not used for ``list`` and
            ``tuple`` templates.

    Returns:
        list | tuple | np.ndarray | pd.Series | pd.DataFrame: The new container.

    Raises:
        ValueError: If ``template`` is of none of the supported types.

    Examples:
        Create list and tuple.

        >>> full_like([1, 2, 3], 0)
        [0, 0, 0]
        >>> full_like((1, 2, 3), 0)
        (0, 0, 0)

        Create ``np.array``.

        >>> full_like(np.array([1, 2, 3]), 0)
        array([0, 0, 0])

        Create ``pd.Series`` and ``pd.DataFrame``.

        >>> full_like(pd.Series([1, 2, 3]), 0)
        0    0
        1    0
        2    0
        dtype: int64
        >>> full_like(pd.DataFrame({"A": 1, "B": 2}, index=[0]), 0)
           A  B
        0  0  0
    """
    if isinstance(template, (list, tuple)):
        is_single_fill = size(fill_value) == 1
        values = [fill_value] * size(template) if is_single_fill else fill_value
        return type(template)(values)

    if isinstance(template, np.ndarray):
        return np.full_like(template, fill_value, **kwargs)

    if isinstance(template, pd.Series):
        return pd.Series(
            fill_value, index=template.index, name=template.name, **kwargs
        )

    if isinstance(template, pd.DataFrame):
        return pd.DataFrame(
            fill_value, index=template.index, columns=template.columns, **kwargs
        )

    raise ValueError(f"Unknown template types {type(template)}.")
def broadcast_first(fun):
    """Decorator for supporting ``np.ndarray``, ``pd.Series``, and ``pd.DataFrame``.

    The returned callable applies ``fun`` element-wise over its first
    argument when that argument is a supported container, forwarding all
    remaining positional and keyword arguments unchanged to every call. Any
    other first argument is passed to ``fun`` directly. The metadata of
    ``fun`` (name, docstring, ...) is preserved on the returned callable.

    Parameters:
        fun (callable): Callable that applies to a single element in its
            first argument.

    Returns:
        callable: Callable that applies to a single element or a ``list``,
        ``tuple``, ``np.ndarray``, ``pd.Series``, or ``pd.DataFrame``.

    Examples:
        >>> @broadcast_first
        ... def add(x, y):
        ...     return x + y
        >>> add(1, 2)
        3
        >>> add([1, 2, 3], 2)
        [3, 4, 5]
        >>> add(np.array([1, 2, 3]), 2)
        array([3, 4, 5])
        >>> add(pd.Series([1, 2, 3]), 2)
        0    3
        1    4
        2    5
        dtype: int64
        >>> add(pd.DataFrame({"A": 1, "B": 2}, index=[0]), 2)
           A  B
        0  3  4
    """
    import functools  # local import keeps this block self-contained

    # Without ``wraps`` every decorated function would be named "f" and lose
    # its docstring, hiding it from doctest discovery and generated docs.
    @functools.wraps(fun)
    def f(value, *args, **kwargs):
        def f0(arg):
            # Bind the extra arguments so only the element varies.
            return fun(arg, *args, **kwargs)

        if isinstance(value, ListLike):
            return type(value)(f0(e) for e in value)
        if isinstance(value, np.ndarray):
            return np.vectorize(f0)(value)
        if isinstance(value, pd.Series):
            return value.apply(f0)
        if isinstance(value, pd.DataFrame):
            if hasattr(value, "map"):
                return value.map(f0)  # pandas>=2.1.0
            return value.applymap(f0)  # pandas<2.1.0
        return f0(value)

    return f
def _fetch_indices(index):
    """Auxiliary function for ``mapi``: extract the labels of a pandas index.

    Parameters:
        index (pd.Index): Index whose labels are wanted.

    Returns:
        list | np.ndarray: ``list(index)`` for a ``pd.DatetimeIndex``,
        ``index.values`` for any other index.
    """
    if not isinstance(index, pd.DatetimeIndex):
        return index.values
    # Listing the index element by element avoids conversion to int.
    return list(index)
def mapi(inputs, f):
    """Apply a function on every input element and its index.

    Parameters:
        inputs (list | tuple | np.ndarray | pd.Series | pd.DataFrame):
            Input data to transform.
        f (callable): Transformation function with some positional arguments:

            - For list and tuple: f(value, index) -> new_value
            - For ndarray: f(value, index_0, index_1, ..., index_(ndim-1))
            - For DataFrame: f(value, index, column) -> new_value
            - For Series: f(value, index, name) -> new_value

            Any other input is passed to ``f`` as its only argument.

    Returns:
        list | tuple | np.ndarray | pd.Series | pd.DataFrame:
        Transformed data with same shape/index/columns. An empty input yields
        ``full_like(inputs, None)`` without calling ``f``.

    Examples:
        Transform a list.

        >>> def printall(*args):
        ...     return ":".join(str(arg) for arg in args)
        >>> mapi([1, 2, 3], printall)
        ['1:0', '2:1', '3:2']

        Transform a ndarray.

        >>> from calcpy import add
        >>> a = np.ones(shape=(2, 3, 4))
        >>> mapi(a, add)
        array([[[1., 2., 3., 4.],
                [2., 3., 4., 5.],
                [3., 4., 5., 6.]],
               [[2., 3., 4., 5.],
                [3., 4., 5., 6.],
                [4., 5., 6., 7.]]])

        Transform a Series.

        >>> s = pd.Series("value", index=range(3))
        >>> mapi(s, printall)
        0    value:0:None
        1    value:1:None
        2    value:2:None
        dtype: object

        Transform a Series with datetime index and Series name.

        >>> tindex = pd.date_range("2000-01-01", "2000-01-03")
        >>> s = pd.Series("value", index=tindex, name="name")
        >>> mapi(s, printall)
        2000-01-01    value:2000-01-01 00:00:00:name
        2000-01-02    value:2000-01-02 00:00:00:name
        2000-01-03    value:2000-01-03 00:00:00:name
        Freq: D, Name: name, dtype: object

        Transform a Series with multi-level index.

        >>> mindex = pd.DataFrame({"app": "X", "date": tindex}).set_index(["app", "date"]).index
        >>> s = pd.Series("value", index=mindex)
        >>> mapi(s, printall)
        app  date
        X    2000-01-01    value:('X', Timestamp('2000-01-01 00:00:00')):...
             2000-01-02    value:('X', Timestamp('2000-01-02 00:00:00')):...
             2000-01-03    value:('X', Timestamp('2000-01-03 00:00:00')):...
        dtype: object

        Transform a Series to another datatype

        >>> def sumlen(*args):
        ...     return sum(len(arg) for arg in args)
        >>> s = pd.Series("value", index=["a", "b"], name="name")
        >>> mapi(s, sumlen)  # doctest: +ELLIPSIS
        a    10
        b    10
        Name: name, dtype: int...

        Transform a DataFrame.

        >>> df = pd.DataFrame('hello', index=range(4), columns=range(3))
        >>> mapi(df, printall)
                   0          1          2
        0  hello:0:0  hello:0:1  hello:0:2
        1  hello:1:0  hello:1:1  hello:1:2
        2  hello:2:0  hello:2:1  hello:2:2
        3  hello:3:0  hello:3:1  hello:3:2

        Transform a DataFrame with multi-level index.

        >>> df = pd.DataFrame("value", index=mindex, columns=["a"])
        >>> print(mapi(df, printall))
                                                                      a
        app date
        X   2000-01-01  value:('X', Timestamp('2000-01-01 00:00:00')):a
            2000-01-02  value:('X', Timestamp('2000-01-02 00:00:00')):a
            2000-01-03  value:('X', Timestamp('2000-01-03 00:00:00')):a

        Handle empty input.

        >>> s = pd.Series(dtype=object, name='empty')
        >>> mapi(s, printall)
        Series([], Name: empty, dtype: object)

        Create a DataFrame whose elements are all the same as index.

        >>> from calcpy import arggetter
        >>> index = pd.date_range("2000-01-01", "2000-01-03")
        >>> df = pd.DataFrame(index=index, columns=["A", "B"])
        >>> mapi(df, arggetter(1))
                            A          B
        2000-01-01 2000-01-01 2000-01-01
        2000-01-02 2000-01-02 2000-01-02
        2000-01-03 2000-01-03 2000-01-03
    """
    # Nothing to map over: hand back an empty container of the same kind.
    if empty(inputs):
        return full_like(inputs, None)

    # Plain sequences: pair each element with its position.
    if isinstance(inputs, (list, tuple)):
        return type(inputs)([f(inp, idx) for idx, inp in enumerate(inputs)])

    # Anything that is neither array nor pandas container is handed to f as is.
    if not isinstance(inputs, (np.ndarray,) + NDFrame):
        return f(inputs)

    vectorized = np.vectorize(f)

    if isinstance(inputs, np.ndarray):
        # One integer coordinate grid per axis, each shaped like the input.
        grids = np.meshgrid(*[range(s) for s in inputs.shape], indexing='ij')
        return vectorized(inputs, *grids)

    if isinstance(inputs, pd.Series):
        labels = _fetch_indices(inputs.index)
        result_data = vectorized(inputs.values, labels, inputs.name)
    elif isinstance(inputs, pd.DataFrame):
        row_labels = _fetch_indices(inputs.index)
        col_labels = _fetch_indices(inputs.columns)
        # Expand the labels to 2-D grids aligned with the value matrix.
        row_grid, col_grid = np.meshgrid(row_labels, col_labels, indexing='ij')
        result_data = vectorized(inputs.values, row_grid, col_grid)
    # Re-wrap the raw results with the original index/columns/name.
    return full_like(inputs, result_data)
def _minmaxvalue(dtype, *ignored_args):
    """Auxiliary function to return the min and max values for a given dtype.

    Parameters:
        dtype (type | np.dtype): Type whose value range is wanted.
        *ignored_args: Accepted and ignored.

    Returns:
        tuple: ``(min, max)`` pair for ``dtype``.

    Raises:
        TypeError: If the value range of ``dtype`` cannot be determined.
    """
    if dtype is int:
        return -sys.maxsize - 1, sys.maxsize
    if dtype is float:
        return -inf, inf
    if dtype is bool or np.issubdtype(dtype, np.bool_):
        return False, True
    for supertype in (np.datetime64, np.timedelta64):  # need to be checked before np.integer
        if np.issubdtype(dtype, supertype):
            unit = np.datetime_data(dtype)[0]  # fetch time unit such as 'ns','us','s', or 'generic'
            if unit == "generic":
                unit = "ns"
            iinfo = np.iinfo(np.int64)
            # The smallest int64 is skipped because NumPy uses it to encode NaT.
            return supertype(iinfo.min + 1, unit), supertype(iinfo.max, unit)
    if np.issubdtype(dtype, np.integer):
        iinfo = np.iinfo(dtype)
        return iinfo.min, iinfo.max
    if np.issubdtype(dtype, np.floating):
        finfo = np.finfo(dtype)
        return finfo.min, finfo.max
    # Such as datetime.datetime, datetime.date, datetime.time, pd.Timestamp,
    # pd.Timedelta, which expose their bounds as class attributes. Types whose
    # ``min``/``max`` are *methods* (e.g. numpy scalar types, pd.Series) must
    # not match here, otherwise unbound methods would be returned as values.
    lower = getattr(dtype, "min", None)
    upper = getattr(dtype, "max", None)
    if lower is not None and upper is not None and not (callable(lower) or callable(upper)):
        return lower, upper
    raise TypeError(f"Unsupported type: {dtype}")


def maxvalue(dtype):
    """Get the max value of a type.

    Parameters:
        dtype (type): Type to get max value

    Returns:
        Max value of the type

    Raises:
        TypeError: If the type is not supported.

    Examples:
        >>> maxvalue(int)
        9223372036854775807
        >>> maxvalue(float)
        inf
        >>> maxvalue(bool)
        True
        >>> import datetime
        >>> maxvalue(datetime.datetime)
        datetime.datetime(9999, 12, 31, 23, 59, 59, 999999)
        >>> maxvalue(datetime.date)
        datetime.date(9999, 12, 31)
        >>> maxvalue(datetime.time)
        datetime.time(23, 59, 59, 999999)
        >>> maxvalue(datetime.timedelta)
        datetime.timedelta(days=999999999, seconds=86399, microseconds=999999)
        >>> maxvalue(np.int64)
        9223372036854775807
        >>> maxvalue(np.float64)  # doctest: +SKIP
        1.7976931348623157e+308
        >>> maxvalue(np.datetime64)  # doctest: +SKIP
        numpy.datetime64('2262-04-11T23:47:16.854775807')
        >>> maxvalue(np.timedelta64)  # doctest: +SKIP
        numpy.timedelta64(9223372036854775807,'ns')
        >>> maxvalue(pd.Timestamp)
        Timestamp('2262-04-11 23:47:16.854775807')
        >>> maxvalue(pd.Timedelta)
        Timedelta('106751 days 23:47:16.854775807')
    """
    _, mx = _minmaxvalue(dtype)
    return mx


def minvalue(dtype):
    """Get the min value of a type.

    Parameters:
        dtype (type): Type to get min value

    Returns:
        Min value of the type

    Raises:
        TypeError: If the type is not supported.

    Examples:
        >>> minvalue(int)
        -9223372036854775808
        >>> minvalue(float)
        -inf
        >>> minvalue(bool)
        False
        >>> minvalue(datetime.datetime)
        datetime.datetime(1, 1, 1, 0, 0)
        >>> minvalue(datetime.date)
        datetime.date(1, 1, 1)
        >>> minvalue(datetime.time)
        datetime.time(0, 0)
        >>> minvalue(datetime.timedelta)
        datetime.timedelta(days=-999999999)
        >>> minvalue(np.int64)
        -9223372036854775808
        >>> minvalue(np.float64)  # doctest: +SKIP
        -1.7976931348623157e+308
        >>> minvalue(np.datetime64)  # doctest: +SKIP
        numpy.datetime64('1677-09-21T00:12:43.145224193')
        >>> minvalue(np.timedelta64)  # doctest: +SKIP
        numpy.timedelta64(-9223372036854775807,'ns')
        >>> minvalue(pd.Timestamp)
        Timestamp('1677-09-21 00:12:43.145224193')
        >>> minvalue(pd.Timedelta)
        Timedelta('-106752 days +00:12:43.145224193')
    """
    mn, _ = _minmaxvalue(dtype)
    return mn


def difftype(dtype):
    """Determine the result type of subtracting two values of the given data type.

    This function handles Python built-in types, NumPy scalar types, and
    NumPy dtype objects. For datetime types, returns the corresponding
    timedelta type. For numeric and timedelta types, returns the input type
    itself.

    Parameters:
        dtype: A type object (e.g., datetime.datetime, np.int64, np.dtype('datetime64[ns]'))

    Returns:
        type: The resulting type of the subtraction operation

    Raises:
        NotImplementedError: If the input type doesn't support subtraction

    Examples:
        Python built-in types

        >>> difftype(int)
        <class 'int'>
        >>> difftype(float)
        <class 'float'>
        >>> import datetime
        >>> difftype(datetime.datetime)
        <class 'datetime.timedelta'>
        >>> difftype(datetime.timedelta)
        <class 'datetime.timedelta'>

        NumPy scalar types

        >>> import numpy as np
        >>> difftype(np.int32)
        <class 'numpy.int32'>
        >>> difftype(np.float64)
        <class 'numpy.float64'>
        >>> difftype(np.datetime64)
        <class 'numpy.timedelta64'>
        >>> difftype(np.timedelta64)
        <class 'numpy.timedelta64'>

        NumPy dtype objects

        >>> difftype(np.dtype('datetime64[ns]'))
        dtype('<m8[ns]')
        >>> difftype(np.dtype('timedelta64[ns]'))
        dtype('<m8[ns]')

        Unsupported types

        >>> difftype(str)
        Traceback (most recent call last):
        ...
        NotImplementedError: dtype <class 'str'> is not supported for difference operation
    """
    if dtype is datetime.datetime:
        return datetime.timedelta
    if dtype is np.datetime64:
        return np.timedelta64

    if isinstance(dtype, np.dtype):
        if dtype.kind == 'M':
            # Handle NumPy datetime dtype objects with units
            # Extract time unit from dtype name (e.g., 'ns' from 'datetime64[ns]')
            name = dtype.name
            if '[' in name:
                unit = name.split('[')[1].split(']')[0]
                return np.dtype(f'timedelta64[{unit}]')
            return np.timedelta64
        if dtype.kind in 'iufcm':
            return dtype
    # Identity checks on purpose: ``==``/``in`` would let NumPy coerce the
    # right-hand side to a dtype, so the object dtype would wrongly compare
    # equal to ``datetime.timedelta``.
    elif dtype is datetime.timedelta or dtype is np.timedelta64 or \
            (isinstance(dtype, type) and issubclass(dtype, Number)):
        return dtype

    raise NotImplementedError(f"dtype {dtype} is not supported for difference operation")