From e195c58a9915ea0fd91e12bd35f8936b6b96cd4b Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Mon, 31 May 2021 15:07:58 -0700 Subject: [PATCH 01/22] Add scaffolding for online EWM --- pandas/core/window/ewm.py | 109 ++++++++++++++++++++++++++++++++++ pandas/core/window/online.py | 112 +++++++++++++++++++++++++++++++++++ 2 files changed, 221 insertions(+) create mode 100644 pandas/core/window/online.py diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 4187c56079060..087eea6ff3f55 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -15,6 +15,7 @@ FrameOrSeriesUnion, TimedeltaConvertibleTypes, ) +from pandas.compat._optional import import_optional_dependency from pandas.compat.numpy import function as nv from pandas.util._decorators import doc @@ -41,6 +42,10 @@ GroupbyIndexer, ) from pandas.core.window.numba_ import generate_numba_ewma_func +from pandas.core.window.online import ( + EWMeanState, + generate_online_numba_ewma_func, +) from pandas.core.window.rolling import ( BaseWindow, BaseWindowGroupby, @@ -338,6 +343,24 @@ def _get_window_indexer(self) -> BaseIndexer: """ return ExponentialMovingWindowIndexer() + def online(self, engine="numba", engine_kwargs=None): + import_optional_dependency("numba") + return OnlineExponentialMovingWindow( + obj=self.obj, + com=self.com, + span=self.span, + halflife=self.halflife, + alpha=self.alpha, + min_periods=self.min_periods, + adjust=self.adjust, + ignore_na=self.ignore_na, + axis=self.axis, + times=self.times, + engine=engine, + engine_kwargs=engine_kwargs, + selection=self._selection, + ) + @doc( _shared_docs["aggregate"], see_also=dedent( @@ -655,3 +678,89 @@ def _get_window_indexer(self) -> GroupbyIndexer: window_indexer=ExponentialMovingWindowIndexer, ) return window_indexer + + +class OnlineExponentialMovingWindow(ExponentialMovingWindow): + def __init__( + self, + obj: FrameOrSeries, + com: float | None = None, + span: float | None = None, + halflife: float | TimedeltaConvertibleTypes | None = None, + alpha: float | None = None, + min_periods: int = 0, + adjust: bool = True, + ignore_na: bool = False, + axis: Axis = 0, + times: str | np.ndarray | FrameOrSeries | None = None, + engine: str = "numba", + engine_kwargs: dict[str, bool] | None = None, + *, + selection=None, + ): + super().__init__( + obj=obj, + com=com, + span=span, + halflife=halflife, + alpha=alpha, + min_periods=min_periods, + adjust=adjust, + ignore_na=ignore_na, + axis=axis, + times=times, + selection=selection, + ) + self._mean = EWMeanState(self._com, self.adjust, self.ignore_na) + self.engine = engine + self.engine_kwargs = engine_kwargs + + def reset(self): + """ + Reset the state captured by `update` calls. + """ + self._mean.reset() + + def aggregate(self, func, *args, **kwargs): + return NotImplementedError + + def std(self, bias: bool = False, *args, **kwargs): + return NotImplementedError + + def corr( + self, + other: FrameOrSeriesUnion | None = None, + pairwise: bool | None = None, + **kwargs, + ): + return NotImplementedError + + def cov( + self, + other: FrameOrSeriesUnion | None = None, + pairwise: bool | None = None, + bias: bool = False, + **kwargs, + ): + return NotImplementedError + + def var(self, bias: bool = False, *args, **kwargs): + return NotImplementedError + + def mean(self, engine=None, engine_kwargs=None, update=None, update_deltas=None): + if update is not None: + if self._mean.last_ewm is None: + raise ValueError( + "Must call mean with update=None first before passing update" + ) + obj = np.concatenate(([self._mean.last_ewm], update.to_numpy())) + result_from = 1 + else: + obj = self._selected_obj.to_numpy() + result_from = 0 + if update_deltas is None: + update_deltas = np.ones(max(len(obj) - 1, 0), dtype=np.float64) + ewma_func = generate_online_numba_ewma_func(engine_kwargs) + result = self._mean.run_ewm(obj, update_deltas, self.min_periods, ewma_func) + result = self._selected_obj._constructor(result) + return result.iloc[result_from:] diff --git a/pandas/core/window/online.py b/pandas/core/window/online.py new file mode 100644 index 0000000000000..c4a52d3c3f43b --- /dev/null +++ b/pandas/core/window/online.py @@ -0,0 +1,112 @@ +from typing import ( + Dict, + Optional, +) + +import numpy as np + +from pandas.compat._optional import import_optional_dependency + +from pandas.core.util.numba_ import ( + NUMBA_FUNC_CACHE, + get_jit_arguments, +) + + +def generate_online_numba_ewma_func(engine_kwargs: Optional[Dict[str, bool]]): + """ + Generate a numba jitted groupby ewma function specified by values + from engine_kwargs. + Parameters + ---------- + engine_kwargs : dict + dictionary of arguments to be passed into numba.jit + Returns + ------- + Numba function + """ + nopython, nogil, parallel = get_jit_arguments(engine_kwargs) + + cache_key = (lambda x: x, "online_ewma") + if cache_key in NUMBA_FUNC_CACHE: + return NUMBA_FUNC_CACHE[cache_key] + + numba = import_optional_dependency("numba") + + @numba.jit(nopython=nopython, nogil=nogil, parallel=parallel) + def online_ewma( + values: np.ndarray, + deltas: np.ndarray, + minimum_periods: int, + old_wt_factor: float, + new_wt: float, + old_wt: float, + adjust: bool, + ignore_na: bool, + ): + result = np.empty(len(values)) + + weighted_avg = values[0] + nobs = int(not np.isnan(weighted_avg)) + result[0] = weighted_avg if nobs >= minimum_periods else np.nan + + for j in range(1, len(values)): + cur = values[j] + is_observation = not np.isnan(cur) + nobs += is_observation + if not np.isnan(weighted_avg): + + if is_observation or not ignore_na: + + # note that len(deltas) = len(vals) - 1 and deltas[i] is to be + # used in conjunction with vals[i+1] + old_wt *= old_wt_factor ** deltas[j - 1] + if is_observation: + + # avoid numerical errors on constant series + if weighted_avg != cur: + weighted_avg = ( + (old_wt * weighted_avg) + (new_wt * cur) + ) / (old_wt + new_wt) + if adjust: + old_wt += new_wt + else: + old_wt = 1.0 + elif is_observation: + weighted_avg = cur + + result[j] = weighted_avg if nobs >= minimum_periods else np.nan + + return result, old_wt + + return online_ewma + + +class EWMeanState: + def __init__(self, com, adjust, ignore_na): + alpha = 1.0 / (1.0 + com) + self.old_wt_factor = 1.0 - alpha + self.new_wt = 1.0 if adjust else alpha + self.old_wt = 1.0 + self.adjust = adjust + self.ignore_na = ignore_na + self.last_ewm = None + + def run_ewm(self, weighted_avg, deltas, min_periods, ewm_func): + result, old_wt = ewm_func( + weighted_avg, + deltas, + min_periods, + self.old_wt_factor, + self.new_wt, + self.old_wt, + self.adjust, + self.ignore_na, + ) + self.old_wt = old_wt + self.last_ewm = result[-1] + return result + + def reset(self): + self.old_wt = 1 + self.last_ewm = None From 9354bd0d261580c02459f8ed71f641d2c227c116 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 6 Jun 2021 20:21:42 -0700 Subject: [PATCH 02/22] Add online op and new methods and class --- pandas/core/window/ewm.py | 10 +++-- pandas/core/window/online.py | 73 ++++++++++++++++++------------------ 2 files changed, 42 insertions(+), 41 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 087eea6ff3f55..6716148c41a63 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -711,7 +711,9 @@ def __init__( times=times, selection=selection, ) - self._mean = EWMeanState(self._com, self.adjust, self.ignore_na) + self._mean = EWMeanState( + self._com, self.adjust, self.ignore_na, self.axis, obj.shape + ) self.engine = engine self.engine_kwargs = engine_kwargs @@ -747,7 +749,7 @@ def cov( def var(self, bias: bool = False, *args, **kwargs): return NotImplementedError - def mean(self, engine=None, engine_kwargs=None, update=None, update_deltas=None): + def mean(self, update=None, update_deltas=None): if update is not None: if self._mean.last_ewm is None: raise ValueError( @@ -756,11 +758,11 @@ def mean(self, engine=None, engine_kwargs=None, update=None, update_deltas=None) obj = np.concatenate(([self._mean.last_ewm], update.to_numpy())) result_from = 1 else: - obj = self._selected_obj.to_numpy() + obj = self._selected_obj.astype(np.float64).to_numpy() result_from = 0 if update_deltas is None: update_deltas = np.ones(max(len(obj) - 1, 0), dtype=np.float64) - ewma_func = generate_online_numba_ewma_func(engine_kwargs) + ewma_func = generate_online_numba_ewma_func(self.engine_kwargs) result = self._mean.run_ewm(obj, update_deltas, self.min_periods, ewma_func) result = self._selected_obj._constructor(result) return result.iloc[result_from:] diff --git a/pandas/core/window/online.py b/pandas/core/window/online.py index c4a52d3c3f43b..45bb66f04d2b3 100644 --- a/pandas/core/window/online.py +++ b/pandas/core/window/online.py @@ -40,42 +40,39 @@ def online_ewma( minimum_periods: int, old_wt_factor: float, new_wt: float, - old_wt: float, + old_wt: np.ndarray, adjust: bool, ignore_na: bool, ): - result = np.empty(len(values)) - + result = np.empty(values.shape) weighted_avg = values[0] - nobs = int(not np.isnan(weighted_avg)) - result[0] = weighted_avg if nobs >= minimum_periods else np.nan - - for j in range(1, len(values)): - cur = values[j] - is_observation = not np.isnan(cur) - nobs += is_observation - if not np.isnan(weighted_avg): - - if is_observation or not ignore_na: - - # note that len(deltas) = len(vals) - 1 and deltas[i] is to be - # used in conjunction with vals[i+1] - old_wt *= old_wt_factor ** deltas[j - 1] - if is_observation: - - # avoid numerical errors on constant series - if weighted_avg != cur: - weighted_avg = ( - (old_wt * weighted_avg) + (new_wt * cur) - ) / (old_wt + new_wt) - if adjust: - old_wt += new_wt - else: - old_wt = 1.0 - elif is_observation: - weighted_avg = cur - - result[j] = weighted_avg if nobs >= minimum_periods else np.nan + nobs = (~np.isnan(weighted_avg)).astype(np.int64) + result[0] = np.where(nobs >= minimum_periods, weighted_avg, np.nan) + + for i in range(1, len(values)): + cur = values[i] + is_observations = ~np.isnan(cur) + nobs += is_observations.astype(np.int64) + for j in range(len(cur)): + if not np.isnan(weighted_avg[j]): + if is_observations[j] or not ignore_na: + # note that len(deltas) = len(vals) - 1 and deltas[i] is to be + # used in conjunction with vals[i+1] + old_wt[j] *= old_wt_factor ** deltas[j - 1] + if is_observations[j]: + # avoid numerical errors on constant series + if weighted_avg[j] != cur[j]: + weighted_avg[j] = ( + (old_wt[j] * weighted_avg[j]) + (new_wt * cur[j]) + ) / (old_wt[j] + new_wt) + if adjust: + old_wt[j] += new_wt + else: + old_wt[j] = 1.0 + elif is_observations[j]: + weighted_avg[j] = cur[j] + + result[i] = np.where(nobs >= minimum_periods, weighted_avg, np.nan) return result, old_wt @@ -83,13 +80,15 @@ def online_ewma( class EWMeanState: - def __init__(self, com, adjust, ignore_na): + def __init__(self, com, adjust, ignore_na, axis, shape): alpha = 1.0 / (1.0 + com) - self.old_wt_factor = 1.0 - alpha - self.new_wt = 1.0 if adjust else alpha - self.old_wt = 1.0 + self.axis = axis + self.shape = shape self.adjust = adjust self.ignore_na = ignore_na + self.new_wt = 1.0 if adjust else alpha + self.old_wt_factor = 1.0 - alpha + self.old_wt = np.ones(self.shape[self.axis - 1]) self.last_ewm = None def run_ewm(self, weighted_avg, deltas, min_periods, ewm_func): @@ -108,5 +107,5 @@ def run_ewm(self, weighted_avg, deltas, min_periods, ewm_func): return result def reset(self): - self.old_wt = 1 + self.old_wt = np.ones(self.shape[self.axis - 1]) self.last_ewm = None From 0ce197dab324511eb569a04b872f11119dc534a9 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 6 Jun 2021 20:33:32 -0700 Subject: [PATCH 03/22] Make signatures match --- pandas/core/window/ewm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 6716148c41a63..20ebf1bc9b102 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -749,7 +749,7 @@ def cov( def var(self, bias: bool = False, *args, **kwargs): return NotImplementedError - def mean(self, update=None, update_deltas=None): + def mean(self, *args, update=None, update_deltas=None, **kwargs): if update is not None: if self._mean.last_ewm is None: raise ValueError( From 8096cc600000ba0edb5b2c8782450f598d2ccf30 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 6 Jun 2021 21:01:13 -0700 Subject: [PATCH 04/22] Add some tests, rename some variables --- pandas/core/window/ewm.py | 14 ++++++---- pandas/core/window/online.py | 2 +- pandas/tests/window/test_online.py | 44 ++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 7 deletions(-) create mode 100644 pandas/tests/window/test_online.py diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 20ebf1bc9b102..986cc0caccc35 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -43,7 +43,7 @@ ) from pandas.core.window.numba_ import generate_numba_ewma_func from pandas.core.window.online import ( - EWMeanState, + EWMMeanState, generate_online_numba_ewma_func, ) from pandas.core.window.rolling import ( @@ -711,7 +711,7 @@ def __init__( times=times, selection=selection, ) - self._mean = EWMeanState( + self._mean = EWMMeanState( self._com, self.adjust, self.ignore_na, self.axis, obj.shape ) self.engine = engine @@ -749,7 +749,7 @@ def cov( def var(self, bias: bool = False, *args, **kwargs): return NotImplementedError - def mean(self, *args, update=None, update_deltas=None, **kwargs): + def mean(self, *args, update=None, update_times=None, **kwargs): if update is not None: if self._mean.last_ewm is None: raise ValueError( @@ -760,9 +760,11 @@ def mean(self, *args, update=None, update_deltas=None, **kwargs): else: obj = self._selected_obj.astype(np.float64).to_numpy() result_from = 0 - if update_deltas is None: - update_deltas = np.ones(max(len(obj) - 1, 0), dtype=np.float64) + if update_times is None: + update_times = np.ones(max(len(obj) - 1, 0), dtype=np.float64) + else: + update_times = _calculate_deltas(update_times, self.halflife) ewma_func = generate_online_numba_ewma_func(self.engine_kwargs) - result = self._mean.run_ewm(obj, update_deltas, self.min_periods, ewma_func) + result = self._mean.run_ewm(obj, update_times, self.min_periods, ewma_func) result = self._selected_obj._constructor(result) return result.iloc[result_from:] diff --git a/pandas/core/window/online.py b/pandas/core/window/online.py index 45bb66f04d2b3..3cc88cee892c3 100644 --- a/pandas/core/window/online.py +++ b/pandas/core/window/online.py @@ -79,7 +79,7 @@ def online_ewma( return online_ewma -class EWMeanState: +class EWMMeanState: def __init__(self, com, adjust, ignore_na, axis, shape): alpha = 1.0 / (1.0 + com) self.axis = axis diff --git a/pandas/tests/window/test_online.py b/pandas/tests/window/test_online.py new file mode 100644 index 0000000000000..fdca82e86b532 --- /dev/null +++ b/pandas/tests/window/test_online.py @@ -0,0 +1,44 @@ +import numpy as np +import pytest + +import pandas.util._test_decorators as td + +from pandas import ( + DataFrame, + Series, +) +import pandas._testing as tm + + +@td.skip_if_no("numba", "0.46.0") +@pytest.mark.filterwarnings("ignore:\\nThe keyword argument") +class TestEWM: + def test_online_vs_non_online(self, nogil, parallel, nopython): + df = DataFrame({"a": range(5), "b": range(5)}) + expected = df.ewm(0.5).mean() + engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} + online_ewm = df.head(2).ewm(0.5).online(engine_kwargs=engine_kwargs) + result = online_ewm.mean() + tm.assert_frame_equal(result, expected.head(2)) + + result = online_ewm.update(update=df.tail(3)) + tm.assert_frame_equal(result, expected.tail(3)) + + def test_update_times(self, nogil, parallel, nopython): + times = Series( + np.array( + ["2020-01-01", "2020-01-02", "2020-01-04", "2020-01-17", "2020-01-21"], + dtype="datetime64", + ) + ) + df = DataFrame({"a": range(5), "b": range(5)}) + expected = df.ewm(0.5, times=times).mean() + engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} + online_ewm = ( + df.head(2).ewm(0.5, times=times.head(2)).online(engine_kwargs=engine_kwargs) + ) + result = online_ewm.mean() + tm.assert_frame_equal(result, expected.head(2)) + + result = online_ewm.update(update=df.tail(3), update_times=times.tail(3)) + tm.assert_frame_equal(result, expected.tail(3)) From a5273b9fc163d9e24395bbb5f9e9c38fe32e56d0 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 6 Jun 2021 21:02:04 -0700 Subject: [PATCH 05/22] Add newline for readability --- pandas/tests/window/test_online.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pandas/tests/window/test_online.py b/pandas/tests/window/test_online.py index fdca82e86b532..dd21b69bdd129 100644 --- a/pandas/tests/window/test_online.py +++ b/pandas/tests/window/test_online.py @@ -17,6 +17,7 @@ def test_online_vs_non_online(self, nogil, parallel, nopython): df = DataFrame({"a": range(5), "b": range(5)}) expected = df.ewm(0.5).mean() engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} + online_ewm = df.head(2).ewm(0.5).online(engine_kwargs=engine_kwargs) result = online_ewm.mean() tm.assert_frame_equal(result, expected.head(2)) @@ -33,6 +34,7 @@ def test_update_times(self, nogil, parallel, nopython): ) df = DataFrame({"a": range(5), "b": range(5)}) expected = df.ewm(0.5, times=times).mean() + engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} online_ewm = ( df.head(2).ewm(0.5, times=times.head(2)).online(engine_kwargs=engine_kwargs) From bab78ccf5e673b0f247400102f9e46a992067ea3 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 6 Jun 2021 21:05:18 -0700 Subject: [PATCH 06/22] Parameterize over adjust and ignore_na --- pandas/tests/window/test_online.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/pandas/tests/window/test_online.py b/pandas/tests/window/test_online.py index dd21b69bdd129..cd617db20f922 100644 --- a/pandas/tests/window/test_online.py +++ b/pandas/tests/window/test_online.py @@ -13,19 +13,23 @@ @td.skip_if_no("numba", "0.46.0") @pytest.mark.filterwarnings("ignore:\\nThe keyword argument") class TestEWM: - def test_online_vs_non_online(self, nogil, parallel, nopython): + def test_online_vs_non_online(self, nogil, parallel, nopython, adjust, ignore_na): df = DataFrame({"a": range(5), "b": range(5)}) - expected = df.ewm(0.5).mean() + expected = df.ewm(0.5, adjust=adjust, ignore_na=ignore_na).mean() engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} - online_ewm = df.head(2).ewm(0.5).online(engine_kwargs=engine_kwargs) + online_ewm = ( + df.head(2) + .ewm(0.5, adjust=adjust, ignore_na=ignore_na) + .online(engine_kwargs=engine_kwargs) + ) result = online_ewm.mean() tm.assert_frame_equal(result, expected.head(2)) result = online_ewm.update(update=df.tail(3)) tm.assert_frame_equal(result, expected.tail(3)) - def test_update_times(self, nogil, parallel, nopython): + def test_update_times(self, nogil, parallel, nopython, adjust, ignore_na): times = Series( np.array( ["2020-01-01", "2020-01-02", "2020-01-04", "2020-01-17", "2020-01-21"], @@ -33,11 +37,13 @@ def test_update_times(self, nogil, parallel, nopython): ) ) df = DataFrame({"a": range(5), "b": range(5)}) - expected = df.ewm(0.5, times=times).mean() + expected = df.ewm(0.5, adjust=adjust, ignore_na=ignore_na, times=times).mean() engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} online_ewm = ( - df.head(2).ewm(0.5, times=times.head(2)).online(engine_kwargs=engine_kwargs) + df.head(2) + .ewm(0.5, adjust=adjust, ignore_na=ignore_na, times=times.head(2)) + .online(engine_kwargs=engine_kwargs) ) result = online_ewm.mean() tm.assert_frame_equal(result, expected.head(2)) From d72a03e8a87db1255c15caa35cf1d2fcf6cd153d Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 6 Jun 2021 21:08:47 -0700 Subject: [PATCH 07/22] Test resetting in tests --- pandas/tests/window/test_online.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/pandas/tests/window/test_online.py b/pandas/tests/window/test_online.py index cd617db20f922..b40f336485467 100644 --- a/pandas/tests/window/test_online.py +++ b/pandas/tests/window/test_online.py @@ -23,11 +23,15 @@ def test_online_vs_non_online(self, nogil, parallel, nopython, adjust, ignore_na .ewm(0.5, adjust=adjust, ignore_na=ignore_na) .online(engine_kwargs=engine_kwargs) ) - result = online_ewm.mean() - tm.assert_frame_equal(result, expected.head(2)) + # Test resetting once + for i in range(2): + result = online_ewm.mean() + tm.assert_frame_equal(result, expected.head(2)) - result = online_ewm.update(update=df.tail(3)) - tm.assert_frame_equal(result, expected.tail(3)) + result = online_ewm.update(update=df.tail(3)) + tm.assert_frame_equal(result, expected.tail(3)) + + online_ewm.reset() def test_update_times(self, nogil, parallel, nopython, adjust, ignore_na): times = Series( @@ -45,8 +49,12 @@ def test_update_times(self, nogil, parallel, nopython, adjust, ignore_na): .ewm(0.5, adjust=adjust, ignore_na=ignore_na, times=times.head(2)) .online(engine_kwargs=engine_kwargs) ) - result = online_ewm.mean() - tm.assert_frame_equal(result, expected.head(2)) + # Test resetting once + for i in range(2): + result = online_ewm.mean() + tm.assert_frame_equal(result, expected.head(2)) + + result = online_ewm.update(update=df.tail(3), update_times=times.tail(3)) + tm.assert_frame_equal(result, expected.tail(3)) - result = online_ewm.update(update=df.tail(3), update_times=times.tail(3)) - tm.assert_frame_equal(result, expected.tail(3)) + online_ewm.reset() From 0b7e77325cbbf9f24b48e0da2e3111550eb87818 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 6 Jun 2021 21:14:55 -0700 Subject: [PATCH 08/22] Add test with invalid update --- pandas/tests/window/test_online.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/pandas/tests/window/test_online.py b/pandas/tests/window/test_online.py index b40f336485467..ca22c3b18c4c1 100644 --- a/pandas/tests/window/test_online.py +++ b/pandas/tests/window/test_online.py @@ -13,7 +13,18 @@ @td.skip_if_no("numba", "0.46.0") @pytest.mark.filterwarnings("ignore:\\nThe keyword argument") class TestEWM: - def test_online_vs_non_online(self, nogil, parallel, nopython, adjust, ignore_na): + def test_invalid_update(self): + df = DataFrame({"a": range(5), "b": range(5)}) + online_ewm = df.head(2).ewm(0.5).online() + with pytest.raises( + ValueError, + match="Must call mean with update=None first before passing update", + ): + online_ewm.mean(update=df.head(1)) + + def test_online_vs_non_online_mean( + self, nogil, parallel, nopython, adjust, ignore_na + ): df = DataFrame({"a": range(5), "b": range(5)}) expected = df.ewm(0.5, adjust=adjust, ignore_na=ignore_na).mean() engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} @@ -24,7 +35,7 @@ def test_online_vs_non_online(self, nogil, parallel, nopython, adjust, ignore_na .online(engine_kwargs=engine_kwargs) ) # Test resetting once - for i in range(2): + for _ in range(2): result = online_ewm.mean() tm.assert_frame_equal(result, expected.head(2)) @@ -33,7 +44,7 @@ def test_online_vs_non_online(self, nogil, parallel, nopython, adjust, ignore_na online_ewm.reset() - def test_update_times(self, nogil, parallel, nopython, adjust, ignore_na): + def test_update_times_mean(self, nogil, parallel, nopython, adjust, ignore_na): times = Series( np.array( ["2020-01-01", "2020-01-02", "2020-01-04", "2020-01-17", "2020-01-21"], @@ -50,7 +61,7 @@ def test_update_times(self, nogil, parallel, nopython, adjust, ignore_na): .online(engine_kwargs=engine_kwargs) ) # Test resetting once - for i in range(2): + for _ in range(2): result = online_ewm.mean() tm.assert_frame_equal(result, expected.head(2)) From 8444b42d9e22a312c1f9d3e6194e2b4e92c7497a Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 6 Jun 2021 21:32:53 -0700 Subject: [PATCH 09/22] Add docstring for mean --- pandas/core/window/ewm.py | 42 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 986cc0caccc35..397bf43f3dba5 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -750,6 +750,48 @@ def var(self, bias: bool = False, *args, **kwargs): return NotImplementedError def mean(self, *args, update=None, update_times=None, **kwargs): + """ + Calculate an online exponentially weighted mean. + + Parameters + ---------- + update: DataFrame or Series, default None + New values to continue calculating the + exponentially weighted mean from the last values and weights. + Values should be float64 dtype. + + ``update`` needs to be ``None`` the first time the + exponentially weighted mean is calculated. + + update_times: Series or np.ndarray, default None + New times to continue calculating the + exponentially weighted mean from the last values and weights. + If ``None``, values are assumed to be evenly spaced + in time. + + Returns + ------- + DataFrame or Series + + Examples + -------- + >>> df = pd.DataFrame({"a": range(5), "b": range(5, 10)}) + >>> online_ewm = df.head(2).ewm(0.5).online() + >>> online_ewm.mean() + 0 1 + 0 0.00 5.00 + 1 0.75 5.75 + >>> online_ewm.mean(update=df.tail(3)) + 0 1 + 1 1.615385 6.615385 + 2 2.550000 7.550000 + 3 3.520661 8.520661 + >>> online_ewm.reset() + >>> online_ewm.mean() + 0 1 + 0 0.00 5.00 + 1 0.75 5.75 + """ if update is not None: if self._mean.last_ewm is None: raise ValueError( From 7847373e46a9bdc19df827c21cf041d95f6f0150 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 6 Jun 2021 21:42:19 -0700 Subject: [PATCH 10/22] Add docstring for online --- pandas/core/window/ewm.py | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 397bf43f3dba5..cdf69d28977df 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -15,7 +15,6 @@ FrameOrSeriesUnion, TimedeltaConvertibleTypes, ) -from pandas.compat._optional import import_optional_dependency from pandas.compat.numpy import function as nv from pandas.util._decorators import doc @@ -344,7 +343,31 @@ def _get_window_indexer(self) -> BaseIndexer: return ExponentialMovingWindowIndexer() def online(self, engine="numba", engine_kwargs=None): - import_optional_dependency("numba") + """ + Return an ``OnlineExponentialMovingWindow`` object to calculate + exponentially moving window aggregations in an online method. + + .. versionadded:: 1.3.0 + + Parameters + ---------- + engine: str, default ``'numba'`` + Execution engine to calculate online aggregations. + Applies to all supported aggregation methods. + + engine_kwargs : dict, default None + Applies to all supported aggregation methods. + + * For ``'numba'`` engine, the engine can accept ``nopython``, ``nogil`` + and ``parallel`` dictionary keys. The values must either be ``True`` or + ``False``. The default ``engine_kwargs`` for the ``'numba'`` engine is + ``{{'nopython': True, 'nogil': False, 'parallel': False}}`` and will be + applied to the function + + Returns + ------- + OnlineExponentialMovingWindow + """ return OnlineExponentialMovingWindow( obj=self.obj, com=self.com, @@ -714,8 +737,11 @@ def __init__( self._mean = EWMMeanState( self._com, self.adjust, self.ignore_na, self.axis, obj.shape ) - self.engine = engine - self.engine_kwargs = engine_kwargs + if maybe_use_numba(engine): + self.engine = engine + self.engine_kwargs = engine_kwargs + else: + raise ValueError("'numba' is the only supported engine") def reset(self): """ From df13b556472bec7c5a8100f6ae5c5c959c73b903 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 6 Jun 2021 21:50:59 -0700 Subject: [PATCH 11/22] Parameterize over dataframe and series --- pandas/tests/window/test_online.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/pandas/tests/window/test_online.py b/pandas/tests/window/test_online.py index ca22c3b18c4c1..34b117fc79b9e 100644 --- a/pandas/tests/window/test_online.py +++ b/pandas/tests/window/test_online.py @@ -22,15 +22,17 @@ def test_invalid_update(self): ): online_ewm.mean(update=df.head(1)) + @pytest.mark.parametrize( + "obj", [DataFrame({"a": range(5), "b": range(5)}), Series(range(5))] + ) def test_online_vs_non_online_mean( - self, nogil, parallel, nopython, adjust, ignore_na + self, obj, nogil, parallel, nopython, adjust, ignore_na ): - df = DataFrame({"a": range(5), "b": range(5)}) - expected = df.ewm(0.5, adjust=adjust, ignore_na=ignore_na).mean() + expected = obj.ewm(0.5, adjust=adjust, ignore_na=ignore_na).mean() engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} online_ewm = ( - df.head(2) + obj.head(2) .ewm(0.5, adjust=adjust, ignore_na=ignore_na) .online(engine_kwargs=engine_kwargs) ) @@ -39,24 +41,26 @@ def test_online_vs_non_online_mean( result = online_ewm.mean() tm.assert_frame_equal(result, expected.head(2)) - result = online_ewm.update(update=df.tail(3)) + result = online_ewm.update(update=obj.tail(3)) tm.assert_frame_equal(result, expected.tail(3)) online_ewm.reset() - def test_update_times_mean(self, nogil, parallel, nopython, adjust, ignore_na): + @pytest.mark.parametrize( + "obj", [DataFrame({"a": range(5), "b": range(5)}), Series(range(5))] + ) + def test_update_times_mean(self, obj, nogil, parallel, nopython, adjust, ignore_na): times = Series( np.array( ["2020-01-01", "2020-01-02", "2020-01-04", "2020-01-17", "2020-01-21"], dtype="datetime64", ) ) - df = DataFrame({"a": range(5), "b": range(5)}) - expected = df.ewm(0.5, adjust=adjust, ignore_na=ignore_na, times=times).mean() + expected = obj.ewm(0.5, adjust=adjust, ignore_na=ignore_na, times=times).mean() engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} online_ewm = ( - df.head(2) + obj.head(2) .ewm(0.5, adjust=adjust, ignore_na=ignore_na, times=times.head(2)) .online(engine_kwargs=engine_kwargs) ) @@ -65,7 +69,7 @@ def test_update_times_mean(self, nogil, parallel, nopython, adjust, ignore_na): result = online_ewm.mean() tm.assert_frame_equal(result, expected.head(2)) - result = online_ewm.update(update=df.tail(3), update_times=times.tail(3)) + result = online_ewm.update(update=obj.tail(3), update_times=times.tail(3)) tm.assert_frame_equal(result, expected.tail(3)) online_ewm.reset() From 57db06e9501523026931863c7f0f91c98e5255cc Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 6 Jun 2021 22:12:54 -0700 Subject: [PATCH 12/22] Generalize axis call for update_times --- pandas/core/window/ewm.py | 13 +++++++++++-- pandas/tests/window/test_online.py | 4 ++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index cdf69d28977df..d88ee49008d1c 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -823,13 +823,22 @@ def mean(self, *args, update=None, update_times=None, **kwargs): raise ValueError( "Must call mean with update=None first before passing update" ) - obj = np.concatenate(([self._mean.last_ewm], update.to_numpy())) + # if len(self._mean.last_ewm.shape) != len(update.shape): + # # update can be 1D or 2D, self._mean.last_ewm is 1D + # last_values = self._mean.last_ewm.reshape( + # update.shape[self.axis - 1], - 1 + # ) + # else: + # last_values = self._mean.last_ewm + obj = np.concatenate((self._mean.last_ewm, update.to_numpy())) result_from = 1 else: obj = self._selected_obj.astype(np.float64).to_numpy() result_from = 0 if update_times is None: - update_times = np.ones(max(len(obj) - 1, 0), dtype=np.float64) + update_times = np.ones( + max(self.obj.shape[self.axis - 1] - 1, 0), dtype=np.float64 + ) else: update_times = _calculate_deltas(update_times, self.halflife) ewma_func = generate_online_numba_ewma_func(self.engine_kwargs) diff --git a/pandas/tests/window/test_online.py b/pandas/tests/window/test_online.py index 34b117fc79b9e..2466423a64203 100644 --- a/pandas/tests/window/test_online.py +++ b/pandas/tests/window/test_online.py @@ -41,7 +41,7 @@ def test_online_vs_non_online_mean( result = online_ewm.mean() tm.assert_frame_equal(result, expected.head(2)) - result = online_ewm.update(update=obj.tail(3)) + result = online_ewm.mean(update=obj.tail(3)) tm.assert_frame_equal(result, expected.tail(3)) online_ewm.reset() @@ -69,7 +69,7 @@ def test_update_times_mean(self, obj, nogil, parallel, nopython, adjust, ignore_ result = online_ewm.mean() tm.assert_frame_equal(result, expected.head(2)) - result = online_ewm.update(update=obj.tail(3), update_times=times.tail(3)) + result = online_ewm.mean(update=obj.tail(3), update_times=times.tail(3)) tm.assert_frame_equal(result, expected.tail(3)) online_ewm.reset() From 329dbd2eab41c9d579c252d0a9d9c6c384909b87 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Sun, 6 Jun 2021 22:24:01 -0700 Subject: [PATCH 13/22] Remove comments --- pandas/core/window/ewm.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index d88ee49008d1c..05e74f242462d 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -823,13 +823,6 @@ def mean(self, *args, update=None, update_times=None, **kwargs): raise ValueError( "Must call mean with update=None first before passing update" ) - # if len(self._mean.last_ewm.shape) != len(update.shape): - # # update can be 1D or 2D, self._mean.last_ewm is 1D - # last_values = self._mean.last_ewm.reshape( - # update.shape[self.axis - 1], - 1 - # ) - # else: - # last_values = self._mean.last_ewm obj = np.concatenate((self._mean.last_ewm, update.to_numpy())) result_from = 1 else: From 28be18a4169233cf8cb2a28313e6c2cbdda11cfd Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Tue, 8 Jun 2021 11:44:53 -0700 Subject: [PATCH 14/22] Add more test and ensure constructions --- pandas/core/window/ewm.py | 26 ++++++++++++++++++++------ pandas/tests/window/test_online.py | 4 ++-- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 05e74f242462d..ed315c8c8b6d2 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -789,7 +789,7 @@ def mean(self, *args, update=None, update_times=None, **kwargs): ``update`` needs to be ``None`` the first time the exponentially weighted mean is calculated. - update_times: Series or np.ndarray, default None + update_times: Series or 1-D np.ndarray, default None New times to continue calculating the exponentially weighted mean from the last values and weights. If ``None``, values are assumed to be evenly spaced @@ -818,23 +818,37 @@ def mean(self, *args, update=None, update_times=None, **kwargs): 0 0.00 5.00 1 0.75 5.75 """ + result_kwargs = {} if update is not None: if self._mean.last_ewm is None: raise ValueError( "Must call mean with update=None first before passing update" ) - obj = np.concatenate((self._mean.last_ewm, update.to_numpy())) result_from = 1 + result_kwargs["index"] = update.index + if update.ndim == 2: + last_value = self._mean.last_ewm[np.newaxis, :] + result_kwargs["columns"] = update.columns + else: + last_value = self._mean.last_ewm + result_kwargs["name"] = update.name + obj = np.concatenate((last_value, update.to_numpy())) else: - obj = self._selected_obj.astype(np.float64).to_numpy() result_from = 0 + result_kwargs["index"] = self._selected_obj.index + if self._selected_obj.ndim == 2: + result_kwargs["columns"] = self._selected_obj.columns + else: + result_kwargs["name"] = self._selected_obj.name + obj = self._selected_obj.astype(np.float64).to_numpy() if update_times is None: update_times = np.ones( - max(self.obj.shape[self.axis - 1] - 1, 0), dtype=np.float64 + max(self._selected_obj.shape[self.axis - 1] - 1, 0), dtype=np.float64 ) else: update_times = _calculate_deltas(update_times, self.halflife) ewma_func = generate_online_numba_ewma_func(self.engine_kwargs) result = self._mean.run_ewm(obj, update_times, self.min_periods, ewma_func) - result = self._selected_obj._constructor(result) - return result.iloc[result_from:] + result = result[result_from:] + result = self._selected_obj._constructor(result, **result_kwargs) + return result diff --git a/pandas/tests/window/test_online.py b/pandas/tests/window/test_online.py index 2466423a64203..bd34e49804f95 100644 --- a/pandas/tests/window/test_online.py +++ b/pandas/tests/window/test_online.py @@ -23,7 +23,7 @@ def test_invalid_update(self): online_ewm.mean(update=df.head(1)) @pytest.mark.parametrize( - "obj", [DataFrame({"a": range(5), "b": range(5)}), Series(range(5))] + "obj", [DataFrame({"a": range(5), "b": range(5)}), Series(range(5), name="foo")] ) def test_online_vs_non_online_mean( self, obj, nogil, parallel, nopython, adjust, ignore_na @@ -47,7 +47,7 @@ def test_online_vs_non_online_mean( online_ewm.reset() @pytest.mark.parametrize( - "obj", [DataFrame({"a": range(5), "b": range(5)}), Series(range(5))] + "obj", [DataFrame({"a": range(5), "b": range(5)}), Series(range(5), name="foo")] ) def test_update_times_mean(self, obj, nogil, parallel, nopython, adjust, ignore_na): times = Series( From 85025ff8630bbe3c5f22bef9bd440994c99910b1 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Tue, 8 Jun 2021 16:39:46 -0700 Subject: [PATCH 15/22] Passing all the non-time tests --- pandas/core/window/ewm.py | 18 +++++++++++++----- pandas/tests/window/test_online.py | 28 +++++++++++++++++++++------- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index ed315c8c8b6d2..d6e01160c79b1 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -819,6 +819,7 @@ def mean(self, *args, update=None, update_times=None, **kwargs): 1 0.75 5.75 """ result_kwargs = {} + is_frame = True if self._selected_obj.ndim == 2 else False if update is not None: if self._mean.last_ewm is None: raise ValueError( @@ -826,21 +827,21 @@ def mean(self, *args, update=None, update_times=None, **kwargs): ) result_from = 1 result_kwargs["index"] = update.index - if update.ndim == 2: + if is_frame: last_value = self._mean.last_ewm[np.newaxis, :] result_kwargs["columns"] = update.columns else: last_value = self._mean.last_ewm result_kwargs["name"] = update.name - obj = np.concatenate((last_value, update.to_numpy())) + np_array = np.concatenate((last_value, update.to_numpy())) else: result_from = 0 result_kwargs["index"] = self._selected_obj.index - if self._selected_obj.ndim == 2: + if is_frame: result_kwargs["columns"] = self._selected_obj.columns else: result_kwargs["name"] = self._selected_obj.name - obj = self._selected_obj.astype(np.float64).to_numpy() + np_array = self._selected_obj.astype(np.float64).to_numpy() if update_times is None: update_times = np.ones( max(self._selected_obj.shape[self.axis - 1] - 1, 0), dtype=np.float64 @@ -848,7 +849,14 @@ def mean(self, *args, update=None, update_times=None, **kwargs): else: update_times = _calculate_deltas(update_times, self.halflife) ewma_func = generate_online_numba_ewma_func(self.engine_kwargs) - result = self._mean.run_ewm(obj, update_times, self.min_periods, ewma_func) + result = self._mean.run_ewm( + np_array if is_frame else np_array[:, np.newaxis], + update_times, + self.min_periods, + ewma_func, + ) + if not is_frame: + result = result.squeeze() result = result[result_from:] result = self._selected_obj._constructor(result, **result_kwargs) return result diff --git a/pandas/tests/window/test_online.py b/pandas/tests/window/test_online.py index bd34e49804f95..857e4598881ab 100644 --- a/pandas/tests/window/test_online.py +++ b/pandas/tests/window/test_online.py @@ -39,37 +39,51 @@ def test_online_vs_non_online_mean( # Test resetting once for _ in range(2): result = online_ewm.mean() - tm.assert_frame_equal(result, expected.head(2)) + tm.assert_equal(result, expected.head(2)) result = online_ewm.mean(update=obj.tail(3)) - tm.assert_frame_equal(result, expected.tail(3)) + tm.assert_equal(result, expected.tail(3)) online_ewm.reset() @pytest.mark.parametrize( "obj", [DataFrame({"a": range(5), "b": range(5)}), Series(range(5), name="foo")] ) - def test_update_times_mean(self, obj, nogil, parallel, nopython, adjust, ignore_na): + def test_update_times_mean( + self, obj, nogil, parallel, nopython, adjust, ignore_na, halflife_with_times + ): times = Series( np.array( ["2020-01-01", "2020-01-02", "2020-01-04", "2020-01-17", "2020-01-21"], dtype="datetime64", ) ) - expected = obj.ewm(0.5, adjust=adjust, ignore_na=ignore_na, times=times).mean() + expected = obj.ewm( + 0.5, + adjust=adjust, + ignore_na=ignore_na, + times=times, + halflife=halflife_with_times, + ).mean() engine_kwargs = {"nogil": nogil, "parallel": parallel, "nopython": nopython} online_ewm = ( obj.head(2) - .ewm(0.5, adjust=adjust, ignore_na=ignore_na, times=times.head(2)) + .ewm( + 0.5, + adjust=adjust, + ignore_na=ignore_na, + times=times.head(2), + halflife=halflife_with_times, + ) .online(engine_kwargs=engine_kwargs) ) # Test resetting once for _ in range(2): result = online_ewm.mean() - tm.assert_frame_equal(result, expected.head(2)) + tm.assert_equal(result, expected.head(2)) result = online_ewm.mean(update=obj.tail(3), update_times=times.tail(3)) - tm.assert_frame_equal(result, expected.tail(3)) + tm.assert_equal(result, expected.tail(3)) online_ewm.reset() From 33452718830f6a3897521920aae59e84e9c12226 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Tue, 8 Jun 2021 21:49:34 -0700 Subject: [PATCH 16/22] Add whatsnew and window.rst; xfail update_times --- doc/source/user_guide/window.rst | 33 ++++++++++++++++++++++-------- doc/source/whatsnew/v1.3.0.rst | 1 + pandas/core/window/ewm.py | 19 ++++++++++------- pandas/tests/window/test_online.py | 3 ++- 4 files changed, 40 insertions(+), 16 deletions(-) diff --git a/doc/source/user_guide/window.rst b/doc/source/user_guide/window.rst index c8687f808a802..0c58b311bcc2b 100644 --- a/doc/source/user_guide/window.rst +++ b/doc/source/user_guide/window.rst @@ -37,14 +37,14 @@ pandas supports 4 types of windowing operations: #. Expanding window: Accumulating window over the values. #. Exponentially Weighted window: Accumulating and exponentially weighted window over the values. -============================= ================= =========================== =========================== ======================== =================================== -Concept Method Returned Object Supports time-based windows Supports chained groupby Supports table method -============================= ================= =========================== =========================== ======================== =================================== -Rolling window ``rolling`` ``Rolling`` Yes Yes Yes (as of version 1.3) -Weighted window ``rolling`` ``Window`` No No No -Expanding window ``expanding`` ``Expanding`` No Yes Yes (as of version 1.3) -Exponentially Weighted window ``ewm`` ``ExponentialMovingWindow`` No Yes (as of version 1.2) No -============================= ================= =========================== =========================== ======================== =================================== +============================= ================= =========================== =========================== ======================== =================================== =========================== +Concept Method Returned Object Supports time-based windows Supports chained groupby Supports table method Supports online operations +============================= ================= =========================== =========================== ======================== =================================== =========================== +Rolling window ``rolling`` ``Rolling`` Yes Yes Yes (as of version 1.3) No +Weighted window ``rolling`` ``Window`` No No No No +Expanding window ``expanding`` ``Expanding`` No Yes Yes (as of version 1.3) No +Exponentially Weighted window ``ewm`` ``ExponentialMovingWindow`` No Yes (as of version 1.2) No Yes (as of version 1.3) +============================= ================= =========================== =========================== ======================== =================================== =========================== As noted above, some operations support specifying a window based on a time offset: @@ -98,6 +98,23 @@ be calculated with :meth:`~Rolling.apply` by specifying a separate column of wei df = pd.DataFrame([[1, 2, 0.6], [2, 3, 0.4], [3, 4, 0.2], [4, 5, 0.7]]) df.rolling(2, method="table", min_periods=0).apply(weighted_mean, raw=True, engine="numba") # noqa:E501 +.. versionadded:: 1.3 + +Some windowing operations also support an ``online`` method after constructing a windowing object +which returns a new object that supports passing in new :class:`DataFrame` or :class:`Series` objects +to continue the windowing calculation with the new values (i.e. online calculations). + +The methods on this new windowing objects must call the aggregation method first to "prime" the initial +state of the online calculation. Then, new :class:`DataFrame` or :class:`Series` objects can be passed in +the ``update`` argument to continue the windowing calculation. + +.. ipython:: python + + df = pd.DataFrame([[1, 2, 0.6], [2, 3, 0.4], [3, 4, 0.2], [4, 5, 0.7]]) + df.ewm(0.5).mean() + online_ewm = df.head(2).ewm(0.5).online() + online_ewm.mean() + online_ewm.mean(update=df.tail(1)) All windowing operations support a ``min_periods`` argument that dictates the minimum amount of non-``np.nan`` values a window must have; otherwise, the resulting value is ``np.nan``. diff --git a/doc/source/whatsnew/v1.3.0.rst b/doc/source/whatsnew/v1.3.0.rst index 8b413808503ad..90ebf367532fd 100644 --- a/doc/source/whatsnew/v1.3.0.rst +++ b/doc/source/whatsnew/v1.3.0.rst @@ -194,6 +194,7 @@ Other enhancements ^^^^^^^^^^^^^^^^^^ - :meth:`DataFrame.rolling`, :meth:`Series.rolling`, :meth:`DataFrame.expanding`, and :meth:`Series.expanding` now support a ``method`` argument with a ``'table'`` option that performs the windowing operation over an entire :class:`DataFrame`. See :ref:`Window Overview ` for performance and functional benefits (:issue:`15095`, :issue:`38995`) +- :class:`.ExponentialMovingWindow` now support a ``online`` that can perform ``mean`` calculations in an online fashion. See :ref:`Window Overview ` (:issue:`41673`) - Added :meth:`MultiIndex.dtypes` (:issue:`37062`) - Added ``end`` and ``end_day`` options for the ``origin`` argument in :meth:`DataFrame.resample` (:issue:`37804`) - Improve error message when ``usecols`` and ``names`` do not match for :func:`read_csv` and ``engine="c"`` (:issue:`29042`) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index d6e01160c79b1..2f1643280671c 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -721,6 +721,10 @@ def __init__( *, selection=None, ): + if times is not None: + raise NotImplementedError( + "times is not implemented with online operations." + ) super().__init__( obj=obj, com=com, @@ -794,6 +798,7 @@ def mean(self, *args, update=None, update_times=None, **kwargs): exponentially weighted mean from the last values and weights. If ``None``, values are assumed to be evenly spaced in time. + This feature is currently unsupported. Returns ------- @@ -820,6 +825,12 @@ def mean(self, *args, update=None, update_times=None, **kwargs): """ result_kwargs = {} is_frame = True if self._selected_obj.ndim == 2 else False + if update_times is not None: + raise NotImplementedError("update_times is not implemented.") + else: + update_deltas = np.ones( + max(self._selected_obj.shape[self.axis - 1] - 1, 0), dtype=np.float64 + ) if update is not None: if self._mean.last_ewm is None: raise ValueError( @@ -842,16 +853,10 @@ def mean(self, *args, update=None, update_times=None, **kwargs): else: result_kwargs["name"] = self._selected_obj.name np_array = self._selected_obj.astype(np.float64).to_numpy() - if update_times is None: - update_times = np.ones( - max(self._selected_obj.shape[self.axis - 1] - 1, 0), dtype=np.float64 - ) - else: - update_times = _calculate_deltas(update_times, self.halflife) ewma_func = generate_online_numba_ewma_func(self.engine_kwargs) result = self._mean.run_ewm( np_array if is_frame else np_array[:, np.newaxis], - update_times, + update_deltas, self.min_periods, ewma_func, ) diff --git a/pandas/tests/window/test_online.py b/pandas/tests/window/test_online.py index 857e4598881ab..c7580650926da 100644 --- a/pandas/tests/window/test_online.py +++ b/pandas/tests/window/test_online.py @@ -46,6 +46,7 @@ def test_online_vs_non_online_mean( online_ewm.reset() + @pytest.mark.xfail(raises=NotImplementedError) @pytest.mark.parametrize( "obj", [DataFrame({"a": range(5), "b": range(5)}), Series(range(5), name="foo")] ) @@ -54,7 +55,7 @@ def test_update_times_mean( ): times = Series( np.array( - ["2020-01-01", "2020-01-02", "2020-01-04", "2020-01-17", "2020-01-21"], + ["2020-01-01", "2020-01-05", "2020-01-07", "2020-01-17", "2020-01-21"], dtype="datetime64", ) ) From 8024a7be3fa56a4217861a9984ceb03be6be45a1 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Wed, 9 Jun 2021 00:17:36 -0700 Subject: [PATCH 17/22] mypy --- pandas/core/generic.py | 2 +- pandas/core/window/ewm.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pandas/core/generic.py b/pandas/core/generic.py index 193b05ffa3082..c568ac774ee14 100644 --- a/pandas/core/generic.py +++ b/pandas/core/generic.py @@ -10892,7 +10892,7 @@ def ewm( span: float | None = None, halflife: float | TimedeltaConvertibleTypes | None = None, alpha: float | None = None, - min_periods: int = 0, + min_periods: int | None = 0, adjust: bool_t = True, ignore_na: bool_t = False, axis: Axis = 0, diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 2f1643280671c..7190680b4aa96 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -267,7 +267,7 @@ def __init__( span: float | None = None, halflife: float | TimedeltaConvertibleTypes | None = None, alpha: float | None = None, - min_periods: int = 0, + min_periods: int | None = 0, adjust: bool = True, ignore_na: bool = False, axis: Axis = 0, @@ -277,7 +277,7 @@ def __init__( ): super().__init__( obj=obj, - min_periods=max(int(min_periods), 1), + min_periods=1 if min_periods is None else max(int(min_periods), 1), on=None, center=False, closed=None, @@ -711,7 +711,7 @@ def __init__( span: float | None = None, halflife: float | TimedeltaConvertibleTypes | None = None, alpha: float | None = None, - min_periods: int = 0, + min_periods: int | None = 0, adjust: bool = True, ignore_na: bool = False, axis: Axis = 0, From 8a5b0b936845fa99e6a8e0b5e753b8500e436150 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Wed, 9 Jun 2021 11:00:08 -0700 Subject: [PATCH 18/22] Address comments --- doc/source/user_guide/window.rst | 3 +++ doc/source/whatsnew/v1.3.0.rst | 2 +- pandas/core/window/online.py | 11 +++++++++-- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/doc/source/user_guide/window.rst b/doc/source/user_guide/window.rst index e024b16ffc1f4..a510c035a8162 100644 --- a/doc/source/user_guide/window.rst +++ b/doc/source/user_guide/window.rst @@ -112,6 +112,9 @@ the ``update`` argument to continue the windowing calculation. df = pd.DataFrame([[1, 2, 0.6], [2, 3, 0.4], [3, 4, 0.2], [4, 5, 0.7]]) df.ewm(0.5).mean() + +.. ipython:: python + online_ewm = df.head(2).ewm(0.5).online() online_ewm.mean() online_ewm.mean(update=df.tail(1)) diff --git a/doc/source/whatsnew/v1.3.0.rst b/doc/source/whatsnew/v1.3.0.rst index 5c0d391e8048f..53f7fa3183bd5 100644 --- a/doc/source/whatsnew/v1.3.0.rst +++ b/doc/source/whatsnew/v1.3.0.rst @@ -238,7 +238,7 @@ Other enhancements ^^^^^^^^^^^^^^^^^^ - :meth:`DataFrame.rolling`, :meth:`Series.rolling`, :meth:`DataFrame.expanding`, and :meth:`Series.expanding` now support a ``method`` argument with a ``'table'`` option that performs the windowing operation over an entire :class:`DataFrame`. See :ref:`Window Overview ` for performance and functional benefits (:issue:`15095`, :issue:`38995`) -- :class:`.ExponentialMovingWindow` now support a ``online`` that can perform ``mean`` calculations in an online fashion. See :ref:`Window Overview ` (:issue:`41673`) +- :class:`.ExponentialMovingWindow` now support a ``online`` method that can perform ``mean`` calculations in an online fashion. See :ref:`Window Overview ` (:issue:`41673`) - Added :meth:`MultiIndex.dtypes` (:issue:`37062`) - Added ``end`` and ``end_day`` options for the ``origin`` argument in :meth:`DataFrame.resample` (:issue:`37804`) - Improved error message when ``usecols`` and ``names`` do not match for :func:`read_csv` and ``engine="c"`` (:issue:`29042`) diff --git a/pandas/core/window/online.py b/pandas/core/window/online.py index 3cc88cee892c3..9864d91c09a57 100644 --- a/pandas/core/window/online.py +++ b/pandas/core/window/online.py @@ -44,18 +44,25 @@ def online_ewma( adjust: bool, ignore_na: bool, ): + """ + Compute online exponentially weighted mean per column over 2D values. + + Takes the first observation as is, then computes the subsequent + exponentially weighted mean accounting minimum periods. + """ result = np.empty(values.shape) weighted_avg = values[0] nobs = (~np.isnan(weighted_avg)).astype(np.int64) result[0] = np.where(nobs >= minimum_periods, weighted_avg, np.nan) - for i in range(1, len(values)): + for i in numba.prange(1, len(values)): cur = values[i] is_observations = ~np.isnan(cur) nobs += is_observations.astype(np.int64) - for j in range(len(cur)): + for j in numba.prange(len(cur)): if not np.isnan(weighted_avg[j]): if is_observations[j] or not ignore_na: + # note that len(deltas) = len(vals) - 1 and deltas[i] is to be # used in conjunction with vals[i+1] old_wt[j] *= old_wt_factor ** deltas[j - 1] From e790947f592b6df73e0375fae26d22215e6bb58e Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Wed, 9 Jun 2021 12:48:49 -0700 Subject: [PATCH 19/22] Fix doctest --- pandas/core/window/ewm.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 7190680b4aa96..7c24954982ea8 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -809,17 +809,17 @@ def mean(self, *args, update=None, update_times=None, **kwargs): >>> df = pd.DataFrame({"a": range(5), "b": range(5, 10)}) >>> online_ewm = df.head(2).ewm(0.5).online() >>> online_ewm.mean() - 0 1 + a b 0 0.00 5.00 1 0.75 5.75 >>> online_ewm.mean(update=df.tail(3)) - 0 1 + a b 1 1.615385 6.615385 2 2.550000 7.550000 3 3.520661 8.520661 >>> online_ewm.reset() >>> online_ewm.mean() - 0 1 + a b 0 0.00 5.00 1 0.75 5.75 """ From 175c4cafef7cf8d4b5be8b89b91769f89e794e6a Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Thu, 10 Jun 2021 19:34:46 -0700 Subject: [PATCH 20/22] Fix doctest --- pandas/core/window/ewm.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pandas/core/window/ewm.py b/pandas/core/window/ewm.py index 7c24954982ea8..c1d532d94eb83 100644 --- a/pandas/core/window/ewm.py +++ b/pandas/core/window/ewm.py @@ -814,9 +814,9 @@ def mean(self, *args, update=None, update_times=None, **kwargs): 1 0.75 5.75 >>> online_ewm.mean(update=df.tail(3)) a b - 1 1.615385 6.615385 - 2 2.550000 7.550000 - 3 3.520661 8.520661 + 2 1.615385 6.615385 + 3 2.550000 7.550000 + 4 3.520661 8.520661 >>> online_ewm.reset() >>> online_ewm.mean() a b From 2cb40190c678907cd972d4cff13c249ccc582fab Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Thu, 10 Jun 2021 22:08:28 -0700 Subject: [PATCH 21/22] Cannot parallelize a loop --- pandas/core/window/online.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pandas/core/window/online.py b/pandas/core/window/online.py index 9864d91c09a57..5a9e8d65255ae 100644 --- a/pandas/core/window/online.py +++ b/pandas/core/window/online.py @@ -55,7 +55,7 @@ def online_ewma( nobs = (~np.isnan(weighted_avg)).astype(np.int64) result[0] = np.where(nobs >= minimum_periods, weighted_avg, np.nan) - for i in numba.prange(1, len(values)): + for i in range(1, len(values)): cur = values[i] is_observations = ~np.isnan(cur) nobs += is_observations.astype(np.int64) From fea8b0b5e848779b3fdc62b890e71cba0df00308 Mon Sep 17 00:00:00 2001 From: Matthew Roeschke Date: Thu, 10 Jun 2021 22:22:41 -0700 Subject: [PATCH 22/22] Trigger CI