-
Notifications
You must be signed in to change notification settings - Fork 901
/
Copy pathexample_producing_events_test.go
116 lines (91 loc) · 3.17 KB
/
example_producing_events_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package azeventhubs_test
import (
"context"
"errors"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/v2"
)
// Shows how to send events to an Event Hub partition using the [ProducerClient]
// and [EventDataBatch], starting a fresh batch whenever the current one fills up.
func Example_producingEventsUsingProducerClient() {
	namespace := os.Getenv("EVENTHUB_NAMESPACE") // <ex: myeventhubnamespace.servicebus.windows.net>
	hubName := os.Getenv("EVENTHUB_NAME")

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	// A connection string can be used instead of a credential:
	//
	//   producer, err := azeventhubs.NewProducerClientFromConnectionString(connectionString, hubName, nil)
	//
	producer, err := azeventhubs.NewProducerClient(namespace, hubName, cred, nil)
	if err != nil {
		panic(err)
	}

	defer producer.Close(context.TODO())

	pending := createEventsForSample()

	batchOpts := &azeventhubs.EventDataBatchOptions{
		// These options control the size of the batch and which partition it is sent to.
		//
		// PartitionID targets one specific partition:
		//
		//   PartitionID: partitionID,
		//
		// PartitionKey ensures that messages sharing the same key go to the same
		// partition, without the application having to name that partition:
		//
		//   PartitionKey: partitionKey,
		//
		// If both PartitionID and PartitionKey are left nil, the service chooses a partition.
	}

	// An EventDataBatch packs multiple events together so they can be transferred efficiently.
	batch, err := producer.NewEventDataBatch(context.TODO(), batchOpts)
	if err != nil {
		panic(err)
	}

	for _, event := range pending {
		// Keep trying the same event until some batch accepts it.
		for {
			addErr := batch.AddEventData(event, nil)
			if addErr == nil {
				break
			}

			if !errors.Is(addErr, azeventhubs.ErrEventDataTooLarge) {
				panic(addErr)
			}

			if batch.NumEvents() == 0 {
				// The event does not fit even in an empty batch, so it can never be
				// sent at its current size.
				panic(addErr)
			}

			// The batch is full: send it, then continue with a new one.
			if sendErr := producer.SendEventDataBatch(context.TODO(), batch, nil); sendErr != nil {
				panic(sendErr)
			}

			// Reuse the same options every time so all the messages go to the same target.
			batch, err = producer.NewEventDataBatch(context.TODO(), batchOpts)
			if err != nil {
				panic(err)
			}
		}
	}

	// Nothing left over in the final batch means there is nothing more to send.
	if batch.NumEvents() == 0 {
		return
	}

	if sendErr := producer.SendEventDataBatch(context.TODO(), batch, nil); sendErr != nil {
		panic(sendErr)
	}
}
// createEventsForSample returns the two events used by the example, with the
// bodies "hello" and "world", in that order.
func createEventsForSample() []*azeventhubs.EventData {
	payloads := []string{"hello", "world"}

	events := make([]*azeventhubs.EventData, 0, len(payloads))
	for _, payload := range payloads {
		events = append(events, &azeventhubs.EventData{Body: []byte(payload)})
	}

	return events
}