ENH: Add online operations for EWM.mean #41888

Merged
merged 30 commits into from
Jun 12, 2021
Changes from 29 commits
e195c58
Add scaffolding for online EWM
mroeschke May 31, 2021
3d95167
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke May 31, 2021
9354bd0
Add online op and new methods and class
mroeschke Jun 7, 2021
0ce197d
Make signatures match
mroeschke Jun 7, 2021
8096cc6
Add some tests, rename some variables
mroeschke Jun 7, 2021
a5273b9
Add newline for readability
mroeschke Jun 7, 2021
bab78cc
Parameterize over adjust and ignore_na
mroeschke Jun 7, 2021
d72a03e
Test resetting in tests
mroeschke Jun 7, 2021
0b7e773
Add test with invalid update
mroeschke Jun 7, 2021
8444b42
Add docstring for mean
mroeschke Jun 7, 2021
7847373
Add docstring for online
mroeschke Jun 7, 2021
df13b55
Parameterize over dataframe and series
mroeschke Jun 7, 2021
57db06e
Generalize axis call for update_times
mroeschke Jun 7, 2021
329dbd2
Remove comments
mroeschke Jun 7, 2021
9594afe
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke Jun 8, 2021
28be18a
Add more test and ensure constructions
mroeschke Jun 8, 2021
85025ff
Passing all the non-time tests
mroeschke Jun 8, 2021
3345271
Add whatsnew and window.rst; xfail update_times
mroeschke Jun 9, 2021
2186ea0
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke Jun 9, 2021
8024a7b
mypy
mroeschke Jun 9, 2021
80c8b7f
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke Jun 9, 2021
8a5b0b9
Address comments
mroeschke Jun 9, 2021
e790947
Fix doctest
mroeschke Jun 9, 2021
916e68b
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke Jun 11, 2021
175c4ca
Fix doctest
mroeschke Jun 11, 2021
f799a0f
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke Jun 11, 2021
c8b09b6
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke Jun 11, 2021
2cb4019
Cannot parallelize a loop
mroeschke Jun 11, 2021
fea8b0b
Trigger CI
mroeschke Jun 11, 2021
04ea064
Merge remote-tracking branch 'upstream/master' into online/ewm
mroeschke Jun 12, 2021
36 changes: 28 additions & 8 deletions doc/source/user_guide/window.rst
@@ -37,14 +37,14 @@ pandas supports 4 types of windowing operations:
#. Expanding window: Accumulating window over the values.
#. Exponentially Weighted window: Accumulating and exponentially weighted window over the values.

============================= ================= =========================== =========================== ======================== ===================================
Concept                       Method            Returned Object             Supports time-based windows Supports chained groupby Supports table method
============================= ================= =========================== =========================== ======================== ===================================
Rolling window                ``rolling``       ``Rolling``                 Yes                         Yes                      Yes (as of version 1.3)
Weighted window               ``rolling``       ``Window``                  No                          No                       No
Expanding window              ``expanding``     ``Expanding``               No                          Yes                      Yes (as of version 1.3)
Exponentially Weighted window ``ewm``           ``ExponentialMovingWindow`` No                          Yes (as of version 1.2)  No
============================= ================= =========================== =========================== ======================== ===================================
============================= ================= =========================== =========================== ======================== =================================== ===========================
Concept                       Method            Returned Object             Supports time-based windows Supports chained groupby Supports table method               Supports online operations
============================= ================= =========================== =========================== ======================== =================================== ===========================
Rolling window                ``rolling``       ``Rolling``                 Yes                         Yes                      Yes (as of version 1.3)             No
Weighted window               ``rolling``       ``Window``                  No                          No                       No                                  No
Expanding window              ``expanding``     ``Expanding``               No                          Yes                      Yes (as of version 1.3)             No
Exponentially Weighted window ``ewm``           ``ExponentialMovingWindow`` No                          Yes (as of version 1.2)  No                                  Yes (as of version 1.3)
============================= ================= =========================== =========================== ======================== =================================== ===========================

As noted above, some operations support specifying a window based on a time offset:

@@ -98,6 +98,26 @@ be calculated with :meth:`~Rolling.apply` by specifying a separate column of wei
df = pd.DataFrame([[1, 2, 0.6], [2, 3, 0.4], [3, 4, 0.2], [4, 5, 0.7]])
df.rolling(2, method="table", min_periods=0).apply(weighted_mean, raw=True, engine="numba") # noqa:E501

.. versionadded:: 1.3

Some windowing operations also support an ``online`` method after constructing a windowing object.
It returns a new object that accepts new :class:`DataFrame` or :class:`Series` objects
and continues the windowing calculation with the new values (i.e. online calculations).

The aggregation method on this new windowing object must first be called without ``update`` to "prime"
the initial state of the online calculation. After that, new :class:`DataFrame` or :class:`Series` objects
can be passed to the ``update`` argument to continue the windowing calculation.

.. ipython:: python

df = pd.DataFrame([[1, 2, 0.6], [2, 3, 0.4], [3, 4, 0.2], [4, 5, 0.7]])
df.ewm(0.5).mean()

.. ipython:: python

online_ewm = df.head(2).ewm(0.5).online()
online_ewm.mean()
online_ewm.mean(update=df.tail(1))
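
To make the "prime, then update" flow above concrete, here is a minimal pure-Python sketch of an online exponentially weighted mean. It is not the pandas implementation: the class name ``OnlineEwmMean`` and its ``update`` method are hypothetical, and it assumes ``adjust=True``, no missing values, and the usual ``alpha = 1 / (1 + com)`` mapping. It only illustrates why keeping a small running state is enough to continue the calculation.

```python
from typing import Iterable, List


class OnlineEwmMean:
    """Hypothetical stand-in for an online EW mean (adjust=True, no NaNs)."""

    def __init__(self, com: float) -> None:
        # Center of mass to smoothing factor: alpha = 1 / (1 + com).
        self.alpha = 1.0 / (1.0 + com)
        self.reset()

    def reset(self) -> None:
        # Forget everything seen so far.
        self._num = 0.0  # running weighted sum of observations
        self._den = 0.0  # running sum of weights

    def update(self, values: Iterable[float]) -> List[float]:
        # Each new value gets weight 1; all older weights shrink by (1 - alpha).
        decay = 1.0 - self.alpha
        out = []
        for x in values:
            self._num = x + decay * self._num
            self._den = 1.0 + decay * self._den
            out.append(self._num / self._den)
        return out


data = [0.0, 1.0, 2.0, 3.0, 4.0]

# One pass over all the data.
batch = OnlineEwmMean(com=0.5).update(data)

# Two passes: "prime" with the first rows, then continue with the rest.
online = OnlineEwmMean(com=0.5)
first = online.update(data[:2])
rest = online.update(data[2:])
```

Under these assumptions ``first + rest`` matches ``batch``, which is the property an online operation is meant to provide.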

All windowing operations support a ``min_periods`` argument that dictates the minimum number of
non-``np.nan`` values a window must have; otherwise, the resulting value is ``np.nan``.
3 changes: 2 additions & 1 deletion doc/source/whatsnew/v1.3.0.rst
@@ -237,7 +237,8 @@ For example:
Other enhancements
^^^^^^^^^^^^^^^^^^

- :meth:`DataFrame.rolling`, :meth:`Series.rolling`, :meth:`DataFrame.expanding`, and :meth:`Series.expanding` now support a ``method`` argument with a ``'table'`` option that performs the windowing operation over an entire DataFrame. See :ref:`Window Overview <window.overview>` for performance and functional benefits (:issue:`15095`, :issue:`38995`)
- :meth:`DataFrame.rolling`, :meth:`Series.rolling`, :meth:`DataFrame.expanding`, and :meth:`Series.expanding` now support a ``method`` argument with a ``'table'`` option that performs the windowing operation over an entire :class:`DataFrame`. See :ref:`Window Overview <window.overview>` for performance and functional benefits (:issue:`15095`, :issue:`38995`)
- :class:`.ExponentialMovingWindow` now supports an ``online`` method that can perform ``mean`` calculations in an online fashion. See :ref:`Window Overview <window.overview>` (:issue:`41673`)
- Added :meth:`MultiIndex.dtypes` (:issue:`37062`)
- Added ``end`` and ``end_day`` options for the ``origin`` argument in :meth:`DataFrame.resample` (:issue:`37804`)
- Improved error message when ``usecols`` and ``names`` do not match for :func:`read_csv` and ``engine="c"`` (:issue:`29042`)
2 changes: 1 addition & 1 deletion pandas/core/generic.py
@@ -10893,7 +10893,7 @@ def ewm(
span: float | None = None,
halflife: float | TimedeltaConvertibleTypes | None = None,
alpha: float | None = None,
min_periods: int = 0,
min_periods: int | None = 0,
Member

is this (and the changed annotation below) orthogonal to the rest of the PR?

Member Author

A bit.

EWM.min_periods was typed as int

Its parent class typed min_periods as int | None.

Since I was calling super, I needed to get the types and some code aligned.

adjust: bool_t = True,
ignore_na: bool_t = False,
axis: Axis = 0,
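
The ``min_periods`` handling discussed in the review thread above can be illustrated in isolation. The helper below is a hypothetical sketch (``normalize_min_periods`` is not a pandas function); it only mirrors the expression ``1 if min_periods is None else max(int(min_periods), 1)`` that appears in this diff, which avoids calling ``int(None)`` (a ``TypeError``) once ``None`` is an accepted value.

```python
from typing import Optional


def normalize_min_periods(min_periods: Optional[int]) -> int:
    """Hypothetical helper mirroring the expression used in the diff."""
    if min_periods is None:
        # None falls back to the smallest meaningful window size.
        return 1
    # Anything below 1 (including the default of 0) is clamped to 1.
    return max(int(min_periods), 1)


from_none = normalize_min_periods(None)
from_default = normalize_min_periods(0)
from_explicit = normalize_min_periods(3)
```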
214 changes: 212 additions & 2 deletions pandas/core/window/ewm.py
@@ -41,6 +41,10 @@
GroupbyIndexer,
)
from pandas.core.window.numba_ import generate_numba_ewma_func
from pandas.core.window.online import (
EWMMeanState,
generate_online_numba_ewma_func,
)
from pandas.core.window.rolling import (
BaseWindow,
BaseWindowGroupby,
@@ -263,7 +267,7 @@ def __init__(
span: float | None = None,
halflife: float | TimedeltaConvertibleTypes | None = None,
alpha: float | None = None,
min_periods: int = 0,
min_periods: int | None = 0,
adjust: bool = True,
ignore_na: bool = False,
axis: Axis = 0,
@@ -273,7 +277,7 @@ def __init__(
):
super().__init__(
obj=obj,
min_periods=max(int(min_periods), 1),
min_periods=1 if min_periods is None else max(int(min_periods), 1),
on=None,
center=False,
closed=None,
@@ -338,6 +342,48 @@ def _get_window_indexer(self) -> BaseIndexer:
"""
return ExponentialMovingWindowIndexer()

def online(self, engine="numba", engine_kwargs=None):
"""
Return an ``OnlineExponentialMovingWindow`` object to calculate
exponentially moving window aggregations in an online fashion.

.. versionadded:: 1.3.0

Parameters
----------
engine : str, default ``'numba'``
Execution engine to calculate online aggregations.
Applies to all supported aggregation methods.

engine_kwargs : dict, default None
Applies to all supported aggregation methods.

* For ``'numba'`` engine, the engine can accept ``nopython``, ``nogil``
and ``parallel`` dictionary keys. The values must either be ``True`` or
``False``. The default ``engine_kwargs`` for the ``'numba'`` engine is
``{'nopython': True, 'nogil': False, 'parallel': False}`` and will be
applied to the function.

Returns
-------
OnlineExponentialMovingWindow
"""
return OnlineExponentialMovingWindow(
obj=self.obj,
com=self.com,
span=self.span,
halflife=self.halflife,
alpha=self.alpha,
min_periods=self.min_periods,
adjust=self.adjust,
ignore_na=self.ignore_na,
axis=self.axis,
times=self.times,
engine=engine,
engine_kwargs=engine_kwargs,
selection=self._selection,
)

@doc(
_shared_docs["aggregate"],
see_also=dedent(
@@ -655,3 +701,167 @@ def _get_window_indexer(self) -> GroupbyIndexer:
window_indexer=ExponentialMovingWindowIndexer,
)
return window_indexer


class OnlineExponentialMovingWindow(ExponentialMovingWindow):
def __init__(
self,
obj: FrameOrSeries,
com: float | None = None,
span: float | None = None,
halflife: float | TimedeltaConvertibleTypes | None = None,
alpha: float | None = None,
min_periods: int | None = 0,
adjust: bool = True,
ignore_na: bool = False,
axis: Axis = 0,
times: str | np.ndarray | FrameOrSeries | None = None,
engine: str = "numba",
engine_kwargs: dict[str, bool] | None = None,
*,
selection=None,
):
if times is not None:
raise NotImplementedError(
"times is not implemented with online operations."
)
super().__init__(
obj=obj,
com=com,
span=span,
halflife=halflife,
alpha=alpha,
min_periods=min_periods,
adjust=adjust,
ignore_na=ignore_na,
axis=axis,
times=times,
selection=selection,
)
self._mean = EWMMeanState(
self._com, self.adjust, self.ignore_na, self.axis, obj.shape
)
if maybe_use_numba(engine):
self.engine = engine
self.engine_kwargs = engine_kwargs
else:
raise ValueError("'numba' is the only supported engine")

def reset(self):
"""
Reset the state captured by `update` calls.
"""
self._mean.reset()

def aggregate(self, func, *args, **kwargs):
raise NotImplementedError

def std(self, bias: bool = False, *args, **kwargs):
raise NotImplementedError

def corr(
self,
other: FrameOrSeriesUnion | None = None,
pairwise: bool | None = None,
**kwargs,
):
raise NotImplementedError

def cov(
self,
other: FrameOrSeriesUnion | None = None,
pairwise: bool | None = None,
bias: bool = False,
**kwargs,
):
raise NotImplementedError

def var(self, bias: bool = False, *args, **kwargs):
raise NotImplementedError

def mean(self, *args, update=None, update_times=None, **kwargs):
"""
Calculate an online exponentially weighted mean.

Parameters
----------
update : DataFrame or Series, default None
New values to continue calculating the
exponentially weighted mean from the last values and weights.
Values should be float64 dtype.

``update`` needs to be ``None`` the first time the
exponentially weighted mean is calculated.

update_times : Series or 1-D np.ndarray, default None
New times to continue calculating the
exponentially weighted mean from the last values and weights.
If ``None``, values are assumed to be evenly spaced
in time.
This feature is currently unsupported.

Returns
-------
DataFrame or Series

Examples
--------
>>> df = pd.DataFrame({"a": range(5), "b": range(5, 10)})
>>> online_ewm = df.head(2).ewm(0.5).online()
>>> online_ewm.mean()
a b
0 0.00 5.00
1 0.75 5.75
>>> online_ewm.mean(update=df.tail(3))
a b
2 1.615385 6.615385
3 2.550000 7.550000
4 3.520661 8.520661
>>> online_ewm.reset()
>>> online_ewm.mean()
a b
0 0.00 5.00
1 0.75 5.75
"""
result_kwargs = {}
is_frame = self._selected_obj.ndim == 2
if update_times is not None:
raise NotImplementedError("update_times is not implemented.")
else:
update_deltas = np.ones(
max(self._selected_obj.shape[self.axis - 1] - 1, 0), dtype=np.float64
)
if update is not None:
if self._mean.last_ewm is None:
Contributor

what is this?

Member Author

A user needs to call mean() first then can call mean(update=new_df)

This checks that mean() was called first.

Contributor

ok, and test for this? (with good error message)

raise ValueError(
"Must call mean with update=None first before passing update"
)
result_from = 1
result_kwargs["index"] = update.index
if is_frame:
last_value = self._mean.last_ewm[np.newaxis, :]
result_kwargs["columns"] = update.columns
else:
last_value = self._mean.last_ewm
result_kwargs["name"] = update.name
np_array = np.concatenate((last_value, update.to_numpy()))
else:
result_from = 0
result_kwargs["index"] = self._selected_obj.index
if is_frame:
result_kwargs["columns"] = self._selected_obj.columns
else:
result_kwargs["name"] = self._selected_obj.name
np_array = self._selected_obj.astype(np.float64).to_numpy()
ewma_func = generate_online_numba_ewma_func(self.engine_kwargs)
result = self._mean.run_ewm(
np_array if is_frame else np_array[:, np.newaxis],
update_deltas,
self.min_periods,
ewma_func,
)
if not is_frame:
result = result.squeeze()
result = result[result_from:]
result = self._selected_obj._constructor(result, **result_kwargs)
return result
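
The control flow of ``mean`` above (refuse ``update`` until the state is primed, prepend the last result to the new values, then drop that seed row via ``result_from``) can be sketched without pandas or numba. The class ``OnlineMeanSketch`` below is hypothetical and deliberately simplified: it assumes ``adjust=False``, a non-empty one-dimensional input, and no missing values, so the last mean alone is enough state. The code in this PR also tracks weights through ``EWMMeanState``, which this sketch does not attempt to reproduce.

```python
from typing import List, Optional, Sequence


class OnlineMeanSketch:
    """Hypothetical state holder; assumes adjust=False and no NaNs."""

    def __init__(self, initial: Sequence[float], alpha: float) -> None:
        self.initial = list(initial)
        self.alpha = alpha
        self.last_ewm: Optional[float] = None

    def reset(self) -> None:
        # Drop the state captured by earlier calls.
        self.last_ewm = None

    def _run(self, values: Sequence[float]) -> List[float]:
        # adjust=False recurrence: y_t = (1 - alpha) * y_{t-1} + alpha * x_t.
        out = [values[0]]
        for x in values[1:]:
            out.append((1.0 - self.alpha) * out[-1] + self.alpha * x)
        self.last_ewm = out[-1]
        return out

    def mean(self, update: Optional[Sequence[float]] = None) -> List[float]:
        if update is None:
            # First call: compute over the initial data and store the state.
            return self._run(self.initial)
        if self.last_ewm is None:
            raise ValueError(
                "Must call mean with update=None first before passing update"
            )
        # Seed the run with the last result, then drop the seed row.
        seeded = [self.last_ewm, *update]
        return self._run(seeded)[1:]


data = [0.0, 1.0, 2.0, 3.0, 4.0]
alpha = 2.0 / 3.0  # corresponds to com=0.5

sketch = OnlineMeanSketch(data[:2], alpha)
primed = sketch.mean()
continued = sketch.mean(update=data[2:])

# Reference: a single pass over all the data.
full = OnlineMeanSketch(data, alpha).mean()
```

With these assumptions ``primed + continued`` equals ``full``, and calling ``mean(update=...)`` on a fresh or reset object raises the same ``ValueError`` message as the diff.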