Skip to content

Introduces reworked Payload API and new Metadata API #117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.rsocket.kotlin.benchmarks

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.payload.*
Expand Down Expand Up @@ -62,9 +63,9 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
}
}

override fun createPayload(size: Int): Payload = if (size == 0) Payload.Empty else Payload.wrap(
data = ByteArray(size / 2).also { Random.nextBytes(it) },
metadata = ByteArray(size / 2).also { Random.nextBytes(it) }
override fun createPayload(size: Int): Payload = if (size == 0) Payload.Empty else Payload(
data = ByteReadPacket(ByteArray(size / 2).also { Random.nextBytes(it) }),
metadata = ByteReadPacket(ByteArray(size / 2).also { Random.nextBytes(it) })
)

override fun releasePayload(payload: Payload) {
Expand Down
5 changes: 5 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,18 @@ subprojects {
if (name.contains("test", ignoreCase = true) || project.name == "rsocket-test") {
useExperimentalAnnotation("kotlin.time.ExperimentalTime")
useExperimentalAnnotation("kotlin.ExperimentalStdlibApi")

useExperimentalAnnotation("kotlinx.coroutines.ExperimentalCoroutinesApi")
useExperimentalAnnotation("kotlinx.coroutines.InternalCoroutinesApi")
useExperimentalAnnotation("kotlinx.coroutines.ObsoleteCoroutinesApi")
useExperimentalAnnotation("kotlinx.coroutines.FlowPreview")

useExperimentalAnnotation("io.ktor.util.KtorExperimentalAPI")
useExperimentalAnnotation("io.ktor.util.InternalAPI")
useExperimentalAnnotation("io.ktor.utils.io.core.internal.DangerousInternalIoApi")

useExperimentalAnnotation("io.rsocket.kotlin.TransportApi")
useExperimentalAnnotation("io.rsocket.kotlin.ExperimentalMetadataApi")
}
}
}
Expand Down
59 changes: 59 additions & 0 deletions examples/interactions/src/jvmMain/kotlin/MetadataExample.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.metadata.*
import io.rsocket.kotlin.payload.*

@OptIn(ExperimentalMetadataApi::class)
public fun main() {
val p1 = buildPayload {
data("Hello")

metadata("some text")
}

println("Case: 1")
println(p1.data.readText())
println(p1.metadata?.readText())

val p2 = buildPayload {
data("Hello")

metadata(RoutingMetadata("tag1", "tag2"))
}

val tags = p2.metadata?.read(RoutingMetadata)?.tags.orEmpty()
println("Case: 2")
println(tags)


val p3 = buildPayload {
data("hello")

compositeMetadata {
add(RoutingMetadata("tag3", "t4"))
add(RawMetadata(WellKnownMimeType.ApplicationJson, buildPacket { writeText("{s: 2}") }))
}
}

val cm = p3.metadata!!.read(CompositeMetadata)
println("Case: 3")
println(cm[RoutingMetadata].tags)
println(cm[WellKnownMimeType.ApplicationJson].readText())
}
28 changes: 28 additions & 0 deletions examples/interactions/src/jvmMain/kotlin/shared.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.payload.*

/**
* Simple custom [Payload] factory function with string data and metadata.
* Has almost no overhead over call to [Payload] constructor with data and metadata as [ByteReadPacket].
* Similar functions can be created for all needed use cases
*/
fun Payload(data: String, metadata: String? = null): Payload = buildPayload {
data(data)
if (metadata != null) metadata(metadata)
}
2 changes: 1 addition & 1 deletion examples/multiplatform-chat/src/clientMain/kotlin/Api.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,6 @@ suspend fun connectToApiUsingTCP(name: String): Api {

private fun connector(name: String): RSocketConnector = RSocketConnector {
connectionConfig {
setupPayload { Payload(name) }
setupPayload { buildPayload { data(name) } }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this line. Seems simpler before

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Old Payload constructors were removed in favor of buildPayload. They were really like example constructors. F.e. in that particular case it was Payload(data: String, metadata: String? = null) but for sure, such constructor will be needed in very small amount of cases. And f.e. upper I've used general buildPayload to create such narrow case constructor.
So from user perspective, similar functions can be created per use cases they need

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a matter of preference. I'd consider using buildPayload() when I want to incrementally build the payload like buildString(). But if I have the full content already ready, even allowing string -> byte conversions, I'd like Payload(...).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

main question for me is, Do we really need functions like Payload(String, String?)?

anyway, may be it's better to change name of buildPayload ? to what? Payload {} ? not sure about this

Im not sure that a constructor per combination of data/metadata possible types is good.
So, if we want to support all cases with Payload we need 15 functions for that to combine data=String|BRP|ByteArray and metadata=String|ByteArray|BRP|Metadata|null.
In current solution those constructors can be emulated on user side, if they use it frequently with 3-4 lines of code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was only asking for constructor for complete string or byte[] cases.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, will return back string/string and bytes/bytes functions

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
*/

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.metadata.*
import io.rsocket.kotlin.payload.*

fun Payload(route: String, packet: ByteReadPacket): Payload = Payload {
fun Payload(route: String, packet: ByteReadPacket): Payload = buildPayload {
data(packet)
metadata(route)
metadata(RoutingMetadata(route))
}
18 changes: 7 additions & 11 deletions examples/multiplatform-chat/src/commonMain/kotlin/Serialization.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.metadata.*
import io.rsocket.kotlin.payload.*
import kotlinx.serialization.*
import kotlinx.serialization.protobuf.*
Expand All @@ -27,20 +28,15 @@ val ConfiguredProtoBuf = ProtoBuf
inline fun <reified T> ProtoBuf.decodeFromPayload(payload: Payload): T = decodeFromByteArray(payload.data.readBytes())

@ExperimentalSerializationApi
inline fun <reified T> ProtoBuf.encodeToPayload(route: String, value: T): Payload {
return kotlin.runCatching { //TODO some ktor issue...
Payload {
data(encodeToByteArray(value))
metadata(route)
}
}.getOrNull() ?: Payload {
data(encodeToByteArray(value))
metadata(route)
}
inline fun <reified T> ProtoBuf.encodeToPayload(route: String, value: T): Payload = buildPayload {
data(encodeToByteArray(value))
metadata(RoutingMetadata(route))
}

@ExperimentalSerializationApi
inline fun <reified T> ProtoBuf.encodeToPayload(value: T): Payload = Payload(encodeToByteArray(value))
inline fun <reified T> ProtoBuf.encodeToPayload(value: T): Payload = buildPayload {
data(encodeToByteArray(value))
}

@ExperimentalSerializationApi
inline fun <reified I, reified O> ProtoBuf.decoding(payload: Payload, block: (I) -> O): Payload =
Expand Down
30 changes: 14 additions & 16 deletions examples/multiplatform-chat/src/serverJvmMain/kotlin/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import io.ktor.util.*
import io.ktor.websocket.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.metadata.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.ktor.*
import io.rsocket.kotlin.transport.ktor.server.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.serialization.*

@OptIn(KtorExperimentalAPI::class, ExperimentalSerializationApi::class)
@OptIn(KtorExperimentalAPI::class, ExperimentalSerializationApi::class, ExperimentalMetadataApi::class)
fun main() {
val proto = ConfiguredProtoBuf
val users = Users()
Expand All @@ -43,6 +45,8 @@ fun main() {

val rSocketServer = RSocketServer()

fun Payload.route(): String = metadata?.read(RoutingMetadata)?.tags?.first() ?: error("No route provided")
Copy link
Member

@yschimke yschimke Nov 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should put these as part of the public API for each specific domain via extension methods. Seems here it's a local helper we would duplicate a lot.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?
It's really local helper to get one route, for particular example. In real world, getting only first tag will not be enough I assume

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you gave the best example, for each domain there will be different conventions to use these. Rather that just providing access as a dictionary, it would be nice to have extension methods you can import to make it feel natural to each domain.


//create acceptor
val acceptor = ConnectionAcceptor {
val userName = config.setupPayload.data.readText()
Expand All @@ -52,9 +56,7 @@ fun main() {
RSocketRequestHandler {
fireAndForget {
withContext(session) {
when (val route = it.metadata?.readText()) {
null -> error("No route provided")

when (val route = it.route()) {
"users.deleteMe" -> userApi.deleteMe()

else -> error("Wrong route: $route")
Expand All @@ -63,17 +65,15 @@ fun main() {
}
requestResponse {
withContext(session) {
when (val route = it.metadata?.readText()) {
null -> error("No route provided")
when (val route = it.route()) {
"users.getMe" -> proto.encodeToPayload(userApi.getMe())
"users.all" -> proto.encodeToPayload(userApi.all())

"users.getMe" -> proto.encodeToPayload(userApi.getMe())
"users.all" -> proto.encodeToPayload(userApi.all())
"chats.all" -> proto.encodeToPayload(chatsApi.all())
"chats.new" -> proto.decoding<NewChat, Chat>(it) { (name) -> chatsApi.new(name) }
"chats.delete" -> proto.decoding<DeleteChat>(it) { (id) -> chatsApi.delete(id) }

"chats.all" -> proto.encodeToPayload(chatsApi.all())
"chats.new" -> proto.decoding<NewChat, Chat>(it) { (name) -> chatsApi.new(name) }
"chats.delete" -> proto.decoding<DeleteChat>(it) { (id) -> chatsApi.delete(id) }

"messages.send" -> proto.decoding<SendMessage, Message>(it) { (chatId, content) ->
"messages.send" -> proto.decoding<SendMessage, Message>(it) { (chatId, content) ->
messagesApi.send(chatId, content)
}
"messages.history" -> proto.decoding<HistoryMessages, List<Message>>(it) { (chatId, limit) ->
Expand All @@ -85,9 +85,7 @@ fun main() {
}
}
requestStream {
when (val route = it.metadata?.readText()) {
null -> error("No route provided")

when (val route = it.route()) {
"messages.stream" -> {
val (chatId, fromMessageId) = proto.decodeFromPayload<StreamMessages>(it)
messagesApi.messages(chatId, fromMessageId).map { m -> proto.encodeToPayload(m) }
Expand Down
2 changes: 1 addition & 1 deletion examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fun main() {
RSocketRequestHandler {
requestResponse {
println("Received: ${it.data.readText()}")
Payload("Hello from nodejs")
buildPayload { data("Hello from nodejs") }
}
}
}
Expand Down
109 changes: 109 additions & 0 deletions playground/src/commonMain/kotlin/Metadata.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.metadata.*
import io.rsocket.kotlin.payload.*

@OptIn(ExperimentalMetadataApi::class)
public fun metadata() {

//writing

val routing = RoutingMetadata("tag1", "route1", "something")
val tracing = ZipkinTracingMetadata(
kind = ZipkinTracingMetadata.Kind.Sample,
traceId = 123L,
spanId = 123L
)
val cm1 = buildCompositeMetadata {
add(routing)
add(tracing)
}
// or
val cm2 = CompositeMetadata(routing, tracing)

//all lambdas are inline, and don't create another objects for builders
//so no overhead comparing to using plain Payload(data, metadata) function
val payload = buildPayload {
data("Some data")
metadata(cm2)
// or
metadata(cm1)
// or
compositeMetadata {
add(routing)
add(tracing)

add(WellKnownMimeType.ApplicationAvro, buildPacket { writeText("some custom metadata building") })
//or
val raw = RawMetadata(WellKnownMimeType.ApplicationAvro, ByteReadPacket.Empty)

add(raw)
}
}

//reading
//property types are not needed, just for better online readability
val cm: CompositeMetadata = payload.metadata?.read(CompositeMetadata) ?: return

val rmr = RawMetadata.reader(WellKnownMimeType.ApplicationAvro)


val rm: RoutingMetadata = cm.get(RoutingMetadata)
val tm: ZipkinTracingMetadata = cm.get(ZipkinTracingMetadata)

//or
val rm1: RoutingMetadata = cm[RoutingMetadata]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think examples like this are better through well known extension methods

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain a little?
Or, show better example?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cm.route()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, you propose to add something like val Payload.routingMetadata: RoutingMetadata or fun Payload.routingMetadata(): RoutingMetadata which can be read once, right?
and same accessors for other out-of-the-box metadatas?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's weird on Payload, but as an extension method you choose to import it.

Also the domain specific helper could hide any logic to check is CompositeMetadata vs single (non-composite) types, or potentially include defaults that make sense for that type.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks ok, will add it

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not overthink it. Let's land and iterate for now. I use the routing rules from client side, so let's see that in the context of rsocket-cli and my own project.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, So may be, let's merge it now and you will check it
If so, we can at least release 0.11.0 with those changes already done
and continues with incremental releases per feature

val tm1: ZipkinTracingMetadata = cm[ZipkinTracingMetadata]

//or
val rmNull: RoutingMetadata? = cm.getOrNull(RoutingMetadata)
val tmNull: ZipkinTracingMetadata? = cm.getOrNull(ZipkinTracingMetadata)

//or
val rmList: List<RoutingMetadata> = cm.list(RoutingMetadata) //spec allow this

//or
val c: Boolean = cm.contains(RoutingMetadata)
val c2: Boolean = RoutingMetadata in cm

//or

cm.entries.forEach {
if (it.hasMimeTypeOf(RoutingMetadata)) {
val rm2: RoutingMetadata = it.read(RoutingMetadata)
}
//or
val tm2: ZipkinTracingMetadata? = it.readOrNull(ZipkinTracingMetadata)


}

//usage

rm.tags.forEach {
println("tag: $it")
}

tm.traceId
tm.traceIdHigh

val span = "${tm.parentSpanId}-${tm.spanId}"

}
5 changes: 4 additions & 1 deletion playground/src/commonMain/kotlin/Stub.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ suspend fun RSocket.doSomething() {
// launch { rSocket.fireAndForget(Payload(byteArrayOf(1, 1, 1), byteArrayOf(2, 2, 2))) }
// launch { rSocket.metadataPush(byteArrayOf(1, 2, 3)) }
var i = 0
requestStream(Payload(byteArrayOf(1, 1, 1), byteArrayOf(2, 2, 2))).buffer(10000).collect {
requestStream(buildPayload {
data(byteArrayOf(1, 1, 1))
metadata(byteArrayOf(2, 2, 2))
}).buffer(10000).collect {
println(it.data.readBytes().contentToString())
if (++i == 10000) error("")
}
Expand Down
2 changes: 1 addition & 1 deletion playground/src/commonMain/kotlin/TCP.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ suspend fun testNodeJsServer(dispatcher: CoroutineContext) {
val transport = aSocket(SelectorManager(dispatcher)).tcp().clientTransport("127.0.0.1", 9000)
val client = RSocketConnector().connect(transport)

val response = client.requestResponse(Payload("Hello from JVM"))
val response = client.requestResponse(buildPayload { data("Hello from JVM") })
println(response.data.readText())
}
Loading