@@ -7,8 +7,10 @@ package git
7
7
import (
8
8
"context"
9
9
"crypto/sha1"
10
+ "errors"
10
11
"fmt"
11
12
"net/url"
13
+ "strconv"
12
14
"strings"
13
15
"time"
14
16
@@ -49,79 +51,67 @@ func init() {
49
51
db .RegisterModel (new (CommitStatusIndex ))
50
52
}
51
53
52
- // upsertCommitStatusIndex the function will not return until it acquires the lock or receives an error.
53
- func upsertCommitStatusIndex (ctx context.Context , repoID int64 , sha string ) (err error ) {
54
- // An atomic UPSERT operation (INSERT/UPDATE) is the only operation
55
- // that ensures that the key is actually locked.
56
- switch {
57
- case setting .Database .UseSQLite3 || setting .Database .UsePostgreSQL :
58
- _ , err = db .Exec (ctx , "INSERT INTO `commit_status_index` (repo_id, sha, max_index) " +
59
- "VALUES (?,?,1) ON CONFLICT (repo_id,sha) DO UPDATE SET max_index = `commit_status_index`.max_index+1" ,
60
- repoID , sha )
61
- case setting .Database .UseMySQL :
62
- _ , err = db .Exec (ctx , "INSERT INTO `commit_status_index` (repo_id, sha, max_index) " +
63
- "VALUES (?,?,1) ON DUPLICATE KEY UPDATE max_index = max_index+1" ,
64
- repoID , sha )
65
- case setting .Database .UseMSSQL :
66
- // https://weblogs.sqlteam.com/dang/2009/01/31/upsert-race-condition-with-merge/
67
- _ , err = db .Exec (ctx , "MERGE `commit_status_index` WITH (HOLDLOCK) as target " +
68
- "USING (SELECT ? AS repo_id, ? AS sha) AS src " +
69
- "ON src.repo_id = target.repo_id AND src.sha = target.sha " +
70
- "WHEN MATCHED THEN UPDATE SET target.max_index = target.max_index+1 " +
71
- "WHEN NOT MATCHED THEN INSERT (repo_id, sha, max_index) " +
72
- "VALUES (src.repo_id, src.sha, 1);" ,
73
- repoID , sha )
74
- default :
75
- return fmt .Errorf ("database type not supported" )
54
+ func postgresGetCommitStatusIndex (ctx context.Context , repoID int64 , sha string ) (int64 , error ) {
55
+ res , err := db .GetEngine (ctx ).Query ("INSERT INTO `commit_status_index` (repo_id, sha, max_index) " +
56
+ "VALUES (?,?,1) ON CONFLICT (repo_id, sha) DO UPDATE SET max_index = `commit_status_index`.max_index+1 RETURNING max_index" ,
57
+ repoID , sha )
58
+ if err != nil {
59
+ return 0 , err
60
+ }
61
+ if len (res ) == 0 {
62
+ return 0 , db .ErrGetResourceIndexFailed
76
63
}
77
- return err
64
+ return strconv . ParseInt ( string ( res [ 0 ][ "max_index" ]), 10 , 64 )
78
65
}
79
66
80
67
// GetNextCommitStatusIndex retried 3 times to generate a resource index
81
- func GetNextCommitStatusIndex (repoID int64 , sha string ) (int64 , error ) {
82
- for i := 0 ; i < db .MaxDupIndexAttempts ; i ++ {
83
- idx , err := getNextCommitStatusIndex (repoID , sha )
84
- if err == db .ErrResouceOutdated {
85
- continue
86
- }
87
- if err != nil {
88
- return 0 , err
89
- }
90
- return idx , nil
68
+ func GetNextCommitStatusIndex (ctx context.Context , repoID int64 , sha string ) (int64 , error ) {
69
+ if setting .Database .UsePostgreSQL {
70
+ return postgresGetCommitStatusIndex (ctx , repoID , sha )
91
71
}
92
- return 0 , db .ErrGetResourceIndexFailed
93
- }
94
72
95
- // getNextCommitStatusIndex return the next index
96
- func getNextCommitStatusIndex (repoID int64 , sha string ) (int64 , error ) {
97
- ctx , commiter , err := db .TxContext ()
73
+ e := db .GetEngine (ctx )
74
+
75
+ // try to update the max_index to next value, and acquire the write-lock for the record
76
+ res , err := e .Exec ("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?" , repoID , sha )
98
77
if err != nil {
99
78
return 0 , err
100
79
}
101
- defer commiter .Close ()
102
-
103
- var preIdx int64
104
- _ , err = db .GetEngine (ctx ).SQL ("SELECT max_index FROM `commit_status_index` WHERE repo_id = ? AND sha = ?" , repoID , sha ).Get (& preIdx )
80
+ affected , err := res .RowsAffected ()
105
81
if err != nil {
106
82
return 0 , err
107
83
}
84
+ if affected == 0 {
85
+ // this slow path is only for the first time of creating a resource index
86
+ _ , errIns := e .Exec ("INSERT INTO `commit_status_index` (repo_id, sha, max_index) VALUES (?, ?, 0)" , repoID , sha )
87
+ res , err = e .Exec ("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?" , repoID , sha )
88
+ if err != nil {
89
+ return 0 , err
90
+ }
108
91
109
- if err := upsertCommitStatusIndex (ctx , repoID , sha ); err != nil {
110
- return 0 , err
92
+ affected , err = res .RowsAffected ()
93
+ if err != nil {
94
+ return 0 , err
95
+ }
96
+ // if the update still can not update any records, the record must not exist and there must be some errors (insert error)
97
+ if affected == 0 {
98
+ if errIns == nil {
99
+ return 0 , errors .New ("impossible error when GetNextCommitStatusIndex, insert and update both succeeded but no record is updated" )
100
+ }
101
+ return 0 , errIns
102
+ }
111
103
}
112
104
113
- var curIdx int64
114
- has , err := db .GetEngine (ctx ).SQL ("SELECT max_index FROM `commit_status_index` WHERE repo_id = ? AND sha = ? AND max_index=?" , repoID , sha , preIdx + 1 ).Get (& curIdx )
105
+ // now, the new index is in database (protected by the transaction and write-lock)
106
+ var newIdx int64
107
+ has , err := e .SQL ("SELECT max_index FROM `commit_status_index` WHERE repo_id=? AND sha=?" , repoID , sha ).Get (& newIdx )
115
108
if err != nil {
116
109
return 0 , err
117
110
}
118
111
if ! has {
119
- return 0 , db .ErrResouceOutdated
120
- }
121
- if err := commiter .Commit (); err != nil {
122
- return 0 , err
112
+ return 0 , errors .New ("impossible error when GetNextCommitStatusIndex, upsert succeeded but no record can be selected" )
123
113
}
124
- return curIdx , nil
114
+ return newIdx , nil
125
115
}
126
116
127
117
func (status * CommitStatus ) loadAttributes (ctx context.Context ) (err error ) {
@@ -291,18 +281,18 @@ func NewCommitStatus(opts NewCommitStatusOptions) error {
291
281
return fmt .Errorf ("NewCommitStatus[%s, %s]: no user specified" , repoPath , opts .SHA )
292
282
}
293
283
294
- // Get the next Status Index
295
- idx , err := GetNextCommitStatusIndex (opts .Repo .ID , opts .SHA )
296
- if err != nil {
297
- return fmt .Errorf ("generate commit status index failed: %w" , err )
298
- }
299
-
300
284
ctx , committer , err := db .TxContext ()
301
285
if err != nil {
302
286
return fmt .Errorf ("NewCommitStatus[repo_id: %d, user_id: %d, sha: %s]: %w" , opts .Repo .ID , opts .Creator .ID , opts .SHA , err )
303
287
}
304
288
defer committer .Close ()
305
289
290
+ // Get the next Status Index
291
+ idx , err := GetNextCommitStatusIndex (ctx , opts .Repo .ID , opts .SHA )
292
+ if err != nil {
293
+ return fmt .Errorf ("generate commit status index failed: %w" , err )
294
+ }
295
+
306
296
opts .CommitStatus .Description = strings .TrimSpace (opts .CommitStatus .Description )
307
297
opts .CommitStatus .Context = strings .TrimSpace (opts .CommitStatus .Context )
308
298
opts .CommitStatus .TargetURL = strings .TrimSpace (opts .CommitStatus .TargetURL )
@@ -316,7 +306,7 @@ func NewCommitStatus(opts NewCommitStatusOptions) error {
316
306
317
307
// Insert new CommitStatus
318
308
if _ , err = db .GetEngine (ctx ).Insert (opts .CommitStatus ); err != nil {
319
- return fmt .Errorf ("Insert CommitStatus[%s, %s]: %w" , repoPath , opts .SHA , err )
309
+ return fmt .Errorf ("insert CommitStatus[%s, %s]: %w" , repoPath , opts .SHA , err )
320
310
}
321
311
322
312
return committer .Commit ()
0 commit comments