Skip to content

Commit 7de6aad

Browse files
committed
IJPL-177878 Implement generic function for parallelism compensation
1 parent 0634a77 commit 7de6aad

File tree

5 files changed

+154
-4
lines changed

5 files changed

+154
-4
lines changed

IntelliJ-patches.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,19 @@ To acquire an analogue of `limitedParallelism` dispatcher which supports paralle
4545
with `.softLimitedParallelism`, e.g., `.limitedParallelism(1)` may be used as a synchronization manager and in this case
4646
exceeding the parallelism limit would eliminate this (likely expected) side effect.
4747

48+
### General parallelism compensation
49+
50+
`runBlocking` is not the only operation that causes thread starvation. In the IntelliJ platform, we are also dealing with
51+
the acquisition of read lock, which is essentially a blocking operation on a monitor. Any code can block on this monitor,
52+
which results in a deadlock when there are no threads in a limited-thread dispatcher that are able to cancel the cancelling coroutines.
53+
We provide a user-visible function for parallelism compensation for arbitrary blocking operation.
54+
4855
### API
4956
- `runBlockingWithParallelismCompensation` - an analogue of `runBlocking` which also compensates parallelism of the
5057
associated coroutine dispatcher when it decides to park the thread
5158
- `CoroutineDispatcher.softLimitedParallelism` – an analogue of `.limitedParallelism` which supports
5259
parallelism compensation
60+
- `runAndCompensateParallelism` - a wrapper for an arbitrary blocking operation that initiates parallelism compensation after deadline.
5361

5462
## Asynchronous stack traces for flows in the IDEA debugger
5563

kotlinx-coroutines-core/jvm/src/Builders.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package kotlinx.coroutines
77
import kotlinx.coroutines.scheduling.withCompensatedParallelism
88
import kotlin.contracts.*
99
import kotlin.coroutines.*
10+
import kotlin.time.Duration
1011

1112
/**
1213
* Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion.
@@ -113,7 +114,7 @@ private class BlockingCoroutine<T>(
113114
if (isCompleted) break
114115
if (parkNanos > 0) {
115116
if (compensateParallelism) {
116-
withCompensatedParallelism {
117+
withCompensatedParallelism(Duration.ZERO) {
117118
parkNanos(this, parkNanos)
118119
}
119120
} else {

kotlinx-coroutines-core/jvm/src/internal/intellij/intellij.kt

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ import kotlinx.coroutines.internal.softLimitedParallelism as softLimitedParallel
1111
import kotlinx.coroutines.internal.SoftLimitedDispatcher
1212
import kotlinx.coroutines.runBlockingWithParallelismCompensation as runBlockingWithParallelismCompensationImpl
1313
import kotlinx.coroutines.Dispatchers
14+
import kotlinx.coroutines.scheduling.withCompensatedParallelism
1415
import kotlin.coroutines.CoroutineContext
1516
import kotlin.jvm.Throws
17+
import kotlin.time.Duration
1618

1719
internal val currentContextThreadLocal : ThreadLocal<CoroutineContext?> = ThreadLocal.withInitial { null }
1820

@@ -60,4 +62,23 @@ public object IntellijCoroutines {
6062
@Deprecated("use named version", level = DeprecationLevel.HIDDEN)
6163
public fun CoroutineDispatcher.softLimitedParallelism(parallelism: Int): CoroutineDispatcher =
6264
softLimitedParallelismImpl(parallelism, null)
65+
66+
/**
67+
* Executes [action] and **advises** to compensate parallelism if [action] does not finish within [timeout].
68+
*
69+
* Suppose that you are dealing with an operation that blocks the underlying thread.
70+
* If the blocked thread happens to be from a limited-thread-pool dispatcher, then you face the problem of thread starvation:
71+
* your system underutilizes the available CPU. In severe cases, this may result in a deadlock.
72+
* To increase the size of the thread pool, you can wrap the operation into this function.
73+
*
74+
* Remember that threads are expensive, so you need to choose an appropriate [timeout]
75+
* so that parallelism compensation would be less expensive than waiting for [action] to finish.
76+
*
77+
* There are intentionally few guarantees that this function will have side effects.
78+
* Generally, parallelism compensation applies only to threads that are ready for compensation,
79+
* such as [Dispatchers.Default] or [Dispatchers.IO].
80+
*/
81+
public fun <T> runAndCompensateParallelism(timeout: Duration, action: () -> T): T {
82+
return withCompensatedParallelism(timeout, action)
83+
}
6384
}

kotlinx-coroutines-core/jvm/src/scheduling/parallelismCompensation.kt

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
package kotlinx.coroutines.scheduling
22

3+
import kotlinx.coroutines.DefaultDelay
4+
import kotlinx.coroutines.DisposableHandle
5+
import kotlinx.coroutines.internal.synchronized
6+
import kotlin.coroutines.EmptyCoroutineContext
7+
import kotlin.time.Duration
8+
39
private val parallelismCompensationEnabled: Boolean =
410
System.getProperty("kotlinx.coroutines.parallelism.compensation", "true").toBoolean()
511

