Skip to content

async/await and AsyncSequence Prototype #44

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 25 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
36856c9
async/await prototype
dnadoba Apr 18, 2021
61bbfc1
use @available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
dnadoba Apr 18, 2021
8af659c
concurrency features are now enabled by default and do no longer need…
dnadoba Apr 18, 2021
cc1ef4b
use YieldingContinuation and UnsafeContinuation instead of promises
dnadoba Apr 18, 2021
f8295e8
use buffered continuation
dnadoba Apr 18, 2021
dd5d64c
use `CheckedContinuation` instead of `UnsafeContinuation`
dnadoba Apr 20, 2021
2f04254
Merge branch 'main'
dnadoba Jul 13, 2021
b436abf
fix merge conflicts
dnadoba Jul 13, 2021
178bd54
remove unsafe flags because they are no longer needed
dnadoba Jul 13, 2021
baa4ff3
run tests on Xcode beta too
dnadoba Jul 13, 2021
f105267
remove unsafe flags
dnadoba Jul 13, 2021
5b21af9
use maxim-lobanov/setup-xcode@v1 to select Xcode version
dnadoba Jul 13, 2021
36160ed
use macOS 11
dnadoba Jul 14, 2021
4a71071
use new AsyncThrowingStream instead of YieldingContinuation
dnadoba Jul 14, 2021
2ccd242
remove unsafe flags
dnadoba Jul 15, 2021
603353e
implement requestChannel
dnadoba Jul 20, 2021
affca72
split into multiple files
dnadoba Jul 26, 2021
34f7042
async/await responder prototype
dnadoba Jul 26, 2021
1f52068
Merge remote-tracking branch 'refs/remotes/origin/main'
dnadoba Jul 26, 2021
bf8d21f
fix merge conflict
dnadoba Jul 26, 2021
2c800d8
use SwiftPM directly with Xcode 13
dnadoba Jul 26, 2021
a5d9b4a
use `swift test` with Xcode 13
dnadoba Jul 26, 2021
9524acf
new requester API for ReactiveSwift
dnadoba Aug 4, 2021
0b0131b
Merge branch 'requester-api'
dnadoba Aug 4, 2021
66f4fab
implement new requester API for async/await
dnadoba Aug 4, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions .github/workflows/swift.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ on:
jobs:
test-on-macOS-and-iOS:
runs-on: macos-11

steps:
- uses: actions/checkout@v2

- uses: maxim-lobanov/setup-xcode@v1
with:
xcode-version: 12.5.1
- name: Test on iOS Simulator
run: >
xcodebuild test
Expand All @@ -31,12 +32,29 @@ jobs:
-skip-testing:RSocketCorePerformanceTests
-parallel-testing-enabled
-destination 'platform=macOS'

test-on-macOS-with-Xcode-13-Beta:
runs-on: macos-11
steps:
- uses: actions/checkout@v2
- uses: maxim-lobanov/setup-xcode@v1
with:
xcode-version: 13.0

- name: Test on macOS
run: swift test

performance-tests-on-macOS:
runs-on: macos-11
strategy:
matrix:
xcode: ['12.5.1', '13.0']

steps:
- uses: actions/checkout@v2
- uses: maxim-lobanov/setup-xcode@v1
with:
xcode-version: ${{ matrix.xcode }}
- name: Build & Run Performance Tests on macOS
run: >
swift test
Expand Down
20 changes: 20 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ let package = Package(
// Transport protocol
.library(name: "RSocketWSTransport", targets: ["RSocketWSTransport"]),
.library(name: "RSocketTCPTransport", targets: ["RSocketTCPTransport"]),
.library(name: "RSocketAsync", targets: ["RSocketAsync"]),

// Examples
.executable(name: "timer-client-example", targets: ["TimerClientExample"]),
.executable(name: "twitter-client-example", targets: ["TwitterClientExample"]),
.executable(name: "vanilla-client-example", targets: ["VanillaClientExample"]),
.executable(name: "async-twitter-client-example", targets: ["AsyncTwitterClientExample"]),
],
dependencies: [
.package(url: "https://github.com/ReactiveCocoa/ReactiveSwift.git", from: "6.6.0"),
Expand All @@ -46,6 +48,11 @@ let package = Package(
"RSocketCore",
.product(name: "ReactiveSwift", package: "ReactiveSwift")
]),
.target(name: "RSocketAsync", dependencies: [
"RSocketCore",
.product(name: "NIO", package: "swift-nio"),
.product(name: "_NIOConcurrency", package: "swift-nio"),
]),

// Channel
.target(name: "RSocketTSChannel", dependencies: [
Expand Down Expand Up @@ -135,6 +142,19 @@ let package = Package(
],
path: "Sources/Examples/VanillaClient"
),
.executableTarget(
name: "AsyncTwitterClientExample",
dependencies: [
"RSocketCore",
"RSocketNIOChannel",
"RSocketWSTransport",
"RSocketAsync",
.product(name: "ArgumentParser", package: "swift-argument-parser"),
.product(name: "NIO", package: "swift-nio"),
.product(name: "_NIOConcurrency", package: "swift-nio"),
],
path: "Sources/Examples/AsyncTwitterClient"
),
],
swiftLanguageVersions: [.v5]
)
82 changes: 82 additions & 0 deletions Sources/Examples/AsyncTwitterClient/main.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#if compiler(>=5.5)
import ArgumentParser
import Foundation
import NIO
import RSocketAsync
import RSocketCore
import RSocketNIOChannel
import RSocketWSTransport

struct Tweet: Decodable {
struct User: Decodable {
let screen_name, name: String
let followers_count: Int
}
let user: User
let text: String
let reply_count, retweet_count, favorite_count: Int
}

extension URL: ExpressibleByArgument {
public init?(argument: String) {
guard let url = URL(string: argument) else { return nil }
self = url
}
public var defaultValueDescription: String { description }
}

/// the server-side code can be found here -> https://github.com/rsocket/rsocket-demo/tree/master/src/main/kotlin/io/rsocket/demo/twitter
@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
struct TwitterClientExample: ParsableCommand {
static var configuration = CommandConfiguration(
abstract: "connects to an RSocket endpoint using WebSocket transport, requests a stream at the route `searchTweets` to search for tweets that match the `searchString` and logs all events."
)

@Argument(help: "used to find tweets that match the given string")
var searchString = "swift"

@Option
var url = URL(string: "wss://demo.rsocket.io/rsocket")!

@Option(help: "maximum number of tweets that are taken before it cancels the stream")
var limit = 1000

func run() throws {
let eventLoop = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { try! eventLoop.syncShutdownGracefully() }
let promise = eventLoop.next().makePromise(of: Void.self)
promise.completeWithAsync {
try await self.runAsync()
}
try promise.futureResult.wait()
}
func runAsync() async throws {
let bootstrap = ClientBootstrap(
transport: WSTransport(),
config: .mobileToServer
.set(\.encoding.metadata, to: .messageXRSocketRoutingV0)
.set(\.encoding.data, to: .applicationJson),
timeout: .seconds(30)
)
let client = try await bootstrap.connect(to: .init(url: url), payload: .empty)

let stream = try client.requester(
RequestStream {
Encoder()
.encodeStaticMetadata("searchTweets", using: .routing)
.mapData { (string: String) in Data(string.utf8) }
Decoder()
.decodeData(using: JSONDataDecoder(type: Tweet.self))
},
request: searchString
)

for try await tweet in stream.prefix(limit) {
dump(tweet)
}
}
}
if #available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *) {
TwitterClientExample.main()
}
#endif
20 changes: 6 additions & 14 deletions Sources/Examples/TimerClient/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,6 @@ import RSocketNIOChannel
import RSocketReactiveSwift
import RSocketWSTransport

func route(_ route: String) -> Data {
let encodedRoute = Data(route.utf8)
precondition(encodedRoute.count <= Int(UInt8.max), "route is to long to be encoded")
let encodedRouteLength = Data([UInt8(encodedRoute.count)])

return encodedRouteLength + encodedRoute
}

extension URL: ExpressibleByArgument {
public init?(argument: String) {
guard let url = URL(string: argument) else { return nil }
Expand Down Expand Up @@ -43,12 +35,12 @@ struct TimerClientExample: ParsableCommand {
)

let client = try bootstrap.connect(to: .init(url: url)).first()!.get()

try client.requester.requestStream(payload: Payload(
metadata: route("timer"),
data: Data()
))
.map() { String.init(decoding: $0.data, as: UTF8.self) }
try client.requester(RequestStream {
Encoder()
.encodeStaticMetadata("timer", using: RoutingEncoder())
Decoder()
.mapData { String(decoding: $0, as: UTF8.self) }
}, request: Data())
.logEvents(identifier: "route.timer")
.take(first: limit)
.wait()
Expand Down
35 changes: 15 additions & 20 deletions Sources/Examples/TwitterClient/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,6 @@ import RSocketNIOChannel
import RSocketReactiveSwift
import RSocketWSTransport

func route(_ route: String) -> Data {
let encodedRoute = Data(route.utf8)
precondition(encodedRoute.count <= Int(UInt8.max), "route is to long to be encoded")
let encodedRouteLength = Data([UInt8(encodedRoute.count)])

return encodedRouteLength + encodedRoute
}

extension URL: ExpressibleByArgument {
public init?(argument: String) {
guard let url = URL(string: argument) else { return nil }
Expand All @@ -40,23 +32,26 @@ struct TwitterClientExample: ParsableCommand {
func run() throws {
let bootstrap = ClientBootstrap(
transport: WSTransport(),
config: ClientConfiguration.mobileToServer
config: .mobileToServer
.set(\.encoding.metadata, to: .messageXRSocketRoutingV0)
.set(\.encoding.data, to: .applicationJson)
)

let client = try bootstrap.connect(to: .init(url: url)).first()!.get()

try client.requester.requestStream(payload: Payload(
metadata: route("searchTweets"),
data: Data(searchString.utf8)
))
.attemptMap { payload -> String in
// pretty print json
let json = try JSONSerialization.jsonObject(with: payload.data, options: [])
let data = try JSONSerialization.data(withJSONObject: json, options: [.prettyPrinted])
return String(decoding: data, as: UTF8.self)
}
try client.requester(RequestStream {
Encoder()
.encodeStaticMetadata("searchTweets", using: RoutingEncoder())
.mapData { (string: String) in
Data(string.utf8)
}
Decoder()
.mapData { data -> String in
// pretty print json
let json = try JSONSerialization.jsonObject(with: data, options: [])
let data = try JSONSerialization.data(withJSONObject: json, options: [.prettyPrinted])
return String(decoding: data, as: UTF8.self)
}
}, request: searchString)
.logEvents(identifier: "route.searchTweets")
.take(first: limit)
.wait()
Expand Down
7 changes: 5 additions & 2 deletions Sources/Examples/VanillaClient/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ struct VanillaClientExample: ParsableCommand {

let client = try bootstrap.connect(to: .init(host: host, port: port)).first()!.get()

let streamProducer = client.requester.requestStream(payload: .empty)
let requestProducer = client.requester.requestResponse(payload: Payload(data: Data("HelloWorld".utf8)))
let streamProducer = client.requester(
RequestStream(),
request: Data()
)
let requestProducer = client.requester(RequestResponse(), request: Data("HelloWorld".utf8))

streamProducer.logEvents(identifier: "stream1").take(first: 1).start()
streamProducer.logEvents(identifier: "stream3").take(first: 10).start()
Expand Down
38 changes: 38 additions & 0 deletions Sources/RSocketAsync/Client.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2015-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#if compiler(>=5.5)
import RSocketCore

@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
public struct AsyncClient {
private let coreClient: RSocketCore.CoreClient

public var requester: RequesterRSocket { RequesterRSocket(requester: coreClient.requester) }

public init(_ coreClient: RSocketCore.CoreClient) {
self.coreClient = coreClient
}
}

@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
extension RSocketCore.ClientBootstrap where Client == CoreClient, Responder == RSocketCore.RSocket {
public func connect(to endpoint: Transport.Endpoint, payload: Payload) async throws -> AsyncClient {
AsyncClient(try await connect(to: endpoint, payload: payload, responder: nil).get())
}
}

#endif
34 changes: 34 additions & 0 deletions Sources/RSocketAsync/RSocket.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2015-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#if compiler(>=5.5)
import Foundation
import RSocketCore

@available(macOS 9999, iOS 9999, watchOS 9999, tvOS 9999, *)
public protocol RSocket {
func metadataPush(metadata: Data)
func fireAndForget(payload: Payload)
func requestResponse(payload: Payload) async throws -> Payload
func requestStream(payload: Payload) -> AsyncThrowingStream<Payload, Swift.Error>
func requestChannel<PayloadSequence>(
initialPayload: Payload,
payloadStream: PayloadSequence?
) -> AsyncThrowingStream<Payload, Swift.Error>
where PayloadSequence: AsyncSequence, PayloadSequence.Element == Payload
}

#endif
Loading