Skip to content

Commit d42c17b

Browse files
author
Matthew Sackman
committed
Merging bug 22587 into default
2 parents 234284c + 4ad1795 commit d42c17b

File tree

3 files changed

+155
-3
lines changed

3 files changed

+155
-3
lines changed

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

100644100755
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,30 @@ public interface IModel: IDisposable
9393
///</remarks>
9494
event CallbackExceptionEventHandler CallbackException;
9595

96+
///<summary>Signalled when an unexpected message is delivered
97+
///
98+
/// Under certain circumstances it is possible for a channel to receive a
99+
/// message delivery which does not match any consumer which is currently
100+
/// set up via basicConsume(). This will occur after the following sequence
101+
/// of events:
102+
///
103+
/// ctag = basicConsume(queue, consumer); // i.e. with explicit acks
104+
/// // some deliveries take place but are not acked
105+
/// basicCancel(ctag);
106+
/// basicRecover(false);
107+
///
108+
/// Since requeue is specified to be false in the basicRecover, the spec
109+
/// states that the message must be redelivered to "the original recipient"
110+
/// - i.e. the same channel / consumer-tag. But the consumer is no longer
111+
/// active.
112+
///
113+
/// In these circumstances, you can register a default consumer to handle
114+
/// such deliveries. If no default consumer is registered an
115+
/// InvalidOperationException will be thrown when such a delivery arrives.
116+
///
117+
/// Most people will not need to use this.</summary>
118+
IBasicConsumer DefaultConsumer { get; set; }
119+
96120
///<summary>Returns null if the session is still in a state
97121
///where it can be used, or the cause of its closure
98122
///otherwise.</summary>

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

100644100755
Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,9 @@ public event CallbackExceptionEventHandler CallbackException
146146
m_callbackException -= value;
147147
}
148148
}
149-
}
149+
}
150+
151+
public IBasicConsumer DefaultConsumer { get; set; }
150152

151153
public ISession m_session;
152154

@@ -357,8 +359,14 @@ public void HandleBasicDeliver(string consumerTag,
357359
}
358360
if (consumer == null)
359361
{
360-
// FIXME: what is an appropriate thing to do here?
361-
throw new NotSupportedException("FIXME unsolicited delivery for consumer tag " + consumerTag);
362+
if (DefaultConsumer == null) {
363+
throw new InvalidOperationException("Unsolicited delivery -" +
364+
" see IModel.DefaultConsumer to handle this" +
365+
" case.");
366+
}
367+
else {
368+
consumer = DefaultConsumer;
369+
}
362370
}
363371

364372
try {
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial
8+
// Technologies LLC., and Rabbit Technologies Ltd.
9+
//
10+
// Licensed under the Apache License, Version 2.0 (the "License");
11+
// you may not use this file except in compliance with the License.
12+
// You may obtain a copy of the License at
13+
//
14+
// http://www.apache.org/licenses/LICENSE-2.0
15+
//
16+
// Unless required by applicable law or agreed to in writing, software
17+
// distributed under the License is distributed on an "AS IS" BASIS,
18+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19+
// See the License for the specific language governing permissions and
20+
// limitations under the License.
21+
//---------------------------------------------------------------------------
22+
//
23+
// The MPL v1.1:
24+
//
25+
//---------------------------------------------------------------------------
26+
// The contents of this file are subject to the Mozilla Public License
27+
// Version 1.1 (the "License"); you may not use this file except in
28+
// compliance with the License. You may obtain a copy of the License at
29+
// http://www.rabbitmq.com/mpl.html
30+
//
31+
// Software distributed under the License is distributed on an "AS IS"
32+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33+
// License for the specific language governing rights and limitations
34+
// under the License.
35+
//
36+
// The Original Code is The RabbitMQ .NET Client.
37+
//
38+
// The Initial Developers of the Original Code are LShift Ltd,
39+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
40+
//
41+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44+
// Technologies LLC, and Rabbit Technologies Ltd.
45+
//
46+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
47+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
48+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
49+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
50+
// (C) 2007-2010 Rabbit Technologies Ltd.
51+
//
52+
// All Rights Reserved.
53+
//
54+
// Contributor(s): ______________________________________.
55+
//
56+
//---------------------------------------------------------------------------
57+
using NUnit.Framework;
58+
59+
using System;
60+
using System.IO;
61+
using System.Text;
62+
using System.Collections;
63+
64+
using RabbitMQ.Client.Impl;
65+
using RabbitMQ.Client.Exceptions;
66+
using RabbitMQ.Client.Events;
67+
using RabbitMQ.Util;
68+
69+
namespace RabbitMQ.Client.Unit
70+
{
71+
[TestFixture]
72+
public class TestRecoverAfterCancel
73+
{
74+
IConnection Connection;
75+
IModel Channel;
76+
String Queue;
77+
78+
public int ModelNumber(IModel model)
79+
{
80+
return ((ModelBase)model).m_session.ChannelNumber;
81+
}
82+
83+
[SetUp] public void Connect()
84+
{
85+
Connection = new ConnectionFactory().CreateConnection();
86+
Channel = Connection.CreateModel();
87+
Queue = Channel.QueueDeclare();
88+
}
89+
90+
[TearDown] public void Disconnect()
91+
{
92+
Channel.Close();
93+
Connection.Close();
94+
}
95+
96+
[Test]
97+
public void TestRecoverAfterCancel_()
98+
{
99+
UTF8Encoding enc = new UTF8Encoding();
100+
Channel.BasicPublish("", Queue, null, enc.GetBytes("message"));
101+
QueueingBasicConsumer Consumer = new QueueingBasicConsumer(Channel);
102+
QueueingBasicConsumer DefaultConsumer = new QueueingBasicConsumer(Channel);
103+
Channel.DefaultConsumer = DefaultConsumer;
104+
String CTag = Channel.BasicConsume(Queue, null, Consumer);
105+
BasicDeliverEventArgs Event = (BasicDeliverEventArgs) Consumer.Queue.Dequeue();
106+
Channel.BasicCancel(CTag);
107+
Channel.BasicRecover(false);
108+
109+
// The server will now redeliver us the first message again, with the
110+
// same ctag, but we're not set up to handle it with a standard
111+
// consumer - it should end up with the default one.
112+
113+
BasicDeliverEventArgs Event2 = (BasicDeliverEventArgs) DefaultConsumer.Queue.Dequeue();
114+
115+
Assert.AreEqual(Event.Body, Event2.Body);
116+
Assert.IsFalse(Event.Redelivered);
117+
Assert.IsTrue(Event2.Redelivered);
118+
}
119+
}
120+
}

0 commit comments

Comments
 (0)