Skip to content

3.x: ReplayProcessor retains unused object #7417

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
ansip opened this issue May 9, 2022 · 2 comments
Closed

3.x: ReplayProcessor retains unused object #7417

ansip opened this issue May 9, 2022 · 2 comments

Comments

@ansip
Copy link
Contributor

ansip commented May 9, 2022

Hi,

I've encountered the issue using ReplayProcessor
The problem is that it keeps previous published object, in my case this object is about 600MB

I'm aware of javadoc and cleanupBuffer method

  • Note that due to concurrency requirements, a size- and time-bounded {@code ReplayProcessor} may hold strong references to more
  • source emissions than specified while it isn't terminated yet. Use the {@link #cleanupBuffer()} to allow
  • such inaccessible items to be cleaned up by GC once no consumer references them anymore.

but this api looks a bit tricky and inconvinient especially if you work with processor via abstract class for example FlowableProcessor

I think we could get rid from cleanupBuffer and trimHead if we change algorithm a bit
Basically the idea is to make Node/TimedNode mutable and fill values in node which is pointed by tail, then shift tail to a new empty node, hence trim method will remove head with relevant data when conditions met

If you don't mind I can raise a PR, and there we can discuss more specifically

@akarnokd
Copy link
Member

akarnokd commented May 9, 2022

Hi.

What you are suggesting has been evaluated and rejected back then when cleanupBuffer was introduced as the workaround. See #5892, #5898 and #6532.

@akarnokd akarnokd closed this as completed May 9, 2022
@ansip
Copy link
Contributor Author

ansip commented May 9, 2022

sorry maybe I didn't get you correct
I've checked these 3 PRs
#5892 and #5898 - here just added trim* methods
#6532 - described how to create new head with null value

but I didn't find any description/drawbacks of the solution I suggested

if you mean this
Nulling out this reference is not possible at this point because old consumers may be still walking through the underlying linked list of nodes then this is not what I meant

I suggest to set value in node

    @Override
    public void next(T value) {
        Node<T> n = new Node<>(null);
        Node<T> t = tail;

        tail = n;
        size++;
        t.value = value; //new value is set here
        t.set(n); // this guarantees us happens before

        trim();
    }

in replay method we check whether next node is set and then take the value
I'm not going to set value to null in nodes so all subscribers will be in consistent state

to illustrate processor with size(1)

initial state: head/tail(null)
onNext(1): head(1) -> tail(null)
onNext(2): head(2) -> tail(null) - because trim method shifted head to the next

in other words we always have one extra node which will be filled in the next 'onNext' method call

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants