Skip to content

Commit 0590176

Browse files
authored
Only use boost workers for leveldb shadow queues (#15696)
* The leveldb shadow queue of a persistable channel queue should always start with 0 workers and just use boost to add additional workers if necessary. * create a zero boost so that if there are no workers in a pool - boost to start the workers * actually set timeout appropriately on boosted workers Signed-off-by: Andrew Thornton <art27@cantab.net>
1 parent 6ebd833 commit 0590176

File tree

3 files changed

+48
-10
lines changed

3 files changed

+48
-10
lines changed

modules/queue/queue_disk_channel.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,10 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
7575
BatchLength: config.BatchLength,
7676
BlockTimeout: 1 * time.Second,
7777
BoostTimeout: 5 * time.Minute,
78-
BoostWorkers: 5,
79-
MaxWorkers: 6,
78+
BoostWorkers: 1,
79+
MaxWorkers: 5,
8080
},
81-
Workers: 1,
81+
Workers: 0,
8282
Name: config.Name + "-level",
8383
},
8484
DataDir: config.DataDir,

modules/queue/unique_queue_disk_channel.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,12 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
7373
WorkerPoolConfiguration: WorkerPoolConfiguration{
7474
QueueLength: config.QueueLength,
7575
BatchLength: config.BatchLength,
76-
BlockTimeout: 0,
77-
BoostTimeout: 0,
78-
BoostWorkers: 0,
79-
MaxWorkers: 1,
76+
BlockTimeout: 1 * time.Second,
77+
BoostTimeout: 5 * time.Minute,
78+
BoostWorkers: 1,
79+
MaxWorkers: 5,
8080
},
81-
Workers: 1,
81+
Workers: 0,
8282
Name: config.Name + "-level",
8383
},
8484
DataDir: config.DataDir,

modules/queue/workerpool.go

+40-2
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,52 @@ func (p *WorkerPool) Push(data Data) {
7070
atomic.AddInt64(&p.numInQueue, 1)
7171
p.lock.Lock()
7272
if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
73-
p.lock.Unlock()
73+
if p.numberOfWorkers == 0 {
74+
p.zeroBoost()
75+
} else {
76+
p.lock.Unlock()
77+
}
7478
p.pushBoost(data)
7579
} else {
7680
p.lock.Unlock()
7781
p.dataChan <- data
7882
}
7983
}
8084

85+
func (p *WorkerPool) zeroBoost() {
86+
ctx, cancel := context.WithCancel(p.baseCtx)
87+
mq := GetManager().GetManagedQueue(p.qid)
88+
boost := p.boostWorkers
89+
if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
90+
boost = p.maxNumberOfWorkers - p.numberOfWorkers
91+
}
92+
if mq != nil {
93+
log.Warn("WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s", p.qid, mq.Name, boost, p.boostTimeout)
94+
95+
start := time.Now()
96+
pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
97+
go func() {
98+
select {
99+
case <-ctx.Done():
100+
case <-time.After(p.boostTimeout):
101+
}
102+
mq.RemoveWorkers(pid)
103+
cancel()
104+
}()
105+
} else {
106+
log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout)
107+
go func() {
108+
select {
109+
case <-ctx.Done():
110+
case <-time.After(p.boostTimeout):
111+
}
112+
cancel()
113+
}()
114+
}
115+
p.lock.Unlock()
116+
p.addWorkers(ctx, boost)
117+
}
118+
81119
func (p *WorkerPool) pushBoost(data Data) {
82120
select {
83121
case p.dataChan <- data:
@@ -112,7 +150,7 @@ func (p *WorkerPool) pushBoost(data Data) {
112150
log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
113151

114152
start := time.Now()
115-
pid := mq.RegisterWorkers(boost, start, false, start, cancel, false)
153+
pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false)
116154
go func() {
117155
<-ctx.Done()
118156
mq.RemoveWorkers(pid)

0 commit comments

Comments
 (0)