@@ -410,14 +410,14 @@ internal class CoroutineScheduler(
410
410
* this [block] may execute blocking operations (IO, system calls, locking primitives etc.)
411
411
*
412
412
* [taskContext] -- concurrency context of given [block].
413
- * [fair] -- whether this [dispatch] call is fair.
414
- * If `true` then the task will be dispatched in a FIFO manner.
413
+ * [tailDispatch] -- whether this [dispatch] call is the last action the (presumably) worker thread does in its current task.
414
+ * If `true`, then the task will be dispatched in a FIFO manner and no additional workers will be requested,
415
+ * but only if the current thread is a corresponding worker thread.
415
416
* Note that caller cannot be ensured that it is being executed on worker thread for the following reasons:
416
417
* - [CoroutineStart.UNDISPATCHED]
417
- * - Concurrent [close] that effectively shutdowns the worker thread.
418
- * Used for [yield].
418
+ * - Concurrent [close] that effectively shutdowns the worker thread
419
419
*/
420
- fun dispatch (block : Runnable , taskContext : TaskContext = NonBlockingContext , fair : Boolean = false) {
420
+ fun dispatch (block : Runnable , taskContext : TaskContext = NonBlockingContext , tailDispatch : Boolean = false) {
421
421
trackTask() // this is needed for virtual time support
422
422
val task = createTask(block, taskContext)
423
423
val isBlockingTask = task.isBlocking
@@ -426,18 +426,20 @@ internal class CoroutineScheduler(
426
426
val stateSnapshot = if (isBlockingTask) incrementBlockingTasks() else 0
427
427
// try to submit the task to the local queue and act depending on the result
428
428
val currentWorker = currentWorker()
429
- val notAdded = currentWorker.submitToLocalQueue(task, fair )
429
+ val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch )
430
430
if (notAdded != null ) {
431
431
if (! addToGlobalQueue(notAdded)) {
432
432
// Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
433
433
throw RejectedExecutionException (" $schedulerName was terminated" )
434
434
}
435
435
}
436
+ val skipUnpark = tailDispatch && currentWorker != null
436
437
// Checking 'task' instead of 'notAdded' is completely okay
437
438
if (isBlockingTask) {
438
439
// Use state snapshot to better estimate the number of running threads
439
- signalBlockingWork(stateSnapshot)
440
+ signalBlockingWork(stateSnapshot, skipUnpark = skipUnpark )
440
441
} else {
442
+ if (skipUnpark) return
441
443
signalCpuWork()
442
444
}
443
445
}
@@ -453,7 +455,8 @@ internal class CoroutineScheduler(
453
455
}
454
456
455
457
// NB: should only be called from 'dispatch' method due to blocking tasks increment
456
- private fun signalBlockingWork (stateSnapshot : Long ) {
458
+ private fun signalBlockingWork (stateSnapshot : Long , skipUnpark : Boolean ) {
459
+ if (skipUnpark) return
457
460
if (tryUnpark()) return
458
461
// Use state snapshot to avoid accidental thread overprovision
459
462
if (tryCreateWorker(stateSnapshot)) return
@@ -526,7 +529,7 @@ internal class CoroutineScheduler(
526
529
* Returns `null` if task was successfully added or an instance of the
527
530
* task that was not added or replaced (thus should be added to global queue).
528
531
*/
529
- private fun Worker?.submitToLocalQueue (task : Task , fair : Boolean ): Task ? {
532
+ private fun Worker?.submitToLocalQueue (task : Task , tailDispatch : Boolean ): Task ? {
530
533
if (this == null ) return task
531
534
/*
532
535
* This worker could have been already terminated from this thread by close/shutdown and it should not
@@ -538,7 +541,7 @@ internal class CoroutineScheduler(
538
541
return task
539
542
}
540
543
mayHaveLocalTasks = true
541
- return localQueue.add(task, fair = fair )
544
+ return localQueue.add(task, fair = tailDispatch )
542
545
}
543
546
544
547
private fun currentWorker (): Worker ? = (Thread .currentThread() as ? Worker )?.takeIf { it.scheduler == this }
0 commit comments