Skip to content

Ability to do concurrent dispatches both on the async as well as the sync consumer #866

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 24, 2020

Conversation

danielmarbach
Copy link
Collaborator

@danielmarbach danielmarbach commented Jun 15, 2020

Proposed Changes

An alternative to #806

If the interface is not extended then this can be added in a non breaking way on the level of the connection factory and later could be propagated to the interfaces in v7. I currently named the property ProcessingConcurrency to convey the intent for both the sync and the async case

Types of Changes

What types of changes does your code introduce to this project?
Put an x in the boxes that apply

  • Bug fix (non-breaking change which fixes issue #NNNN)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause an observable behavior change in existing systems)
  • Documentation improvements (corrections, new content, etc)
  • Cosmetic change (whitespace, formatting, etc)

Checklist

Put an x in the boxes that apply. You can also fill these out after creating
the PR. If you're unsure about any of them, don't hesitate to ask on the
mailing list. We're here to help! This is simply a reminder of what we are
going to look for before merging your code.

  • I have read the CONTRIBUTING.md document
  • I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
  • All tests pass locally with my changes
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)
  • Any dependent changes have been merged and published in related repositories

Further Comments

If this is a relatively large or complex change, kick off the discussion by
explaining why you chose the solution you did and what alternatives you
considered, etc.

@danielmarbach
Copy link
Collaborator Author

@stebet what do you think about this spike

public AsyncConsumerWorkService(int concurrency)
{
_concurrency = concurrency;
_startNewWorkPoolFunc = model => StartNewWorkPool(model);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would introduce a capture of this

@stebet
Copy link
Contributor

stebet commented Jun 15, 2020

I'll take a look at this tomorrow :)

@stebet
Copy link
Contributor

stebet commented Jun 16, 2020

This looks good to me. I'm wondering if it'd be better to default AsyncConsumerConcurrency to Environment.ProcessorCount if it's enabled. That'd mean two properties though, AsyncConsumerConcurrencyEnabled (false when AsyncConsumerConcurrencyCount == 1, when set to true, would set AsyncConsumerConcurrencyCount = Environment.ProcessorCount if AsyncConsumerConcurrencyCount == 1).

Maybe that's just making things a bit convoluted perhaps... what do you think?

@danielmarbach
Copy link
Collaborator Author

My thought process was that Concurrency = 1 is just a special case of Concurrency = N that's why I went without having an additional bool, because then you only need to set it to something sensible which than automatically switches to fan out.

@danielmarbach danielmarbach force-pushed the concurrent-dispatch branch 2 times, most recently from 162e193 to fbbbeba Compare June 23, 2020 07:08
@danielmarbach danielmarbach changed the title Quick spike to see how to achieve concurrent dispatch Ability to do concurrent dispatches both on the async as well as the sync consumer Jun 23, 2020
@danielmarbach
Copy link
Collaborator Author

I've pushed a few changes to the tests to make them more reliable.

@danielmarbach
Copy link
Collaborator Author

@michaelklishin @lukebakken any thoughts? This could go in 6.2 if you guys are willing to take it

@lukebakken
Copy link
Collaborator

I've been busy with other work. I'll try to get to this PR today. We have some bug fixes that are planned for 6.1.1 but it sounds like 6.2.0 should be the next release.

@lukebakken lukebakken added this to the 6.2.0 milestone Jun 24, 2020
@danielmarbach
Copy link
Collaborator Author

No stress @lukebakken

@lukebakken
Copy link
Collaborator

Thanks @danielmarbach and @stebet for the review.

@lukebakken lukebakken merged commit 05d0930 into rabbitmq:master Jun 24, 2020
lukebakken added a commit that referenced this pull request Jun 24, 2020
Ability to do concurrent dispatches both on the async as well as the sync consumer

(cherry picked from commit 05d0930)
@danielmarbach danielmarbach deleted the concurrent-dispatch branch June 24, 2020 18:02
@danielmarbach
Copy link
Collaborator Author

@lukebakken where should we track whether this new property should be moved to the interface or not? I did deliberately not add it there to not add a breaking change. Ideally an issue that is added to the next major version with a breaking change label would be awesome

@lukebakken
Copy link
Collaborator

@danielmarbach we can create a new issue for that. I'm mid-review / testing of #868 / #878 right now 😄

@shaneqld
Copy link

Have BasicAck and BasicNack been made thread-safe yet? If not, users will run into strange exceptions (mostly protocol errors) when they work on highly concurrent workloads sans workarounds.

@stebet
Copy link
Contributor

stebet commented Jun 25, 2020

Have BasicAck and BasicNack been made thread-safe yet? If not, users will run into strange exceptions (mostly protocol errors) when they work on highly concurrent workloads sans workarounds.

Do you have a repro you can share? Would love to dig into if this is still a problem.

BasicAck and BasicNack are IModel operations, and it is not supported to share the same IModel instance between threads.

@danielmarbach
Copy link
Collaborator Author

I'd be interested to see and hear this as well. We are doing BasicAck and BasicNack since multiple versions concurrently. Is that something that has regressed recently?

@michaelklishin
Copy link
Contributor

michaelklishin commented Jun 25, 2020

@shaneqld @danielmarbach @stebet concurrent acknowledgments of single deliveries are safe. Acknowledgments with multiple set to true can indeed cause protocol channel exceptions because double acking is an error in the protocol. But this can happen even with a single thread of execution. This is also not a novel problem, although I see that our doc section on concurrency of consumers does not cover this very well.

So I don't see how this approach changes anything for basic.ack and basic.{nack,reject} operations.

@danielmarbach
Copy link
Collaborator Author

danielmarbach commented Jun 25, 2020 via email

@shaneqld
Copy link

I'd be interested to see and hear this as well. We are doing BasicAck and BasicNack since multiple versions concurrently. Is that something that has regressed recently?

I ran into this in a previous version with a different implementation of concurrency, but have been unable to reproduce in this latest version. When I have a moment I will update a project that has this workaround for the protocol error (and remove said workaround) and let you know if I run into any issues.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants