Skip to content

Commit d162219

Browse files
author
Dylan Terry
committed
Refactor event handling by replacing SyncMode and EventHandleMode with SynchronousEventHandler. Simplify the event processing in BinlogSyncerConfig by introducing SynchronousEventHandler for synchronous event handling. Update StartBackup, StartBackupWithHandler, and associated tests to reflect these changes.
1 parent c7a3c17 commit d162219

File tree

3 files changed

+88
-88
lines changed

3 files changed

+88
-88
lines changed

replication/backup.go

+49-54
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,24 @@ import (
1212
"github.com/pingcap/errors"
1313
)
1414

15-
// StartBackup: Like mysqlbinlog remote raw backup
16-
// Backup remote binlog from position (filename, offset) and write in backupDir
15+
// StartBackup starts the backup process for the binary log and writes to the backup directory.
1716
func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error {
1817
err := os.MkdirAll(backupDir, 0755)
1918
if err != nil {
2019
return errors.Trace(err)
2120
}
22-
return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) {
23-
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
24-
})
21+
if b.cfg.SynchronousEventHandler == nil {
22+
return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) {
23+
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
24+
})
25+
} else {
26+
return b.StartSynchronousBackup(p, timeout)
27+
}
2528
}
2629

2730
// StartBackupWithHandler starts the backup process for the binary log using the specified position and handler.
2831
// The process will continue until the timeout is reached or an error occurs.
32+
// This method should not be used together with SynchronousEventHandler.
2933
//
3034
// Parameters:
3135
// - p: The starting position in the binlog from which to begin the backup.
@@ -38,6 +42,9 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
3842
// a very long timeout here
3943
timeout = 30 * 3600 * 24 * time.Second
4044
}
45+
if b.cfg.SynchronousEventHandler != nil {
46+
return errors.New("StartBackupWithHandler cannot be used when SynchronousEventHandler is set. Use StartSynchronousBackup instead.")
47+
}
4148

4249
// Force use raw mode
4350
b.parser.SetRawMode(true)
@@ -47,18 +54,12 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
4754
handler: handler,
4855
}
4956

50-
if b.cfg.SyncMode == SyncModeSync {
51-
// Set the event handler in BinlogSyncer for synchronous mode
52-
b.SetEventHandler(backupHandler)
53-
}
54-
5557
s, err := b.StartSync(p)
5658
if err != nil {
5759
return errors.Trace(err)
5860
}
5961