@@ -11,21 +17,80 @@ private val parallelismCompensationEnabled: Boolean =
1117
* that eventually it will adjust to meet it.
1218
*/
1319
@Suppress("NOTHING_TO_INLINE") // better stacktrace
14-
internal inline fun <T> withCompensatedParallelism(noinline body: () -> T): T {
20+
internal inline fun <T> withCompensatedParallelism(timeout: Duration, noinline body: () -> T): T {
1521
if (!parallelismCompensationEnabled) {
1622
return body()
1723
}
1824
// CoroutineScheduler.Worker implements ParallelismCompensation
1925
val worker = Thread.currentThread() as? ParallelismCompensation
2026
?: return body()
21-
return worker.withCompensatedParallelism(body)
27+
return if (timeout == Duration.ZERO) {
28+
worker.withCompensatedParallelismImmediate(body)
29+
} else {
30+
worker.withCompensatedParallelismAfterDeadline(timeout, body)
31+
}
2232
}
2333

24-
private fun <T> ParallelismCompensation.withCompensatedParallelism(body: () -> T): T {
34+
private fun <T> ParallelismCompensation.withCompensatedParallelismImmediate(body: () -> T): T {
2535
increaseParallelismAndLimit()
2636
try {
2737
return body()
2838
} finally {
2939
decreaseParallelismLimit()
3040
}
41+
}
42+
43+
private fun <T> ParallelismCompensation.withCompensatedParallelismAfterDeadline(timeout: Duration, body: () -> T): T {
44+
val holder = WorkerProtector()
45+
val compensationHandle = initCompensationWithDeadline(this, holder, timeout)
46+
try {
47+
return body()
48+
} finally {
49+
endCompensationWithDeadline(this, holder, compensationHandle)
50+
}
51+
}
52+
53+
/**
54+
* The fields in [CoroutineScheduler.Worker] are not thread safe: these are plain java fields, and they are intended to
55+
* be accessed only by [CoroutineScheduler.Worker] itself. Since we want to modify them from other threads,
56+
* we need to establish proper happens-before relations for accesses to these fields.
57+
*/
58+
private class WorkerProtector() {
59+
@Volatile
60+
var wasTaken: Boolean = false
61+
}
62+
63+
private fun initCompensationWithDeadline(
64+
thread: ParallelismCompensation,
65+
state: WorkerProtector,
66+
timeout: Duration
67+
): DisposableHandle {
68+
return DefaultDelay.invokeOnTimeout(timeout.inWholeMilliseconds, {
69+
if (!state.wasTaken) {
70+
synchronized(state) {
71+
if (!state.wasTaken) {
72+
// so the task was not finished yet, which means that we can increase parallelism here
73+
thread.increaseParallelismAndLimit()
74+
state.wasTaken = true
75+
}
76+
}
77+
}
78+
}, EmptyCoroutineContext)
79+
}
80+
81+
private fun endCompensationWithDeadline(
82+
thread: ParallelismCompensation,
83+
state: WorkerProtector,
84+
compensationHandle: DisposableHandle,
85+
) {
86+
synchronized(state) {
87+
if (!state.wasTaken) {
88+
// so parallelism was not compensated at this moment; we can abort the compensating coroutine
89+
compensationHandle.dispose()
90+
state.wasTaken = true
91+
} else {
92+
// the compensating coroutine managed to publish a request for compensation. Now we need to decompensate
93+
thread.decreaseParallelismLimit()
94+
}
95+
}
3196
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package kotlinx.coroutines.scheduling
2+
3+
import kotlinx.coroutines.Dispatchers
4+
import kotlinx.coroutines.delay
5+
import kotlinx.coroutines.internal.intellij.IntellijCoroutines
6+
import kotlinx.coroutines.launch
7+
import kotlinx.coroutines.runBlocking
8+
import org.junit.Test
9+
import java.util.concurrent.CountDownLatch
10+
import kotlin.test.assertTrue
11+
import kotlin.time.Duration.Companion.milliseconds
12+
import kotlin.time.Duration.Companion.seconds
13+
14+
class GeneralParallelismCompensationTest : SchedulerTestBase() {
15+
16+
@Test
17+
fun `soundness - parallelism compensation ensures progress of operations`() {
18+
val operationsCount = CORE_POOL_SIZE * 10
19+
val barrier = CountDownLatch(operationsCount)
20+
runBlocking(Dispatchers.Default) {
21+
repeat(operationsCount) {
22+
launch {
23+
IntellijCoroutines.runAndCompensateParallelism(10.milliseconds) {
24+
barrier.countDown()
25+
barrier.await()
26+
}
27+
}
28+
}
29+
}
30+
}
31+
32+
@Test
33+
fun `laziness - parallelism compensation does not do anything before deadline`() {
34+
val operationsCount = CORE_POOL_SIZE * 3
35+
val barrier = CountDownLatch(operationsCount)
36+
runBlocking(Dispatchers.Default) {
37+
launch(Dispatchers.IO) {
38+
delay(1.seconds)
39+
assertTrue(barrier.count == CORE_POOL_SIZE.toLong() * 2)
40+
delay(2.seconds)
41+
assertTrue(barrier.count == CORE_POOL_SIZE.toLong())
42+
delay(2.seconds)
43+
assertTrue(barrier.count == 0L)
44+
}
45+
repeat(operationsCount) {
46+
launch {
47+
IntellijCoroutines.runAndCompensateParallelism(2.seconds) {
48+
barrier.countDown()
49+
barrier.await()
50+
}
51+
}
52+
}
53+
}
54+
}
55+
}

0 commit comments

Comments
 (0)