Skip to content

Commit 44dbd0a

Browse files
authored
Merge pull request #1702 from rabbitmq/lukebakken/pub-conf-code-moving
Isolate publisher confirmation code
2 parents ecd948c + 7ebc77b commit 44dbd0a

File tree

5 files changed

+571
-408
lines changed

5 files changed

+571
-408
lines changed

.ci/ubuntu/gha-log-check.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,6 @@ declare -r rabbitmq_docker_name="$docker_name_prefix-rabbitmq"
1111

1212
if docker logs "$rabbitmq_docker_name" | grep -iF inet_error
1313
then
14-
echo '[ERROR] found inet_error in RabbitMQ logs' 1>&2
15-
exit 1
14+
echo '[WARNING] found inet_error in RabbitMQ logs' 1>&2
15+
exit 0
1616
fi

.ci/windows/gha-log-check.ps1

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ Write-Host "[INFO] looking for errors in '$rabbitmq_log_dir'"
88

99
If (Get-ChildItem $rabbitmq_log_dir\*.log | Select-String -Quiet -SimpleMatch -Pattern inet_error)
1010
{
11-
Write-Error "[ERROR] found inet_error in '$rabbitmq_log_dir'"
12-
exit 1
11+
Write-Error "[WARNING] found inet_error in '$rabbitmq_log_dir'"
12+
exit 0
1313
}
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Collections.Generic;
34+
using System.Diagnostics;
35+
using System.Threading;
36+
using System.Threading.Tasks;
37+
using RabbitMQ.Client.Framing;
38+
using RabbitMQ.Client.Util;
39+
40+
namespace RabbitMQ.Client.Impl
41+
{
42+
internal partial class Channel : IChannel, IRecoverable
43+
{
44+
public async ValueTask BasicPublishAsync<TProperties>(string exchange, string routingKey,
45+
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
46+
CancellationToken cancellationToken = default)
47+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
48+
{
49+
PublisherConfirmationInfo? publisherConfirmationInfo = null;
50+
try
51+
{
52+
publisherConfirmationInfo =
53+
await MaybeStartPublisherConfirmationTracking(cancellationToken)
54+
.ConfigureAwait(false);
55+
56+
await EnforceFlowControlAsync(cancellationToken)
57+
.ConfigureAwait(false);
58+
59+
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
60+
61+
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
62+
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
63+
: default;
64+
65+
ulong publishSequenceNumber = 0;
66+
if (publisherConfirmationInfo is not null)
67+
{
68+
publishSequenceNumber = publisherConfirmationInfo.PublishSequenceNumber;
69+
}
70+
71+
BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber);
72+
if (props is null)
73+
{
74+
await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
75+
.ConfigureAwait(false);
76+
}
77+
else
78+
{
79+
await ModelSendAsync(in cmd, in props, body, cancellationToken)
80+
.ConfigureAwait(false);
81+
}
82+
}
83+
catch (Exception ex)
84+
{
85+
bool exceptionWasHandled =
86+
MaybeHandleExceptionWithEnabledPublisherConfirmations(publisherConfirmationInfo, ex);
87+
if (!exceptionWasHandled)
88+
{
89+
throw;
90+
}
91+
}
92+
finally
93+
{
94+
await MaybeEndPublisherConfirmationTracking(publisherConfirmationInfo, cancellationToken)
95+
.ConfigureAwait(false);
96+
}
97+
}
98+
99+
public async ValueTask BasicPublishAsync<TProperties>(CachedString exchange, CachedString routingKey,
100+
bool mandatory, TProperties basicProperties, ReadOnlyMemory<byte> body,
101+
CancellationToken cancellationToken = default)
102+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
103+
{
104+
PublisherConfirmationInfo? publisherConfirmationInfo = null;
105+
try
106+
{
107+
publisherConfirmationInfo =
108+
await MaybeStartPublisherConfirmationTracking(cancellationToken)
109+
.ConfigureAwait(false);
110+
111+
await EnforceFlowControlAsync(cancellationToken)
112+
.ConfigureAwait(false);
113+
114+
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
115+
116+
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
117+
? RabbitMQActivitySource.Send(routingKey.Value, exchange.Value, body.Length)
118+
: default;
119+
120+
ulong publishSequenceNumber = 0;
121+
if (publisherConfirmationInfo is not null)
122+
{
123+
publishSequenceNumber = publisherConfirmationInfo.PublishSequenceNumber;
124+
}
125+
126+
BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber);
127+
if (props is null)
128+
{
129+
await ModelSendAsync(in cmd, in basicProperties, body, cancellationToken)
130+
.ConfigureAwait(false);
131+
}
132+
else
133+
{
134+
await ModelSendAsync(in cmd, in props, body, cancellationToken)
135+
.ConfigureAwait(false);
136+
}
137+
}
138+
catch (Exception ex)
139+
{
140+
bool exceptionWasHandled =
141+
MaybeHandleExceptionWithEnabledPublisherConfirmations(publisherConfirmationInfo, ex);
142+
if (!exceptionWasHandled)
143+
{
144+
throw;
145+
}
146+
}
147+
finally
148+
{
149+
await MaybeEndPublisherConfirmationTracking(publisherConfirmationInfo, cancellationToken)
150+
.ConfigureAwait(false);
151+
}
152+
}
153+
154+
private BasicProperties? PopulateBasicPropertiesHeaders<TProperties>(TProperties basicProperties,
155+
Activity? sendActivity, ulong publishSequenceNumber)
156+
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
157+
{
158+
/*
159+
* Note: there is nothing to do in this method if *both* of these
160+
* conditions are true:
161+
*
162+
* sendActivity is null - there is no activity to add as a header
163+
* publisher confirmations are NOT enabled
164+
*/
165+
if (sendActivity is null && !_publisherConfirmationsEnabled)
166+
{
167+
return null;
168+
}
169+
170+
bool newHeaders = false;
171+
IDictionary<string, object?>? headers = basicProperties.Headers;
172+
if (headers is null)
173+
{
174+
headers = new Dictionary<string, object?>();
175+
newHeaders = true;
176+
}
177+
MaybeAddActivityToHeaders(headers, basicProperties.CorrelationId, sendActivity);
178+
MaybeAddPublishSequenceNumberToHeaders(headers);
179+
180+
switch (basicProperties)
181+
{
182+
case BasicProperties writableProperties:
183+
if (newHeaders)
184+
{
185+
writableProperties.Headers = headers;
186+
}
187+
return null;
188+
case EmptyBasicProperty:
189+
return new BasicProperties { Headers = headers };
190+
default:
191+
return new BasicProperties(basicProperties) { Headers = headers };
192+
}
193+
194+
void MaybeAddActivityToHeaders(IDictionary<string, object?> headers,
195+
string? correlationId, Activity? sendActivity)
196+
{
197+
if (sendActivity is not null)
198+
{
199+
// This activity is marked as recorded, so let's propagate the trace and span ids.
200+
if (sendActivity.IsAllDataRequested)
201+
{
202+
if (!string.IsNullOrEmpty(correlationId))
203+
{
204+
sendActivity.SetTag(RabbitMQActivitySource.MessageConversationId, correlationId);
205+
}
206+
}
207+
208+
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
209+
RabbitMQActivitySource.ContextInjector(sendActivity, headers);
210+
}
211+
}
212+
213+
void MaybeAddPublishSequenceNumberToHeaders(IDictionary<string, object?> headers)
214+
{
215+
if (_publisherConfirmationsEnabled)
216+
{
217+
byte[] publishSequenceNumberBytes = new byte[8];
218+
NetworkOrderSerializer.WriteUInt64(ref publishSequenceNumberBytes.GetStart(), publishSequenceNumber);
219+
headers[Constants.PublishSequenceNumberHeader] = publishSequenceNumberBytes;
220+
}
221+
}
222+
}
223+
}
224+
}

0 commit comments

Comments
 (0)