6062
defer func() {
61-
b.SetEventHandler(nil) // Reset the event handler
6263
if backupHandler.w != nil {
6364
closeErr := backupHandler.w.Close()
6465
if retErr == nil {
@@ -70,66 +71,76 @@ func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
7071
ctx, cancel := context.WithTimeout(context.Background(), timeout)
7172
defer cancel()
7273

73-
if b.cfg.SyncMode == SyncModeSync {
74-
// Synchronous mode: wait for completion or error
74+
for {
7575
select {
7676
case <-ctx.Done():
7777
return nil
7878
case <-b.ctx.Done():
7979
return nil
8080
case err := <-s.ech:
8181
return errors.Trace(err)
82-
}
83-
} else {
84-
// Asynchronous mode: consume events from the streamer
85-
for {
86-
select {
87-
case <-ctx.Done():
88-
return nil
89-
case <-b.ctx.Done():
90-
return nil
91-
case err := <-s.ech:
82+
case e := <-s.ch:
83+
err = backupHandler.HandleEvent(e)
84+
if err != nil {
9285
return errors.Trace(err)
93-
case e := <-s.ch:
94-
err = backupHandler.HandleEvent(e)
95-
if err != nil {
96-
return errors.Trace(err)
97-
}
9886
}
9987
}
10088
}
10189
}
10290

91+
// StartSynchronousBackup starts the backup process using the SynchronousEventHandler in the BinlogSyncerConfig.
92+
func (b *BinlogSyncer) StartSynchronousBackup(p Position, timeout time.Duration) error {
93+
if b.cfg.SynchronousEventHandler == nil {
94+
return errors.New("SynchronousEventHandler must be set in BinlogSyncerConfig to use StartSynchronousBackup")
95+
}
96+
97+
if timeout == 0 {
98+
timeout = 30 * 3600 * 24 * time.Second // Long timeout by default
99+
}
100+
101+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
102+
defer cancel()
103+
104+
s, err := b.StartSync(p)
105+
if err != nil {
106+
return errors.Trace(err)
107+
}
108+
109+
// Wait for the binlog syncer to finish or encounter an error
110+
select {
111+
case <-ctx.Done():
112+
return nil
113+
case <-b.ctx.Done():
114+
return nil
115+
case err := <-s.ech:
116+
return errors.Trace(err)
117+
}
118+
}
119+
103120
// BackupEventHandler handles writing events for backup
104121
type BackupEventHandler struct {
105-
handler func(binlogFilename string) (io.WriteCloser, error)
106-
w io.WriteCloser
107-
mutex sync.Mutex
108-
fsyncedChan chan struct{}
109-
eventCount int // eventCount used for testing
122+
handler func(binlogFilename string) (io.WriteCloser, error)
123+
w io.WriteCloser
124+
mutex sync.Mutex
110125

111126
filename string
112127
}
113128

129+
// HandleEvent processes a single event for the backup.
114130
func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error {
115131
h.mutex.Lock()
116132
defer h.mutex.Unlock()
117133

118134
var err error
119-
120-
// Update the offset
121135
offset := e.Header.LogPos
122136

123137
if e.Header.EventType == ROTATE_EVENT {
124138
rotateEvent := e.Event.(*RotateEvent)
125139
h.filename = string(rotateEvent.NextLogName)
126-
127140
if e.Header.Timestamp == 0 || offset == 0 {
128-
// Fake rotate event, skip processing
129141
return nil
130142
}
131143
} else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT {
132-
// Close the current writer and open a new one
133144
if h.w != nil {
134145
if err = h.w.Close(); err != nil {
135146
h.w = nil
@@ -146,14 +157,12 @@ func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error {
146157
return errors.Trace(err)
147158
}
148159

149-
// Write binlog header fe'bin'
150160
_, err = h.w.Write(BinLogFileHeader)
151161
if err != nil {
152162
return errors.Trace(err)
153163
}
154164
}
155165

156-
// Write raw event data to the current writer
157166
if h.w != nil {
158167
n, err := h.w.Write(e.RawData)
159168
if err != nil {
@@ -162,23 +171,9 @@ func (h *BackupEventHandler) HandleEvent(e *BinlogEvent) error {
162171
if n != len(e.RawData) {
163172
return errors.Trace(io.ErrShortWrite)
164173
}
165-
166-
// Perform Sync if the writer supports it
167-
if f, ok := h.w.(*os.File); ok {
168-
if err := f.Sync(); err != nil {
169-
return errors.Trace(err)
170-
}
171-
// Signal that fsync has completed
172-
if h.fsyncedChan != nil {
173-
h.fsyncedChan <- struct{}{}
174-
}
175-
}
176174
} else {
177-
// If writer is nil and event is not FORMAT_DESCRIPTION_EVENT, we can't write
178-
// This should not happen if events are in expected order
179175
return errors.New("writer is not initialized")
180176
}
181177

182-
h.eventCount++
183178
return nil
184179
}

replication/backup_test.go

+30-7
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package replication
22

33
import (
44
"context"
5+
"io"
56
"os"
7+
"path"
68
"time"
79

810
"github.com/stretchr/testify/require"
@@ -50,24 +52,36 @@ func (t *testSyncerSuite) TestStartBackupEndInGivenTime() {
5052

5153
// TestAsyncBackup runs the backup process in asynchronous mode and verifies binlog file creation.
5254
func (t *testSyncerSuite) TestAsyncBackup() {
53-
testSyncModeBackup(t, SyncModeAsync)
55+
testBackup(t, false) // false indicates asynchronous mode
5456
}
5557

5658
// TestSyncBackup runs the backup process in synchronous mode and verifies binlog file creation.
5759
func (t *testSyncerSuite) TestSyncBackup() {
58-
testSyncModeBackup(t, SyncModeSync)
60+
testBackup(t, true) // true indicates synchronous mode
5961
}
6062

61-
// testSyncModeBackup is a helper function that runs the backup process for a given sync mode and checks if binlog files are written correctly.
62-
func testSyncModeBackup(t *testSyncerSuite, syncMode SyncMode) {
63+
// testBackup is a helper function that runs the backup process in the specified mode and checks if binlog files are written correctly.
64+
func testBackup(t *testSyncerSuite, isSynchronous bool) {
6365
t.setupTest(mysql.MySQLFlavor)
6466
t.b.cfg.SemiSyncEnabled = false // Ensure semi-sync is disabled
65-
t.b.cfg.SyncMode = syncMode // Set the sync mode
6667

6768
binlogDir := "./var"
6869
os.RemoveAll(binlogDir)
6970
timeout := 3 * time.Second
7071

72+
if isSynchronous {
73+
// Set up a BackupEventHandler for synchronous mode
74+
backupHandler := &BackupEventHandler{
75+
handler: func(filename string) (io.WriteCloser, error) {
76+
return os.OpenFile(path.Join(binlogDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
77+
},
78+
}
79+
t.b.cfg.SynchronousEventHandler = backupHandler
80+
} else {
81+
// Ensure SynchronousEventHandler is nil for asynchronous mode
82+
t.b.cfg.SynchronousEventHandler = nil
83+
}
84+
7185
done := make(chan bool)
7286

7387
// Start the backup process in a goroutine
@@ -88,8 +102,17 @@ func testSyncModeBackup(t *testSyncerSuite, syncMode SyncMode) {
88102
files, err := os.ReadDir(binlogDir)
89103
require.NoError(t.T(), err, "Failed to read binlog directory")
90104
require.Greater(t.T(), len(files), 0, "Binlog files were not written to the directory")
91-
t.T().Logf("Backup completed successfully in %v mode with %d binlog file(s).", syncMode, len(files))
105+
mode := modeLabel(isSynchronous)
106+
t.T().Logf("Backup completed successfully in %s mode with %d binlog file(s).", mode, len(files))
92107
case <-ctx.Done():
93-
t.T().Fatalf("Timeout error during backup in %v mode.", syncMode)
108+
mode := modeLabel(isSynchronous)
109+
t.T().Fatalf("Timeout error during backup in %s mode.", mode)
110+
}
111+
}
112+
113+
func modeLabel(isSynchronous bool) string {
114+
if isSynchronous {
115+
return "synchronous"
94116
}
117+
return "asynchronous"
95118
}

replication/binlogsyncer.go

+9-27
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,6 @@ var (
2525
errSyncRunning = errors.New("Sync is running, must Close first")
2626
)
2727

28-
type SyncMode int
29-
30-
const (
31-
SyncModeAsync SyncMode = iota // Asynchronous mode (default)
32-
SyncModeSync // Synchronous mode
33-
)
34-
3528
// BinlogSyncerConfig is the configuration for BinlogSyncer.
3629
type BinlogSyncerConfig struct {
3730
// ServerID is the unique ID in cluster.
@@ -134,10 +127,9 @@ type BinlogSyncerConfig struct {
134127

135128
EventCacheCount int
136129

137-
// SyncMode specifies whether to operate in synchronous or asynchronous mode.
138-
// - SyncModeAsync (default): Events are sent to the BinlogStreamer and can be consumed via GetEvent().
139-
// - SyncModeSync: Events are processed synchronously using the EventHandler.
140-
SyncMode SyncMode
130+
// SynchronousEventHandler is used for synchronous event handling.
131+
// This should not be used together with StartBackupWithHandler.
132+
SynchronousEventHandler EventHandler
141133
}
142134

143135
// EventHandler defines the interface for processing binlog events.
@@ -905,23 +897,13 @@ func (b *BinlogSyncer) handleEventAndACK(s *BinlogStreamer, e *BinlogEvent, need
905897
}
906898
}
907899

908-
// Process the event based on the configured SyncMode
909-
switch b.cfg.SyncMode {
910-
case SyncModeSync:
911-
// Synchronous mode: use EventHandler
912-
b.m.RLock()
913-
handler := b.eventHandler
914-
b.m.RUnlock()
915-
if handler != nil {
916-
err := handler.HandleEvent(e)
917-
if err != nil {
918-
return errors.Trace(err)
919-
}
920-
} else {
921-
return errors.New("no EventHandler set for synchronous mode")
900+
// Use SynchronousEventHandler if it's set
901+
if b.cfg.SynchronousEventHandler != nil {
902+
err := b.cfg.SynchronousEventHandler.HandleEvent(e)
903+
if err != nil {
904+
return errors.Trace(err)
922905
}
923-
924-
case SyncModeAsync:
906+
} else {
925907
// Asynchronous mode: send the event to the streamer channel
926908
select {
927909
case s.ch <- e:

0 commit comments

Comments
 (0)