Skip to content

Adding replication protocol support to mysql server implementation #759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jan 16, 2023
23 changes: 22 additions & 1 deletion replication/binlogstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,32 @@ func (s *BinlogStreamer) closeWithError(err error) {
}
}

func newBinlogStreamer() *BinlogStreamer {
func NewBinlogStreamer() *BinlogStreamer {
s := new(BinlogStreamer)

s.ch = make(chan *BinlogEvent, 10240)
s.ech = make(chan error, 4)

return s
}

// AddEventToStreamer adds a binlog event to the streamer. You can use it when you want to add an event to the streamer manually.
// can be used in replication handlers
func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error {
select {
case s.ch <- ev:
return nil
case err := <-s.ech:
return err
}
}

// AddErrorToStreamer adds an error to the streamer.
func (s *BinlogStreamer) AddErrorToStreamer(err error) bool {
select {
case s.ech <- err:
return true
default:
return false
}
}
2 changes: 1 addition & 1 deletion replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (b *BinlogSyncer) prepare() error {
func (b *BinlogSyncer) startDumpStream() *BinlogStreamer {
b.running = true

s := newBinlogStreamer()
s := NewBinlogStreamer()

b.wg.Add(1)
go b.onStream(s)
Expand Down
58 changes: 58 additions & 0 deletions server/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

. "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/siddontang/go/hack"
)

Expand All @@ -30,6 +31,13 @@ type Handler interface {
HandleOtherCommand(cmd byte, data []byte) error
}

type ReplicationHandler interface {
// handle Replication command
HandleRegisterSlave(data []byte) error
HandleBinlogDump(pos Position) (*replication.BinlogStreamer, error)
HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error)
}

func (c *Conn) HandleCommand() error {
if c.Conn == nil {
return fmt.Errorf("connection closed")
Expand Down Expand Up @@ -131,6 +139,40 @@ func (c *Conn) dispatch(data []byte) interface{} {
}

return eofResponse{}
case COM_REGISTER_SLAVE:
if h, ok := c.h.(ReplicationHandler); ok {
return h.HandleRegisterSlave(data)
} else {
return c.h.HandleOtherCommand(cmd, data)
}
case COM_BINLOG_DUMP:
if h, ok := c.h.(ReplicationHandler); ok {
pos, err := parseBinlogDump(data)
if err != nil {
return err
}
if s, err := h.HandleBinlogDump(pos); err != nil {
return err
} else {
return s
}
} else {
return c.h.HandleOtherCommand(cmd, data)
}
case COM_BINLOG_DUMP_GTID:
if h, ok := c.h.(ReplicationHandler); ok {
gtidSet, err := parseBinlogDumpGTID(data)
if err != nil {
return err
}
if s, err := h.HandleBinlogDumpGTID(gtidSet); err != nil {
return err
} else {
return s
}
} else {
return c.h.HandleOtherCommand(cmd, data)
}
default:
return c.h.HandleOtherCommand(cmd, data)
}
Expand All @@ -139,6 +181,10 @@ func (c *Conn) dispatch(data []byte) interface{} {
type EmptyHandler struct {
}

type EmptyReplicationHandler struct {
EmptyHandler
}

func (h EmptyHandler) UseDB(dbName string) error {
return nil
}
Expand All @@ -160,6 +206,18 @@ func (h EmptyHandler) HandleStmtClose(context interface{}) error {
return nil
}

func (h EmptyReplicationHandler) HandleRegisterSlave(data []byte) error {
return fmt.Errorf("not supported now")
}

func (h EmptyReplicationHandler) HandleBinlogDump(pos Position) (*replication.BinlogStreamer, error) {
return nil, fmt.Errorf("not supported now")
}

func (h EmptyReplicationHandler) HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error) {
return nil, fmt.Errorf("not supported now")
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest adding r.close() here, just to gracefully release resources.


func (h EmptyHandler) HandleOtherCommand(cmd byte, data []byte) error {
return NewError(
ER_UNKNOWN_ERROR,
Expand Down
1 change: 1 addition & 0 deletions server/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ package server

// Ensure EmptyHandler implements Handler interface or cause compile time error
var _ Handler = EmptyHandler{}
var _ ReplicationHandler = EmptyReplicationHandler{}
30 changes: 30 additions & 0 deletions server/replication.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package server

import (
"encoding/binary"

"github.com/go-mysql-org/go-mysql/mysql"
)

func parseBinlogDump(data []byte) (mysql.Position, error) {
if len(data) < 10 {
return mysql.Position{}, mysql.ErrMalformPacket
}
var p mysql.Position
p.Pos = binary.LittleEndian.Uint32(data[0:4])
p.Name = string(data[10:])

return p, nil
}

func parseBinlogDumpGTID(data []byte) (*mysql.MysqlGTIDSet, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you plan to support MariaDB GTID set? you can see ParseMariadbGTID

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MariaDB doesn't implement COM_BINLOG_DUMP_GTID - so, we can assume that this message sent by MySQL.

https://github.com/MariaDB/server/blob/10.9/include/mysql_com.h#L118

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I mean if there's not too much work to do we can support MariaDB by the way. But as you said MariaDB does not use COM_BINLOG_DUMP_GTID so it may not be a trivial work

if len(data) < 15 {
return nil, mysql.ErrMalformPacket
}
lenPosName := binary.LittleEndian.Uint32(data[11:15])
if len(data) < 22+int(lenPosName) {
return nil, mysql.ErrMalformPacket
}

return mysql.DecodeMysqlGTIDSet(data[22+lenPosName:])
}
21 changes: 21 additions & 0 deletions server/resp.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package server

import (
"context"
"fmt"

. "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
)

func (c *Conn) writeOK(r *Result) error {
Expand Down Expand Up @@ -197,6 +199,23 @@ func (c *Conn) writeFieldValues(fv []FieldValue) error {
return c.WritePacket(data)
}

// see: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication.html
func (c *Conn) writeBinlogEvents(s *replication.BinlogStreamer) error {
for {
ev, err := s.GetEvent(context.Background())
if err != nil {
return err
}
data := make([]byte, 4, 4+len(ev.RawData))
data = append(data, OK_HEADER)

data = append(data, ev.RawData...)
if err := c.WritePacket(data); err != nil {
return err
}
}
}

type noResponse struct{}
type eofResponse struct{}

Expand All @@ -220,6 +239,8 @@ func (c *Conn) WriteValue(value interface{}) error {
return c.writeFieldList(v, nil)
case []FieldValue:
return c.writeFieldValues(v)
case *replication.BinlogStreamer:
return c.writeBinlogEvents(v)
case *Stmt:
return c.writePrepare(v)
default:
Expand Down