Skip to content

Commit becd1ef

Browse files
authored
Merge pull request #707 from viciious/canal_dialer
Add support for custom dialer in canal and binlog syncer
2 parents be3ff84 + a73f5e5 commit becd1ef

File tree

4 files changed

+47
-16
lines changed

4 files changed

+47
-16
lines changed

canal/canal.go

+20-10
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ func (c *Canal) prepareSyncer() error {
427427
TimestampStringLocation: c.cfg.TimestampStringLocation,
428428
TLSConfig: c.cfg.TLSConfig,
429429
Logger: c.cfg.Logger,
430+
Dialer: c.cfg.Dialer,
430431
}
431432

432433
if strings.Contains(c.cfg.Addr, "/") {
@@ -451,6 +452,14 @@ func (c *Canal) prepareSyncer() error {
451452
return nil
452453
}
453454

455+
func (c *Canal) connect(options ...func(*client.Conn)) (*client.Conn, error) {
456+
ctx, cancel := context.WithTimeout(c.ctx, time.Second*10)
457+
defer cancel()
458+
459+
return client.ConnectWithDialer(ctx, "", c.cfg.Addr,
460+
c.cfg.User, c.cfg.Password, "", c.cfg.Dialer, options...)
461+
}
462+
454463
// Execute a SQL
455464
func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error) {
456465
c.connLock.Lock()
@@ -461,27 +470,28 @@ func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err
461470
conn.SetTLSConfig(c.cfg.TLSConfig)
462471
})
463472
}
473+
464474
retryNum := 3
465475
for i := 0; i < retryNum; i++ {
466476
if c.conn == nil {
467-
c.conn, err = client.Connect(c.cfg.Addr, c.cfg.User, c.cfg.Password, "", argF...)
477+
c.conn, err = c.connect(argF...)
468478
if err != nil {
469479
return nil, errors.Trace(err)
470480
}
471481
}
472482

473483
rr, err = c.conn.Execute(cmd, args...)
474-
if err != nil && !mysql.ErrorEqual(err, mysql.ErrBadConn) {
475-
return
476-
} else if mysql.ErrorEqual(err, mysql.ErrBadConn) {
477-
c.conn.Close()
478-
c.conn = nil
479-
continue
480-
} else {
481-
return
484+
if err != nil {
485+
if mysql.ErrorEqual(err, mysql.ErrBadConn) {
486+
c.conn.Close()
487+
c.conn = nil
488+
continue
489+
}
490+
return nil, err
482491
}
492+
break
483493
}
484-
return
494+
return rr, err
485495
}
486496

487497
func (c *Canal) SyncedPosition() mysql.Position {

canal/config.go

+8
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ import (
44
"crypto/tls"
55
"io/ioutil"
66
"math/rand"
7+
"net"
78
"os"
89
"time"
910

1011
"github.com/BurntSushi/toml"
12+
"github.com/go-mysql-org/go-mysql/client"
1113
"github.com/go-mysql-org/go-mysql/mysql"
1214
"github.com/pingcap/errors"
1315
"github.com/siddontang/go-log/log"
@@ -91,6 +93,9 @@ type Config struct {
9193

9294
//Set Logger
9395
Logger *log.Logger
96+
97+
//Set Dialer
98+
Dialer client.Dialer
9499
}
95100

96101
func NewConfigWithFile(name string) (*Config, error) {
@@ -132,5 +137,8 @@ func NewDefaultConfig() *Config {
132137
streamHandler, _ := log.NewStreamHandler(os.Stdout)
133138
c.Logger = log.NewDefault(streamHandler)
134139

140+
dialer := &net.Dialer{}
141+
c.Dialer = dialer.DialContext
142+
135143
return c
136144
}

client/conn.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,12 @@ func getNetProto(addr string) string {
6161
// Connect to a MySQL server, addr can be ip:port, or a unix socket domain like /var/sock.
6262
// Accepts a series of configuration functions as a variadic argument.
6363
func Connect(addr string, user string, password string, dbName string, options ...func(*Conn)) (*Conn, error) {
64-
proto := getNetProto(addr)
65-
6664
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
6765
defer cancel()
6866

6967
dialer := &net.Dialer{}
7068

71-
return ConnectWithDialer(ctx, proto, addr, user, password, dbName, dialer.DialContext, options...)
69+
return ConnectWithDialer(ctx, "", addr, user, password, dbName, dialer.DialContext, options...)
7270
}
7371

7472
// Dialer connects to the address on the named network using the provided context.
@@ -78,6 +76,10 @@ type Dialer func(ctx context.Context, network, address string) (net.Conn, error)
7876
func ConnectWithDialer(ctx context.Context, network string, addr string, user string, password string, dbName string, dialer Dialer, options ...func(*Conn)) (*Conn, error) {
7977
c := new(Conn)
8078

79+
if network == "" {
80+
network = getNetProto(addr)
81+
}
82+
8183
var err error
8284
conn, err := dialer(ctx, network, addr)
8385
if err != nil {

replication/binlogsyncer.go

+14-3
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ type BinlogSyncerConfig struct {
112112

113113
// Set Logger
114114
Logger *log.Logger
115+
116+
// Set Dialer
117+
Dialer client.Dialer
115118
}
116119

117120
// BinlogSyncer syncs binlog event from server.
@@ -149,6 +152,10 @@ func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
149152
if cfg.ServerID == 0 {
150153
cfg.Logger.Fatal("can't use 0 as the server ID")
151154
}
155+
if cfg.Dialer == nil {
156+
dialer := &net.Dialer{}
157+
cfg.Dialer = dialer.DialContext
158+
}
152159

153160
// Clear the Password to avoid outputing it in log.
154161
pass := cfg.Password
@@ -864,9 +871,13 @@ func (b *BinlogSyncer) newConnection() (*client.Conn, error) {
864871
addr = b.cfg.Host
865872
}
866873

867-
return client.Connect(addr, b.cfg.User, b.cfg.Password, "", func(c *client.Conn) {
868-
c.SetTLSConfig(b.cfg.TLSConfig)
869-
})
874+
ctx, cancel := context.WithTimeout(b.ctx, time.Second*10)
875+
defer cancel()
876+
877+
return client.ConnectWithDialer(ctx, "", addr, b.cfg.User, b.cfg.Password,
878+
"", b.cfg.Dialer, func(c *client.Conn) {
879+
c.SetTLSConfig(b.cfg.TLSConfig)
880+
})
870881
}
871882

872883
func (b *BinlogSyncer) killConnection(conn *client.Conn, id uint32) {

0 commit comments

Comments
 (0)