Skip to content

Commit 6f95a1b

Browse files
authored
[Unsafe API 1/5] Organize Buffer's segments as a regular list (#332)
Previously, Buffer's segments were organized into a circular list. That allowed storing only a single reference to the buffer's head and also facilitated the insertion/removal of new list nodes. The downside of a circular list is that one has to always compare a current node with a head when iterating over segments. That complicates the implementation of a public API for segment iterations. See #135 (comment) for details on segment iteration API.
1 parent e0d5fa9 commit 6f95a1b

File tree

11 files changed

+167
-114
lines changed

11 files changed

+167
-114
lines changed

core/apple/src/AppleCore.kt

+2-4
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ private open class OutputStreamSink(
5252
source.size -= bytesWritten
5353

5454
if (head.pos == head.limit) {
55-
source.head = head.pop()
56-
SegmentPool.recycle(head)
55+
source.recycleHead()
5756
}
5857
}
5958
}
@@ -101,8 +100,7 @@ private open class NSInputStreamSource(
101100
if (bytesRead == 0L) {
102101
if (tail.pos == tail.limit) {
103102
// We allocated a tail segment, but didn't end up needing it. Recycle!
104-
sink.head = tail.pop()
105-
SegmentPool.recycle(tail)
103+
sink.recycleTail()
106104
}
107105
return -1
108106
}

core/apple/src/BuffersApple.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
package kotlinx.io
99

1010
import kotlinx.cinterop.*
11-
import platform.Foundation.*
11+
import platform.Foundation.NSData
12+
import platform.Foundation.create
13+
import platform.Foundation.data
1214
import platform.darwin.NSUIntegerMax
1315
import platform.posix.*
1416

@@ -44,8 +46,7 @@ internal fun Buffer.readAtMostTo(sink: CPointer<uint8_tVar>, maxLength: Int): In
4446
size -= toCopy.toLong()
4547

4648
if (s.pos == s.limit) {
47-
head = s.pop()
48-
SegmentPool.recycle(s)
49+
recycleHead()
4950
}
5051

5152
return toCopy
@@ -70,6 +71,6 @@ internal fun Buffer.snapshotAsNSData(): NSData {
7071
}
7172
curr = curr.next
7273
index += length
73-
} while (curr !== head)
74+
} while (curr != null)
7475
return NSData.create(bytesNoCopy = bytes, length = size.convert())
7576
}

core/common/src/Buffer.kt

