diff --git a/doc/source/user_guide/window.rst b/doc/source/user_guide/window.rst index f7e219ab23e39..0d6dcaa3726e6 100644 --- a/doc/source/user_guide/window.rst +++ b/doc/source/user_guide/window.rst @@ -37,14 +37,14 @@ pandas supports 4 types of windowing operations: #. Expanding window: Accumulating window over the values. #. Exponentially Weighted window: Accumulating and exponentially weighted window over the values. -============================= ================= =========================== =========================== ======================== =================================== -Concept Method Returned Object Supports time-based windows Supports chained groupby Supports table method -============================= ================= =========================== =========================== ======================== =================================== -Rolling window ``rolling`` ``Rolling`` Yes Yes Yes (as of version 1.3) -Weighted window ``rolling`` ``Window`` No No No -Expanding window ``expanding`` ``Expanding`` No Yes Yes (as of version 1.3) -Exponentially Weighted window ``ewm`` ``ExponentialMovingWindow`` No Yes (as of version 1.2) No -============================= ================= =========================== =========================== ======================== =================================== +============================= ================= =========================== =========================== ======================== =================================== =========================== +Concept Method Returned Object Supports time-based windows Supports chained groupby Supports table method Supports online operations +============================= ================= =========================== =========================== ======================== =================================== =========================== +Rolling window ``rolling`` ``Rolling`` Yes Yes Yes (as of version 1.3) No +Weighted window ``rolling`` ``Window`` No No No No +Expanding window 
``expanding`` ``Expanding`` No Yes Yes (as of version 1.3) No +Exponentially Weighted window ``ewm`` ``ExponentialMovingWindow`` No Yes (as of version 1.2) No Yes (as of version 1.3) +============================= ================= =========================== =========================== ======================== =================================== =========================== As noted above, some operations support specifying a window based on a time offset: @@ -98,6 +98,26 @@ be calculated with :meth:`~Rolling.apply` by specifying a separate column of wei df = pd.DataFrame([[1, 2, 0.6], [2, 3, 0.4], [3, 4, 0.2], [4, 5, 0.7]]) df.rolling(2, method="table", min_periods=0).apply(weighted_mean, raw=True, engine="numba") # noqa:E501 +.. versionadded:: 1.3 + +Some windowing operations also support an ``online`` method after constructing a windowing object +which returns a new object that supports passing in new :class:`DataFrame` or :class:`Series` objects +to continue the windowing calculation with the new values (i.e. online calculations). + +On this new windowing object, the aggregation method must first be called without ``update`` to "prime" the initial +state of the online calculation. Then, new :class:`DataFrame` or :class:`Series` objects can be passed in +the ``update`` argument to continue the windowing calculation. + +.. ipython:: python + + df = pd.DataFrame([[1, 2, 0.6], [2, 3, 0.4], [3, 4, 0.2], [4, 5, 0.7]]) + df.ewm(0.5).mean() + +.. ipython:: python + + online_ewm = df.head(2).ewm(0.5).online() + online_ewm.mean() + online_ewm.mean(update=df.tail(2)) All windowing operations support a ``min_periods`` argument that dictates the minimum amount of non-``np.nan`` values a window must have; otherwise, the resulting value is ``np.nan``. 
diff --git a/doc/source/whatsnew/v1.3.0.rst b/doc/source/whatsnew/v1.3.0.rst index e2b923812a211..dd95f9088e3da 100644 --- a/doc/source/whatsnew/v1.3.0.rst +++ b/doc/source/whatsnew/v1.3.0.rst @@ -239,7 +239,8 @@ For example: Other enhancements ^^^^^^^^^^^^^^^^^^ -- :meth:`DataFrame.rolling`, :meth:`Series.rolling`, :meth:`DataFrame.expanding`, and :meth:`Series.expanding` now support a ``method`` argument with a ``'table'`` option that performs the windowing operation over an entire DataFrame. See :ref:`Window Overview <window.overview>` for performance and functional benefits (:issue:`15095`, :issue:`38995`) +- :meth:`DataFrame.rolling`, :meth:`Series.rolling`, :meth:`DataFrame.expanding`, and :meth:`Series.expanding` now support a ``method`` argument with a ``'table'`` option that performs the windowing operation over an entire :class:`DataFrame`. See :ref:`Window Overview <window.overview>` for performance and functional benefits (:issue:`15095`, :issue:`38995`) +- :class:`.ExponentialMovingWindow` now supports an ``online`` method that can perform ``mean`` calculations in an online fashion. 
See :ref:`Window Overview <window.overview>` (:issue:`41673`)
 - Added :meth:`MultiIndex.dtypes` (:issue:`37062`)
 - Added ``end`` and ``end_day`` options for the ``origin`` argument in :meth:`DataFrame.resample` (:issue:`37804`)
 - Improved error message when ``usecols`` and ``names`` do not match for :func:`read_csv` and ``engine="c"`` (:issue:`29042`)
