Skip to content

Commit 63a80bf

Browse files
pat-spat-s
authored and
Gusted
committed
feat: use XORM EngineGroup instead of single Engine connection (go-gitea#7212)
Resolves go-gitea#7207 Add new configuration to make XORM work with a main and replicas database instances. The follow configuration parameters were added: - `HOST_PRIMARY` - `HOST_REPLICAS` - `LOAD_BALANCE_POLICY`. Options: - `"WeightRandom"` -> `xorm.WeightRandomPolicy` - `"WeightRoundRobin` -> `WeightRoundRobinPolicy` - `"LeastCon"` -> `LeastConnPolicy` - `"RoundRobin"` -> `xorm.RoundRobinPolicy()` - default: `xorm.RandomPolicy()` - `LOAD_BALANCE_WEIGHTS` Co-authored-by: pat-s <patrick.schratz@gmail.com@> Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/7212 Reviewed-by: Gusted <gusted@noreply.codeberg.org> Co-authored-by: pat-s <patrick.schratz@gmail.com> Co-committed-by: pat-s <patrick.schratz@gmail.com>
1 parent a23d045 commit 63a80bf

File tree

19 files changed

+463
-129
lines changed

19 files changed

+463
-129
lines changed

cmd/doctor.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"forgejo.org/services/doctor"
2121

2222
"github.com/urfave/cli/v2"
23-
"xorm.io/xorm"
2423
)
2524