+97-47
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ public class Buffer : Source, Sink {
4242
@JvmField
4343
internal var head: Segment? = null
4444

45+
@JvmField
46+
internal var tail: Segment? = null
47+
4548
/**
4649
* The number of bytes accessible for read from this buffer.
4750
*/
@@ -76,8 +79,7 @@ public class Buffer : Source, Sink {
7679
val b = data[pos++]
7780
size -= 1L
7881
if (pos == limit) {
79-
head = segment.pop()
80-
SegmentPool.recycle(segment)
82+
recycleHead()
8183
} else {
8284
segment.pos = pos
8385
}
@@ -102,8 +104,7 @@ public class Buffer : Source, Sink {
102104
size -= 2L
103105

104106
if (pos == limit) {
105-
head = segment.pop()
106-
SegmentPool.recycle(segment)
107+
recycleHead()
107108
} else {
108109
segment.pos = pos
109110
}
@@ -138,8 +139,7 @@ public class Buffer : Source, Sink {
138139
size -= 4L
139140

140141
if (pos == limit) {
141-
head = segment.pop()
142-
SegmentPool.recycle(segment)
142+
recycleHead()
143143
} else {
144144
segment.pos = pos
145145
}
@@ -176,8 +176,7 @@ public class Buffer : Source, Sink {
176176
size -= 8L
177177

178178
if (pos == limit) {
179-
head = segment.pop()
180-
SegmentPool.recycle(segment)
179+
recycleHead()
181180
} else {
182181
segment.pos = pos
183182
}
@@ -241,13 +240,7 @@ public class Buffer : Source, Sink {
241240
val copy = s!!.sharedCopy()
242241
copy.pos += currentOffset.toInt()
243242
copy.limit = minOf(copy.pos + remainingByteCount.toInt(), copy.limit)
244-
if (out.head == null) {
245-
copy.prev = copy
246-
copy.next = copy.prev
247-
out.head = copy.next
248-
} else {
249-
out.head!!.prev!!.push(copy)
250-
}
243+
out.pushSegment(copy)
251244
remainingByteCount -= (copy.limit - copy.pos).toLong()
252245
currentOffset = 0L
253246
s = s.next
@@ -264,7 +257,7 @@ public class Buffer : Source, Sink {
264257
if (result == 0L) return 0L
265258

266259
// Omit the tail if it's still writable.
267-
val tail = head!!.prev!!
260+
val tail = tail!!
268261
if (tail.limit < Segment.SIZE && tail.owner) {
269262
result -= (tail.limit - tail.pos).toLong()
270263
}
@@ -317,8 +310,7 @@ public class Buffer : Source, Sink {
317310
head.pos += toSkip
318311

319312
if (head.pos == head.limit) {
320-
this.head = head.pop()
321-
SegmentPool.recycle(head)
313+
recycleHead()
322314
}
323315
}
324316
}
@@ -336,8 +328,7 @@ public class Buffer : Source, Sink {
336328
size -= toCopy.toLong()
337329

338330
if (s.pos == s.limit) {
339-
head = s.pop()
340-
SegmentPool.recycle(s)
331+
recycleHead()
341332
}
342333

343334
return toCopy
@@ -377,19 +368,20 @@ public class Buffer : Source, Sink {
377368
internal fun writableSegment(minimumCapacity: Int): Segment {
378369
require(minimumCapacity >= 1 && minimumCapacity <= Segment.SIZE) { "unexpected capacity" }
379370

380-
if (head == null) {
371+
if (tail == null) {
381372
val result = SegmentPool.take() // Acquire a first segment.
382373
head = result
383-
result.prev = result
384-
result.next = result
374+
tail = result
385375
return result
386376
}
387377

388-
var tail = head!!.prev
389-
if (tail!!.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
390-
tail = tail.push(SegmentPool.take()) // Append a new empty segment to fill up.
378+
val t = tail!!
379+
if (t.limit + minimumCapacity > Segment.SIZE || !t.owner) {
380+
val newTail = t.push(SegmentPool.take()) // Append a new empty segment to fill up.
381+
tail = newTail
382+
return newTail
391383
}
392-
return tail
384+
return t
393385
}
394386

395387
override fun write(source: ByteArray, startIndex: Int, endIndex: Int) {
@@ -486,7 +478,7 @@ public class Buffer : Source, Sink {
486478
while (remainingByteCount > 0L) {
487479
// Is a prefix of the source's head segment all that we need to move?
488480
if (remainingByteCount < source.head!!.limit - source.head!!.pos) {
489-
val tail = if (head != null) head!!.prev else null
481+
val tail = tail
490482
if (tail != null && tail.owner &&
491483
remainingByteCount + tail.limit - (if (tail.shared) 0 else tail.pos) <= Segment.SIZE
492484
) {
@@ -498,23 +490,22 @@ public class Buffer : Source, Sink {
498490
} else {
499491
// We're going to need another segment. Split the source's head
500492
// segment in two, then move the first of those two to this buffer.
501-
source.head = source.head!!.split(remainingByteCount.toInt())
493+
val newHead = source.head!!.split(remainingByteCount.toInt())
494+
if (source.head == source.tail) {
495+
source.tail = newHead
496+
}
497+
source.head = newHead
502498
}
503499
}
504500

505501
// Remove the source's head segment and append it to our tail.
506502
val segmentToMove = source.head
507503
val movedByteCount = (segmentToMove!!.limit - segmentToMove.pos).toLong()
508504
source.head = segmentToMove.pop()
509-
if (head == null) {
510-
head = segmentToMove
511-
segmentToMove.prev = segmentToMove
512-
segmentToMove.next = segmentToMove.prev
513-
} else {
514-
var tail = head!!.prev
515-
tail = tail!!.push(segmentToMove)
516-
tail.compact()
505+
if (source.head == null) {
506+
source.tail = null
517507
}
508+
pushSegment(segmentToMove, true)
518509
source.size -= movedByteCount
519510
size += movedByteCount
520511
remainingByteCount -= movedByteCount
@@ -582,16 +573,15 @@ public class Buffer : Source, Sink {
582573
val result = Buffer()
583574
if (size == 0L) return result
584575

585-
val head = head!!
576+
val head = this.head!!
586577
val headCopy = head.sharedCopy()
587578

588579
result.head = headCopy
589-
headCopy.prev = result.head
590-
headCopy.next = headCopy.prev
580+
result.tail = headCopy
591581

592582
var s = head.next
593-
while (s !== head) {
594-
headCopy.prev!!.push(s!!.sharedCopy())
583+
while (s != null) {
584+
result.tail = result.tail!!.push(s.sharedCopy())
595585
s = s.next
596586
}
597587

@@ -642,6 +632,63 @@ public class Buffer : Source, Sink {
642632

643633
return "Buffer(size=$size hex=$builder)"
644634
}
635+
636+
/**
637+
* Unlinks and recycles this buffer's head.
638+
*
639+
* If head had a successor, it'll become a new head.
640+
* Otherwise, both [head] and [tail] will be set to null.
641+
*
642+
* It's up to a caller to ensure that the head exists.
643+
*/
644+
internal fun recycleHead() {
645+
val oldHead = head!!
646+
val nextHead = oldHead.next
647+
head = nextHead
648+
if (nextHead == null) {
649+
tail = null
650+
} else {
651+
nextHead.prev = null
652+
}
653+
oldHead.next = null
654+
SegmentPool.recycle(oldHead)
655+
}
656+
657+
/**
658+
* Unlinks and recycles this buffer's tail segment.
659+
*
660+
* If tail had a predecessor, it'll become a new tail.
661+
* Otherwise, both [head] and [tail] will be set to null.
662+
*
663+
* It's up to a caller to ensure that the tail exists.
664+
*/
665+
internal fun recycleTail() {
666+
val oldTail = tail!!
667+
val newTail = oldTail.prev
668+
tail = newTail
669+
if (newTail == null) {
670+
head = null
671+
} else {
672+
newTail.next = null
673+
}
674+
oldTail.prev = null
675+
SegmentPool.recycle(oldTail)
676+
}
677+
678+
@Suppress("NOTHING_TO_INLINE")
679+
private inline fun pushSegment(newTail: Segment, tryCompact: Boolean = false) {
680+
if (head == null) {
681+
head = newTail
682+
tail = newTail
683+
} else if (tryCompact) {
684+
tail = tail!!.push(newTail).compact()
685+
if (tail!!.prev == null) {
686+
head = tail
687+
}
688+
} else {
689+
tail = tail!!.push(newTail)
690+
}
691+
}
645692
}
646693

647694
/**
@@ -652,23 +699,26 @@ internal inline fun <T> Buffer.seek(
652699
fromIndex: Long,
653700
lambda: (Segment?, Long) -> T
654701
): T {
655-
var s: Segment = head ?: return lambda(null, -1L)
702+
if (this.head == null) lambda(null, -1L)
656703

657704
if (size - fromIndex < fromIndex) {
705+
var s = tail
658706
// We're scanning in the back half of this buffer. Find the segment starting at the back.
659707
var offset = size
660-
while (offset > fromIndex) {
661-
s = s.prev!!
708+
while (s != null && offset > fromIndex) {
662709
offset -= (s.limit - s.pos).toLong()
710+
if (offset <= fromIndex) break
711+
s = s.prev
663712
}
664713
return lambda(s, offset)
665714
} else {
715+
var s = this.head
666716
// We're scanning in the front half of this buffer. Find the segment starting at the front.
667717
var offset = 0L
668-
while (true) {
718+
while (s != null) {
669719
val nextOffset = offset + (s.limit - s.pos)
670720
if (nextOffset > fromIndex) break
671-
s = s.next!!
721+
s = s.next
672722
offset = nextOffset
673723
}
674724
return lambda(s, offset)

core/common/src/Buffers.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public fun Buffer.snapshot(): ByteString {
2424
check(curr != null) { "Current segment is null" }
2525
append(curr.data, curr.pos, curr.limit)
2626
curr = curr.next
27-
} while (curr !== head)
27+
} while (curr != null)
2828
}
2929
}
3030

@@ -53,10 +53,11 @@ public fun Buffer.indexOf(byte: Byte, startIndex: Long = 0, endIndex: Long = siz
5353
if (o == -1L) {
5454
return -1L
5555
}
56-
var segment = seg!!
56+
var segment: Segment? = seg!!
5757
var offset = o
5858
do {
5959
check(endOffset > offset)
60+
segment!!
6061
val idx = segment.indexOf(
6162
byte,
6263
// If start index within this segment, the diff will be positive and
@@ -71,8 +72,8 @@ public fun Buffer.indexOf(byte: Byte, startIndex: Long = 0, endIndex: Long = siz
7172
return offset + idx.toLong()
7273
}
7374
offset += segment.size
74-
segment = segment.next!!
75-
} while (segment !== head && offset < endOffset)
75+
segment = segment.next
76+
} while (segment != null && offset < endOffset)
7677
return -1L
7778
}
7879
}

core/common/src/ByteStrings.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,10 @@ public fun Buffer.indexOf(byteString: ByteString, startIndex: Long = 0): Long {
129129
if (o == -1L) {
130130
return -1L
131131
}
132-
var segment = seg!!
132+
var segment: Segment? = seg
133133
var offset = o
134134
do {
135+
segment!!
135136
// If start index within this segment, the diff will be positive and
136137
// we'll scan the segment starting from the corresponding offset.
137138
// Otherwise, the diff will be negative and we'll scan the segment from the beginning.
@@ -147,16 +148,16 @@ public fun Buffer.indexOf(byteString: ByteString, startIndex: Long = 0): Long {
147148
val firstOutboundOffset = maxOf(startOffset, segment.size - byteStringData.size + 1)
148149
// Try to find a pattern in all suffixes shorter than the pattern. These suffixes start
149150
// in the current segment, but ends in the following segments; thus we're using outbound function.
150-
val idx1 = segment.indexOfBytesOutbound(byteStringData, firstOutboundOffset, head)
151+
val idx1 = segment.indexOfBytesOutbound(byteStringData, firstOutboundOffset)
151152
if (idx1 != -1) {
152153
// Offset corresponds to the segment's start, idx - to offset within the segment.
153154
return offset + idx1.toLong()
154155
}
155156

156157
// We scanned the whole segment, so let's go to the next one
157158
offset += segment.size
158-
segment = segment.next!!
159-
} while (segment !== head && offset + byteString.size <= size)
159+
segment = segment.next
160+
} while (segment != null && offset + byteString.size <= size)
160161
return -1L
161162
}
162163
}

0 commit comments

Comments
 (0)