diff --git a/pandas/core/generic.py b/pandas/core/generic.py
index 99e4888d08be6..41f77e081c1e9 100644
--- a/pandas/core/generic.py
+++ b/pandas/core/generic.py
@@ -10893,7 +10893,7 @@ def ewm(
         span: float | None = None,
         halflife: float | TimedeltaConvertibleTypes | None = None,
         alpha: float | None = None,
-        min_periods: int = 0,
+        min_periods: int | None = 0,
         adjust: bool_t = True,
         ignore_na: bool_t = False,
         axis: Axis = 0,
diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py
index 4187c56079060..c1d532d94eb83 100644
--- a/pandas/core/window/ewm.py
+++ b/pandas/core/window/ewm.py
@@ -41,6 +41,10 @@
     GroupbyIndexer,
 )
 from pandas.core.window.numba_ import generate_numba_ewma_func
+from pandas.core.window.online import (
+    EWMMeanState,
+    generate_online_numba_ewma_func,
+)
 from pandas.core.window.rolling import (
     BaseWindow,
     BaseWindowGroupby,
@@ -263,7 +267,7 @@ def __init__(
         span: float | None = None,
         halflife: float | TimedeltaConvertibleTypes | None = None,
         alpha: float | None = None,
-        min_periods: int = 0,
+        min_periods: int | None = 0,
         adjust: bool = True,
         ignore_na: bool = False,
         axis: Axis = 0,
@@ -273,7 +277,7 @@ def __init__(
     ):
         super().__init__(
             obj=obj,
-            min_periods=max(int(min_periods), 1),
+            min_periods=1 if min_periods is None else max(int(min_periods), 1),
             on=None,
             center=False,
             closed=None,
@@ -338,6 +342,48 @@ def _get_window_indexer(self) -> BaseIndexer:
         """
         return ExponentialMovingWindowIndexer()
 
+    def online(self, engine="numba", engine_kwargs=None):
+        """
+        Return an ``OnlineExponentialMovingWindow`` object to calculate
+        exponentially moving window aggregations in an online method.
+
+        .. versionadded:: 1.3.0
+
+        Parameters
+        ----------
+        engine : str, default ``'numba'``
+            Execution engine to calculate online aggregations.
+            Applies to all supported aggregation methods.
+
+        engine_kwargs : dict, default None
+            Applies to all supported aggregation methods.
+
+            * For ``'numba'`` engine, the engine can accept ``nopython``, ``nogil``
+              and ``parallel`` dictionary keys. The values must either be ``True`` or
+              ``False``. The default ``engine_kwargs`` for the ``'numba'`` engine is
+              ``{'nopython': True, 'nogil': False, 'parallel': False}`` and will be
+              applied to the function
+
+        Returns
+        -------
+        OnlineExponentialMovingWindow
+        """
+        return OnlineExponentialMovingWindow(
+            obj=self.obj,
+            com=self.com,
+            span=self.span,
+            halflife=self.halflife,
+            alpha=self.alpha,
+            min_periods=self.min_periods,
+            adjust=self.adjust,
+            ignore_na=self.ignore_na,
+            axis=self.axis,
+            times=self.times,
+            engine=engine,
+            engine_kwargs=engine_kwargs,
+            selection=self._selection,
+        )
+
     @doc(
         _shared_docs["aggregate"],
         see_also=dedent(
@@ -655,3 +701,191 @@ def _get_window_indexer(self) -> GroupbyIndexer:
             window_indexer=ExponentialMovingWindowIndexer,
         )
         return window_indexer
+
+
+class OnlineExponentialMovingWindow(ExponentialMovingWindow):
+    """
+    Exponentially weighted window whose state can be continued with new data.
+
+    Only ``mean`` is supported. ``times`` raises ``NotImplementedError`` and an
+    engine that ``maybe_use_numba`` does not accept raises ``ValueError``.
+    """
+
+    def __init__(
+        self,
+        obj: FrameOrSeries,
+        com: float | None = None,
+        span: float | None = None,
+        halflife: float | TimedeltaConvertibleTypes | None = None,
+        alpha: float | None = None,
+        min_periods: int | None = 0,
+        adjust: bool = True,
+        ignore_na: bool = False,
+        axis: Axis = 0,
+        times: str | np.ndarray | FrameOrSeries | None = None,
+        engine: str = "numba",
+        engine_kwargs: dict[str, bool] | None = None,
+        *,
+        selection=None,
+    ):
+        if times is not None:
+            raise NotImplementedError(
+                "times is not implemented with online operations."
+            )
+        super().__init__(
+            obj=obj,
+            com=com,
+            span=span,
+            halflife=halflife,
+            alpha=alpha,
+            min_periods=min_periods,
+            adjust=adjust,
+            ignore_na=ignore_na,
+            axis=axis,
+            times=times,
+            selection=selection,
+        )
+        # Weights and last result carried between ``mean`` calls.
+        self._mean = EWMMeanState(
+            self._com, self.adjust, self.ignore_na, self.axis, obj.shape
+        )
+        if maybe_use_numba(engine):
+            self.engine = engine
+            self.engine_kwargs = engine_kwargs
+        else:
+            raise ValueError("'numba' is the only supported engine")
+
+    def reset(self):
+        """
+        Reset the state captured by `update` calls.
+        """
+        self._mean.reset()
+
+    def aggregate(self, func, *args, **kwargs):
+        """Not implemented for online operations."""
+        raise NotImplementedError
+
+    def std(self, bias: bool = False, *args, **kwargs):
+        """Not implemented for online operations."""
+        raise NotImplementedError
+
+    def corr(
+        self,
+        other: FrameOrSeriesUnion | None = None,
+        pairwise: bool | None = None,
+        **kwargs,
+    ):
+        """Not implemented for online operations."""
+        raise NotImplementedError
+
+    def cov(
+        self,
+        other: FrameOrSeriesUnion | None = None,
+        pairwise: bool | None = None,
+        bias: bool = False,
+        **kwargs,
+    ):
+        """Not implemented for online operations."""
+        raise NotImplementedError
+
+    def var(self, bias: bool = False, *args, **kwargs):
+        """Not implemented for online operations."""
+        raise NotImplementedError
+
+    def mean(self, *args, update=None, update_times=None, **kwargs):
+        """
+        Calculate an online exponentially weighted mean.
+
+        Parameters
+        ----------
+        update : DataFrame or Series, default None
+            New values to continue calculating the
+            exponentially weighted mean from the last values and weights.
+            Values should be float64 dtype.
+
+            ``update`` needs to be ``None`` the first time the
+            exponentially weighted mean is calculated.
+
+        update_times : Series or 1-D np.ndarray, default None
+            New times to continue calculating the
+            exponentially weighted mean from the last values and weights.
+            If ``None``, values are assumed to be evenly spaced
+            in time.
+            This feature is currently unsupported.
+
+        Returns
+        -------
+        DataFrame or Series
+
+        Raises
+        ------
+        NotImplementedError
+            If ``update_times`` is passed.
+        ValueError
+            If ``update`` is passed before ``mean`` was called without it.
+
+        Examples
+        --------
+        >>> df = pd.DataFrame({"a": range(5), "b": range(5, 10)})
+        >>> online_ewm = df.head(2).ewm(0.5).online()
+        >>> online_ewm.mean()
+              a     b
+        0  0.00  5.00
+        1  0.75  5.75
+        >>> online_ewm.mean(update=df.tail(3))
+                  a         b
+        2  1.615385  6.615385
+        3  2.550000  7.550000
+        4  3.520661  8.520661
+        >>> online_ewm.reset()
+        >>> online_ewm.mean()
+              a     b
+        0  0.00  5.00
+        1  0.75  5.75
+        """
+        if update_times is not None:
+            raise NotImplementedError("update_times is not implemented.")
+
+        result_kwargs = {}
+        is_frame = self._selected_obj.ndim == 2
+        if update is not None:
+            if self._mean.last_ewm is None:
+                raise ValueError(
+                    "Must call mean with update=None first before passing update"
+                )
+            # The last result is prepended as the first row so the calculation
+            # continues from it; result_from drops that row from the output.
+            result_from = 1
+            result_kwargs["index"] = update.index
+            if is_frame:
+                last_value = self._mean.last_ewm[np.newaxis, :]
+                result_kwargs["columns"] = update.columns
+            else:
+                last_value = self._mean.last_ewm
+                result_kwargs["name"] = update.name
+            np_array = np.concatenate((last_value, update.to_numpy()))
+        else:
+            result_from = 0
+            result_kwargs["index"] = self._selected_obj.index
+            if is_frame:
+                result_kwargs["columns"] = self._selected_obj.columns
+            else:
+                result_kwargs["name"] = self._selected_obj.name
+            np_array = self._selected_obj.astype(np.float64).to_numpy()
+        # One delta per pair of consecutive rows; all ones because values are
+        # treated as evenly spaced while update_times is unsupported.
+        update_deltas = np.ones(max(len(np_array) - 1, 0), dtype=np.float64)
+        ewma_func = generate_online_numba_ewma_func(self.engine_kwargs)
+        result = self._mean.run_ewm(
+            np_array if is_frame else np_array[:, np.newaxis],
+            update_deltas,
+            self.min_periods,
+            ewma_func,
+        )
+        result = result[result_from:]
+        if not is_frame:
+            # Index the single column rather than squeeze() so that a one-row
+            # result stays one-dimensional.
+            result = result[:, 0]
+        result = self._selected_obj._constructor(result, **result_kwargs)
+        return result
diff --git a/pandas/core/window/online.py b/pandas/core/window/online.py
new file mode 100644
index 0000000000000..5a9e8d65255ae
--- /dev/null
+++ b/pandas/core/window/online.py
@@
-0,0 +1,146 @@
+from typing import (
+    Dict,
+    Optional,
+)
+
+import numpy as np
+
+from pandas.compat._optional import import_optional_dependency
+
+from pandas.core.util.numba_ import (
+    NUMBA_FUNC_CACHE,
+    get_jit_arguments,
+)
+
+
+def generate_online_numba_ewma_func(engine_kwargs: Optional[Dict[str, bool]]):
+    """
+    Generate a numba jitted online ewma function specified by values
+    from engine_kwargs.
+
+    Parameters
+    ----------
+    engine_kwargs : dict
+        dictionary of arguments to be passed into numba.jit
+
+    Returns
+    -------
+    Numba function
+    """
+    nopython, nogil, parallel = get_jit_arguments(engine_kwargs)
+
+    # Key on the jit arguments so a compiled kernel is reused between calls
+    # but never handed back for different options; a fresh lambda in the key
+    # would not compare equal to any stored key.
+    cache_key = (
+        generate_online_numba_ewma_func,
+        f"online_ewma_{nopython}_{nogil}_{parallel}",
+    )
+    if cache_key in NUMBA_FUNC_CACHE:
+        return NUMBA_FUNC_CACHE[cache_key]
+
+    numba = import_optional_dependency("numba")
+
+    @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel)
+    def online_ewma(
+        values: np.ndarray,
+        deltas: np.ndarray,
+        minimum_periods: int,
+        old_wt_factor: float,
+        new_wt: float,
+        old_wt: np.ndarray,
+        adjust: bool,
+        ignore_na: bool,
+    ):
+        """
+        Compute online exponentially weighted mean per column over 2D values.
+
+        Takes the first observation as is, then computes the subsequent
+        exponentially weighted mean accounting minimum periods.
+
+        ``old_wt`` is updated in place and returned alongside the result so
+        the caller can carry it into the next call.
+        """
+        result = np.empty(values.shape)
+        weighted_avg = values[0]
+        nobs = (~np.isnan(weighted_avg)).astype(np.int64)
+        result[0] = np.where(nobs >= minimum_periods, weighted_avg, np.nan)
+
+        for i in range(1, len(values)):
+            cur = values[i]
+            is_observations = ~np.isnan(cur)
+            nobs += is_observations.astype(np.int64)
+            for j in numba.prange(len(cur)):
+                if not np.isnan(weighted_avg[j]):
+                    if is_observations[j] or not ignore_na:
+
+                        # note that len(deltas) = len(vals) - 1 and deltas[i] is to be
+                        # used in conjunction with vals[i+1], so row i decays by
+                        # deltas[i - 1] (deltas is indexed by row, not column j)
+                        old_wt[j] *= old_wt_factor ** deltas[i - 1]
+                        if is_observations[j]:
+                            # avoid numerical errors on constant series
+                            if weighted_avg[j] != cur[j]:
+                                weighted_avg[j] = (
+                                    (old_wt[j] * weighted_avg[j]) + (new_wt * cur[j])
+                                ) / (old_wt[j] + new_wt)
+                            if adjust:
+                                old_wt[j] += new_wt
+                            else:
+                                old_wt[j] = 1.0
+                elif is_observations[j]:
+                    weighted_avg[j] = cur[j]
+
+            result[i] = np.where(nobs >= minimum_periods, weighted_avg, np.nan)
+
+        return result, old_wt
+
+    NUMBA_FUNC_CACHE[cache_key] = online_ewma
+    return online_ewma
+
+
+class EWMMeanState:
+    """
+    State carried between successive online ewm mean calculations.
+
+    Keeps the per-column old weights and the last computed row so a later
+    call can continue from them.
+    """
+
+    def __init__(self, com, adjust, ignore_na, axis, shape):
+        alpha = 1.0 / (1.0 + com)
+        self.axis = axis
+        self.shape = shape
+        self.adjust = adjust
+        self.ignore_na = ignore_na
+        self.new_wt = 1.0 if adjust else alpha
+        self.old_wt_factor = 1.0 - alpha
+        self.old_wt = np.ones(self.shape[self.axis - 1])
+        self.last_ewm = None
+
+    def run_ewm(self, weighted_avg, deltas, min_periods, ewm_func):
+        """
+        Run ``ewm_func`` over ``weighted_avg`` and store the resulting state.
+
+        Returns the full result; its last row is kept as ``last_ewm``.
+        """
+        result, old_wt = ewm_func(
+            weighted_avg,
+            deltas,
+            min_periods,
+            self.old_wt_factor,
+            self.new_wt,
+            self.old_wt,
+            self.adjust,
+            self.ignore_na,
+        )
+        self.old_wt = old_wt
+        self.last_ewm = result[-1]
+        return result
+
+    def reset(self):
+        """
+        Restore the initial weights and forget the last result.
+        """
+        self.old_wt = np.ones(self.shape[self.axis - 1])
+        self.last_ewm = None
diff --git a/pandas/tests/window/test_online.py b/pandas/tests/window/test_online.py
new file mode 100644
index 0000000000000..c7580650926da
--- /dev/null
+++ b/pandas/tests/window/test_online.py
@@ -0,0 +1,90 @@
+import numpy as
np
+import pytest
+
+import pandas.util._test_decorators as td
+
+from pandas import (
+    DataFrame,
+    Series,
+)
+import pandas._testing as tm
+
+
+@td.skip_if_no("numba", "0.46.0")
+@pytest.mark.filterwarnings("ignore:\\nThe keyword argument")
+class TestEWM:  # online ewm mean compared against the regular ewm mean
+    def test_invalid_update(self):  # update passed before a plain mean() call
+        df = DataFrame({"a": range(5), "b": range(5)})
+        online_ewm = df.head(2).ewm(0.5).online()
+        with pytest.raises(
+            ValueError,
+            match="Must call mean with update=None first before passing update",
+        ):
+            online_ewm.mean(update=df.head(1))
+
+    @pytest.mark.parametrize(
+        "obj", [DataFrame({"a": range(5), "b": range(5)}), Series(range(5), name="foo")]
+    )
+    def test_online_vs_non_online_mean(
+        self, obj, nogil, parallel, nopython, adjust, ignore_na
+    ):
+        expected = obj.ewm(0.5, adjust=adjust, ignore_na=ignore_na).mean()
+        engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
+
+        online_ewm = (
+            obj.head(2)
+            .ewm(0.5, adjust=adjust, ignore_na=ignore_na)
+            .online(engine_kwargs=engine_kwargs)
+        )
+        # Run the mean/update cycle twice; the reset() between runs must
+        # give the second pass the same results as the first
+        for _ in range(2):
+            result = online_ewm.mean()
+            tm.assert_equal(result, expected.head(2))
+
+            result = online_ewm.mean(update=obj.tail(3))
+            tm.assert_equal(result, expected.tail(3))
+
+            online_ewm.reset()
+
+    @pytest.mark.xfail(raises=NotImplementedError)  # expected to raise for now
+    @pytest.mark.parametrize(
+        "obj", [DataFrame({"a": range(5), "b": range(5)}), Series(range(5), name="foo")]
+    )
+    def test_update_times_mean(
+        self, obj, nogil, parallel, nopython, adjust, ignore_na, halflife_with_times
+    ):
+        times = Series(
+            np.array(
+                ["2020-01-01", "2020-01-05", "2020-01-07", "2020-01-17", "2020-01-21"],
+                dtype="datetime64",
+            )
+        )
+        expected = obj.ewm(
+            0.5,
+            adjust=adjust,
+            ignore_na=ignore_na,
+            times=times,
+            halflife=halflife_with_times,
+        ).mean()
+
+        engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython}
+        online_ewm = (
+            obj.head(2)
+            .ewm(
+                0.5,
+                adjust=adjust,
+                ignore_na=ignore_na,
+                times=times.head(2),
+                halflife=halflife_with_times,
+            )
+            .online(engine_kwargs=engine_kwargs)
+        )
+        # Same two-pass cycle as above, additionally passing update_times
+        for _ in range(2):
+            result = online_ewm.mean()
+            tm.assert_equal(result, expected.head(2))
+
+            result = online_ewm.mean(update=obj.tail(3), update_times=times.tail(3))
+            tm.assert_equal(result, expected.tail(3))
+
+            online_ewm.reset()