2625
// CmdDoctor represents the available doctor sub-command.
@@ -120,7 +119,7 @@ func runRecreateTable(ctx *cli.Context) error {
120119

121120
args := ctx.Args()
122121
names := make([]string, 0, ctx.NArg())
123-
for i := 0; i < ctx.NArg(); i++ {
122+
for i := range ctx.NArg() {
124123
names = append(names, args.Get(i))
125124
}
126125

@@ -130,11 +129,17 @@ func runRecreateTable(ctx *cli.Context) error {
130129
}
131130
recreateTables := migrate_base.RecreateTables(beans...)
132131

133-
return db.InitEngineWithMigration(stdCtx, func(x *xorm.Engine) error {
134-
if err := migrations.EnsureUpToDate(x); err != nil {
132+
return db.InitEngineWithMigration(stdCtx, func(x db.Engine) error {
133+
engine, err := db.GetMasterEngine(x)
134+
if err != nil {
135135
return err
136136
}
137-
return recreateTables(x)
137+
138+
if err := migrations.EnsureUpToDate(engine); err != nil {
139+
return err
140+
}
141+
142+
return recreateTables(engine)
138143
})
139144
}
140145

cmd/migrate.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,13 @@ func runMigrate(ctx *cli.Context) error {
3636
log.Info("Log path: %s", setting.Log.RootPath)
3737
log.Info("Configuration file: %s", setting.CustomConf)
3838

39-
if err := db.InitEngineWithMigration(context.Background(), migrations.Migrate); err != nil {
39+
if err := db.InitEngineWithMigration(context.Background(), func(dbEngine db.Engine) error {
40+
masterEngine, err := db.GetMasterEngine(dbEngine)
41+
if err != nil {
42+
return err
43+
}
44+
return migrations.Migrate(masterEngine)
45+
}); err != nil {
4046
log.Fatal("Failed to initialize ORM engine: %v", err)
4147
return err
4248
}

cmd/migrate_storage.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"forgejo.org/modules/storage"
2424

2525
"github.com/urfave/cli/v2"
26+
"xorm.io/xorm"
2627
)
2728

2829
// CmdMigrateStorage represents the available migrate storage sub-command.
@@ -195,7 +196,9 @@ func runMigrateStorage(ctx *cli.Context) error {
195196
log.Info("Log path: %s", setting.Log.RootPath)
196197
log.Info("Configuration file: %s", setting.CustomConf)
197198

198-
if err := db.InitEngineWithMigration(context.Background(), migrations.Migrate); err != nil {
199+
if err := db.InitEngineWithMigration(context.Background(), func(e db.Engine) error {
200+
return migrations.Migrate(e.(*xorm.Engine))
201+
}); err != nil {
199202
log.Fatal("Failed to initialize ORM engine: %v", err)
200203
return err
201204
}

models/db/engine.go

Lines changed: 124 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -95,87 +95,132 @@ func init() {
9595
}
9696
}
9797

98-
// newXORMEngine returns a new XORM engine from the configuration
99-
func newXORMEngine() (*xorm.Engine, error) {
100-
connStr, err := setting.DBConnStr()
98+
// newXORMEngineGroup creates an xorm.EngineGroup (with one master and one or more slaves).
99+
// It assumes you have separate master and slave DSNs defined via the settings package.
100+
func newXORMEngineGroup() (Engine, error) {
101+
// Retrieve master DSN from settings.
102+
masterConnStr, err := setting.DBMasterConnStr()
101103
if err != nil {
102-
return nil, err
104+
return nil, fmt.Errorf("failed to determine master DSN: %w", err)
103105
}
104106

105-
var engine *xorm.Engine
106-
107+
var masterEngine *xorm.Engine
108+
// For PostgreSQL: if a schema is provided, we use the special "postgresschema" driver.
107109
if setting.Database.Type.IsPostgreSQL() && len(setting.Database.Schema) > 0 {
108-
// OK whilst we sort out our schema issues - create a schema aware postgres
109110
registerPostgresSchemaDriver()
110-
engine, err = xorm.NewEngine("postgresschema", connStr)
111+
masterEngine, err = xorm.NewEngine("postgresschema", masterConnStr)
111112
} else {
112-
engine, err = xorm.NewEngine(setting.Database.Type.String(), connStr)
113+
masterEngine, err = xorm.NewEngine(setting.Database.Type.String(), masterConnStr)
113114
}
114-
115115
if err != nil {
116-
return nil, err
116+
return nil, fmt.Errorf("failed to create master engine: %w", err)
117117
}
118118
if setting.Database.Type.IsMySQL() {
119-
engine.Dialect().SetParams(map[string]string{"rowFormat": "DYNAMIC"})
119+
masterEngine.Dialect().SetParams(map[string]string{"rowFormat": "DYNAMIC"})
120120
}
121-
engine.SetSchema(setting.Database.Schema)
122-
return engine, nil
121+
masterEngine.SetSchema(setting.Database.Schema)
122+
123+
slaveConnStrs, err := setting.DBSlaveConnStrs()
124+
if err != nil {
125+
return nil, fmt.Errorf("failed to load slave DSNs: %w", err)
126+
}
127+
128+
var slaveEngines []*xorm.Engine
129+
// Iterate over all slave DSNs and create engines
130+
for _, dsn := range slaveConnStrs {
131+
slaveEngine, err := xorm.NewEngine(setting.Database.Type.String(), dsn)
132+
if err != nil {
133+
return nil, fmt.Errorf("failed to create slave engine for dsn %q: %w", dsn, err)
134+
}
135+
if setting.Database.Type.IsMySQL() {
136+
slaveEngine.Dialect().SetParams(map[string]string{"rowFormat": "DYNAMIC"})
137+
}
138+
slaveEngine.SetSchema(setting.Database.Schema)
139+
slaveEngines = append(slaveEngines, slaveEngine)
140+
}
141+
142+
policy := setting.BuildLoadBalancePolicy(&setting.Database, slaveEngines)
143+
144+
// Create the EngineGroup using the selected policy
145+
group, err := xorm.NewEngineGroup(masterEngine, slaveEngines, policy)
146+
if err != nil {
147+
return nil, fmt.Errorf("failed to create engine group: %w", err)
148+
}
149+
return engineGroupWrapper{group}, nil
150+
}
151+
152+
type engineGroupWrapper struct {
153+
*xorm.EngineGroup
154+
}
155+
156+
func (w engineGroupWrapper) AddHook(hook contexts.Hook) bool {
157+
w.EngineGroup.AddHook(hook)
158+
return true
123159
}
124160

125-
// SyncAllTables sync the schemas of all tables, is required by unit test code
161+
// SyncAllTables sync the schemas of all tables
126162
func SyncAllTables() error {
127163
_, err := x.StoreEngine("InnoDB").SyncWithOptions(xorm.SyncOptions{
128164
WarnIfDatabaseColumnMissed: true,
129165
}, tables...)
130166
return err
131167
}
132168

133-
// InitEngine initializes the xorm.Engine and sets it as db.DefaultContext
169+
// InitEngine initializes the xorm EngineGroup and sets it as db.DefaultContext
134170
func InitEngine(ctx context.Context) error {
135-
xormEngine, err := newXORMEngine()
171+
xormEngine, err := newXORMEngineGroup()
136172
if err != nil {
137173
return fmt.Errorf("failed to connect to database: %w", err)
138174
}
175+
// Try to cast to the concrete type to access diagnostic methods
176+
if eng, ok := xormEngine.(engineGroupWrapper); ok {
177+
eng.SetMapper(names.GonicMapper{})
178+
// WARNING: for serv command, MUST remove the output to os.Stdout,
179+
// so use a log file instead of printing to stdout.
180+
eng.SetLogger(NewXORMLogger(setting.Database.LogSQL))
181+
eng.ShowSQL(setting.Database.LogSQL)
182+
eng.SetMaxOpenConns(setting.Database.MaxOpenConns)
183+
eng.SetMaxIdleConns(setting.Database.MaxIdleConns)
184+
eng.SetConnMaxLifetime(setting.Database.ConnMaxLifetime)
185+
eng.SetConnMaxIdleTime(setting.Database.ConnMaxIdleTime)
186+
eng.SetDefaultContext(ctx)
187+
188+
if setting.Database.SlowQueryThreshold > 0 {
189+
eng.AddHook(&SlowQueryHook{
190+
Treshold: setting.Database.SlowQueryThreshold,
191+
Logger: log.GetLogger("xorm"),
192+
})
193+
}
139194

140-
xormEngine.SetMapper(names.GonicMapper{})
141-
// WARNING: for serv command, MUST remove the output to os.stdout,
142-
// so use log file to instead print to stdout.
143-
xormEngine.SetLogger(NewXORMLogger(setting.Database.LogSQL))
144-
xormEngine.ShowSQL(setting.Database.LogSQL)
145-
xormEngine.SetMaxOpenConns(setting.Database.MaxOpenConns)
146-
xormEngine.SetMaxIdleConns(setting.Database.MaxIdleConns)
147-
xormEngine.SetConnMaxLifetime(setting.Database.ConnMaxLifetime)
148-
xormEngine.SetConnMaxIdleTime(setting.Database.ConnMaxIdleTime)
149-
xormEngine.SetDefaultContext(ctx)
150-
151-
if setting.Database.SlowQueryThreshold > 0 {
152-
xormEngine.AddHook(&SlowQueryHook{
153-
Treshold: setting.Database.SlowQueryThreshold,
154-
Logger: log.GetLogger("xorm"),
155-
})
156-
}
157-
158-
errorLogger := log.GetLogger("xorm")
159-
if setting.IsInTesting {
160-
errorLogger = log.GetLogger(log.DEFAULT)
161-
}
195+
errorLogger := log.GetLogger("xorm")
196+
if setting.IsInTesting {
197+
errorLogger = log.GetLogger(log.DEFAULT)
198+
}
162199

163-
xormEngine.AddHook(&ErrorQueryHook{
164-
Logger: errorLogger,
165-
})
200+
eng.AddHook(&ErrorQueryHook{
201+
Logger: errorLogger,
202+
})
166203

167-
xormEngine.AddHook(&TracingHook{})
204+
eng.AddHook(&TracingHook{})
168205

169-
SetDefaultEngine(ctx, xormEngine)
206+
SetDefaultEngine(ctx, eng)
207+
} else {
208+
// Fallback: if type assertion fails, set default engine without extended diagnostics
209+
SetDefaultEngine(ctx, xormEngine)
210+
}
170211
return nil
171212
}
172213

173-
// SetDefaultEngine sets the default engine for db
174-
func SetDefaultEngine(ctx context.Context, eng *xorm.Engine) {
175-
x = eng
214+
// SetDefaultEngine sets the default engine for db.
215+
func SetDefaultEngine(ctx context.Context, eng Engine) {
216+
masterEngine, err := GetMasterEngine(eng)
217+
if err == nil {
218+
x = masterEngine
219+
}
220+
176221
DefaultContext = &Context{
177222
Context: ctx,
178-
e: x,
223+
e: eng,
179224
}
180225
}
181226

@@ -191,12 +236,12 @@ func UnsetDefaultEngine() {
191236
DefaultContext = nil
192237
}
193238

194-
// InitEngineWithMigration initializes a new xorm.Engine and sets it as the db.DefaultContext
239+
// InitEngineWithMigration initializes a new xorm EngineGroup, runs migrations, and sets it as db.DefaultContext
195240
// This function must never call .Sync() if the provided migration function fails.
196241
// When called from the "doctor" command, the migration function is a version check
197242
// that prevents the doctor from fixing anything in the database if the migration level
198243
// is different from the expected value.
199-
func InitEngineWithMigration(ctx context.Context, migrateFunc func(*xorm.Engine) error) (err error) {
244+
func InitEngineWithMigration(ctx context.Context, migrateFunc func(Engine) error) (err error) {
200245
if err = InitEngine(ctx); err != nil {
201246
return err
202247
}
@@ -230,14 +275,14 @@ func InitEngineWithMigration(ctx context.Context, migrateFunc func(*xorm.Engine)
230275
return nil
231276
}
232277

233-
// NamesToBean return a list of beans or an error
278+
// NamesToBean returns a list of beans given names
234279
func NamesToBean(names ...string) ([]any, error) {
235280
beans := []any{}
236281
if len(names) == 0 {
237282
beans = append(beans, tables...)
238283
return beans, nil
239284
}
240-
// Need to map provided names to beans...
285+
// Map provided names to beans
241286
beanMap := make(map[string]any)
242287
for _, bean := range tables {
243288
beanMap[strings.ToLower(reflect.Indirect(reflect.ValueOf(bean)).Type().Name())] = bean
@@ -259,7 +304,7 @@ func NamesToBean(names ...string) ([]any, error) {
259304
return beans, nil
260305
}
261306

262-
// DumpDatabase dumps all data from database according the special database SQL syntax to file system.
307+
// DumpDatabase dumps all data from database using special SQL syntax to the file system.
263308
func DumpDatabase(filePath, dbType string) error {
264309
var tbs []*schemas.Table
265310
for _, t := range tables {
@@ -295,29 +340,33 @@ func MaxBatchInsertSize(bean any) int {
295340
return 999 / len(t.ColumnsSeq())
296341
}
297342

298-
// IsTableNotEmpty returns true if table has at least one record
343+
// IsTableNotEmpty returns true if the table has at least one record
299344
func IsTableNotEmpty(beanOrTableName any) (bool, error) {
300345
return x.Table(beanOrTableName).Exist()
301346
}
302347

303-
// DeleteAllRecords will delete all the records of this table
348+
// DeleteAllRecords deletes all records in the given table.
304349
func DeleteAllRecords(tableName string) error {
305350
_, err := x.Exec(fmt.Sprintf("DELETE FROM %s", tableName))
306351
return err
307352
}
308353

309-
// GetMaxID will return max id of the table
354+
// GetMaxID returns the maximum id in the table
310355
func GetMaxID(beanOrTableName any) (maxID int64, err error) {
311356
_, err = x.Select("MAX(id)").Table(beanOrTableName).Get(&maxID)
312357
return maxID, err
313358
}
314359

315360
func SetLogSQL(ctx context.Context, on bool) {
316-
e := GetEngine(ctx)
317-
if x, ok := e.(*xorm.Engine); ok {
318-
x.ShowSQL(on)
319-
} else if sess, ok := e.(*xorm.Session); ok {
361+
ctxEngine := GetEngine(ctx)
362+
363+
if sess, ok := ctxEngine.(*xorm.Session); ok {
320364
sess.Engine().ShowSQL(on)
365+
} else if wrapper, ok := ctxEngine.(engineGroupWrapper); ok {
366+
// Handle engineGroupWrapper directly
367+
wrapper.ShowSQL(on)
368+
} else if masterEngine, err := GetMasterEngine(ctxEngine); err == nil {
369+
masterEngine.ShowSQL(on)
321370
}
322371
}
323372

@@ -374,3 +423,18 @@ func (h *ErrorQueryHook) AfterProcess(c *contexts.ContextHook) error {
374423
}
375424
return nil
376425
}
426+
427+
// GetMasterEngine extracts the master xorm.Engine from the provided xorm.Engine.
428+
// This handles both direct xorm.Engine cases and engines that implement a Master() method.
429+
func GetMasterEngine(x Engine) (*xorm.Engine, error) {
430+
if getter, ok := x.(interface{ Master() *xorm.Engine }); ok {
431+
return getter.Master(), nil
432+
}
433+
434+
engine, ok := x.(*xorm.Engine)
435+
if !ok {
436+
return nil, fmt.Errorf("unsupported engine type: %T", x)
437+
}
438+
439+
return engine, nil
440+
}

models/db/index_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,11 @@ func getCurrentResourceIndex(ctx context.Context, tableName string, groupID int6
3333

3434
func TestSyncMaxResourceIndex(t *testing.T) {
3535
require.NoError(t, unittest.PrepareTestDatabase())
36-
xe := unittest.GetXORMEngine()
36+
xe, err := unittest.GetXORMEngine()
37+
require.NoError(t, err)
3738
require.NoError(t, xe.Sync(&TestIndex{}))
3839

39-
err := db.SyncMaxResourceIndex(db.DefaultContext, "test_index", 10, 51)
40+
err = db.SyncMaxResourceIndex(db.DefaultContext, "test_index", 10, 51)
4041
require.NoError(t, err)
4142

4243
// sync new max index
@@ -88,7 +89,8 @@ func TestSyncMaxResourceIndex(t *testing.T) {
8889

8990
func TestGetNextResourceIndex(t *testing.T) {
9091
require.NoError(t, unittest.PrepareTestDatabase())
91-
xe := unittest.GetXORMEngine()
92+
xe, err := unittest.GetXORMEngine()
93+
require.NoError(t, err)
9294
require.NoError(t, xe.Sync(&TestIndex{}))
9395

9496
// create a new record

models/db/iterate_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ import (
1717

1818
func TestIterate(t *testing.T) {
1919
require.NoError(t, unittest.PrepareTestDatabase())
20-
xe := unittest.GetXORMEngine()
20+
xe, err := unittest.GetXORMEngine()
21+
require.NoError(t, err)
2122
require.NoError(t, xe.Sync(&repo_model.RepoUnit{}))
2223

2324
cnt, err := db.GetEngine(db.DefaultContext).Count(&repo_model.RepoUnit{})

0 commit comments

Comments
 (0)