Skip to content

Commit 5abb2e6

Browse files
committed
Rework client handling to enable per-thread client cache.
1 parent dc1da8d commit 5abb2e6

File tree

4 files changed

+216
-96
lines changed

4 files changed

+216
-96
lines changed

lib/async/http/faraday/adapter.rb

+27-13
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
require 'async/http/client'
1717
require 'async/http/proxy'
1818

19-
require_relative 'client_cache'
19+
require_relative 'clients'
2020

2121
module Async
2222
module HTTP
@@ -71,7 +71,12 @@ def initialize(...)
7171
super
7272

7373
@timeout = @connection_options.delete(:timeout)
74-
@clients = ClientCache.new(**@connection_options)
74+
75+
if clients = @connection_options.delete(:clients)
76+
@clients = clients.call(**@connection_options, &@config_block)
77+
else
78+
@clients = PersistentClients.new(**@connection_options, &@config_block)
79+
end
7580
end
7681

7782
# Close all clients.
@@ -89,19 +94,10 @@ def close
8994
def call(env)
9095
super
9196

92-
# for compatibility with the default adapter
97+
# For compatibility with the default adapter:
9398
env.url.path = '/' if env.url.path.empty?
9499

95-
Sync do
96-
endpoint = Endpoint.new(env.url)
97-
98-
if proxy = env.request.proxy
99-
proxy_endpoint = Endpoint.new(proxy.uri)
100-
client = @clients.proxy_client_for(proxy_endpoint, endpoint)
101-
else
102-
client = @clients.client_for(endpoint)
103-
end
104-
100+
with_client(env) do |endpoint, client|
105101
if body = env.body
106102
# We need to ensure the body is wrapped in a Readable object so that it can be read in chunks:
107103
# Faraday's body only responds to `#read`.
@@ -140,6 +136,24 @@ def call(env)
140136

141137
private
142138

139+
def with_client(env)
140+
Sync do
141+
endpoint = Endpoint.new(env.url)
142+
143+
if proxy = env.request.proxy
144+
proxy_endpoint = Endpoint.new(proxy.uri)
145+
146+
@clients.with_proxied_client(proxy_endpoint, endpoint) do |client|
147+
yield endpoint, client
148+
end
149+
else
150+
@clients.with_client(endpoint) do |client|
151+
yield endpoint, client
152+
end
153+
end
154+
end
155+
end
156+
143157
def with_timeout(task: Async::Task.current)
144158
if @timeout
145159
task.with_timeout(@timeout, ::Faraday::TimeoutError) do

lib/async/http/faraday/client_cache.rb

-83
This file was deleted.

lib/async/http/faraday/clients.rb

+173
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2018-2024, by Samuel Williams.
5+
# Copyright, 2018, by Andreas Garnaes.
6+
# Copyright, 2019, by Denis Talakevich.
7+
# Copyright, 2019-2020, by Igor Sidorov.
8+
# Copyright, 2023, by Genki Takiuchi.
9+
# Copyright, 2023, by Flavio Fernandes.
10+
# Copyright, 2024, by Jacob Frautschi.
11+
12+
require 'faraday'
13+
require 'faraday/adapter'
14+
require 'kernel/sync'
15+
16+
require 'async/http/client'
17+
require 'async/http/proxy'
18+
19+
module Async
20+
module HTTP
21+
module Faraday
22+
class Clients
23+
def self.call(...)
24+
new(...)
25+
end
26+
27+
def initialize(**options, &block)
28+
@options = options
29+
@block = block
30+
end
31+
32+
def close
33+
end
34+
35+
# Make a new client for the given endpoint.
36+
#
37+
# @parameter endpoint [IO::Endpoint::Generic] The endpoint to create the client for.
38+
def make_client(endpoint)
39+
client = Client.new(endpoint, **@options)
40+
@block&.call(client)
41+
return client
42+
end
43+
44+
# Get a client for the given endpoint. If a client already exists for the host, it will be reused.
45+
#
46+
# @parameter endpoint [IO::Endpoint::Generic] The endpoint to get the client for.
47+
def with_client(endpoint)
48+
client = make_client(endpoint)
49+
50+
yield client
51+
ensure
52+
client&.close
53+
end
54+
55+
# Get a client for the given proxy endpoint and endpoint. If a client already exists for the host, it will be reused.
56+
#
57+
# @parameter proxy_endpoint [IO::Endpoint::Generic] The proxy endpoint to use.
58+
# @parameter endpoint [IO::Endpoint::Generic] The endpoint to get the client for.
59+
def with_proxied_client(proxy_endpoint, endpoint)
60+
client = client_for(proxy_endpoint)
61+
proxied_client = client.proxied_client(endpoint)
62+
63+
yield proxied_client
64+
ensure
65+
proxied_client&.close
66+
client&.close
67+
end
68+
end
69+
70+
class PersistentClients < Clients
71+
def initialize(...)
72+
super
73+
74+
@clients = {}
75+
end
76+
77+
def close
78+
super
79+
80+
clients = @clients.values
81+
@clients.clear
82+
83+
clients.each(&:close)
84+
end
85+
86+
# Get the host key for the given endpoint.
87+
#
88+
# This is used to cache clients for the same host.
89+
#
90+
# @parameter endpoint [IO::Endpoint::Generic] The endpoint to get the host key for.
91+
def host_key(endpoint)
92+
url = endpoint.url.dup
93+
94+
url.path = ""
95+
url.fragment = nil
96+
url.query = nil
97+
98+
return url
99+
end
100+
101+
# Get a client for the given endpoint. If a client already exists for the host, it will be reused.
102+
#
103+
# @parameter endpoint [IO::Endpoint::Generic] The endpoint to get the client for.
104+
def client_for(endpoint)
105+
key = host_key(endpoint)
106+
107+
fetch(key) do
108+
make_client
109+
end
110+
end
111+
112+
def with_client(endpoint)
113+
yield make_client(endpoint)
114+
end
115+
116+
# Get a client for the given proxy endpoint and endpoint. If a client already exists for the host, it will be reused.
117+
#
118+
# @parameter proxy_endpoint [IO::Endpoint::Generic] The proxy endpoint to use.
119+
# @parameter endpoint [IO::Endpoint::Generic] The endpoint to get the client for.
120+
def with_proxied_client(proxy_endpoint, endpoint)
121+
key = [host_key(proxy_endpoint), host_key(endpoint)]
122+
123+
proxied_client = fetch(key) do
124+
client_for(proxy_endpoint).proxied_client(endpoint)
125+
end
126+
127+
yield proxied_client
128+
end
129+
130+
protected
131+
132+
def fetch(key)
133+
@clients.fetch(key) do
134+
@clients[key] = yield
135+
end
136+
end
137+
end
138+
139+
class PerThreadPersistentClients
140+
def initialize(**options, &block)
141+
@options = options
142+
@block = block
143+
144+
@key = :"#{self.class}_#{object_id}"
145+
end
146+
147+
def with_client(endpoint, &block)
148+
clients.with_client(endpoint, &block)
149+
end
150+
151+
def with_proxied_client(proxy_endpoint, endpoint, &block)
152+
clients.with_proxied_client(proxy_endpoint, endpoint, &block)
153+
end
154+
155+
def close
156+
Thread.list.each do |thread|
157+
if clients = thread[@key]
158+
clients.close
159+
160+
thread[@key] = nil
161+
end
162+
end
163+
end
164+
165+
protected
166+
167+
def clients
168+
Thread.current[@key] ||= PersistentClients.new(**@options, &@block)
169+
end
170+
end
171+
end
172+
end
173+
end

test/async/http/faraday/adapter.rb

+16
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,22 @@ def get_response(url = bound_url, path = '/index', adapter_options: {})
139139
expect(response.body).to be == File.read(__FILE__, 128)
140140
end
141141
end
142+
143+
with "a config block" do
144+
it "invokes the config block" do
145+
config_block_invoked = false
146+
147+
adapter = Faraday.new do |builder|
148+
builder.adapter :async_http do |client|
149+
config_block_invoked = true
150+
end
151+
end
152+
153+
adapter.get(bound_url)
154+
155+
expect(config_block_invoked).to be == true
156+
end
157+
end
142158
end
143159

144160
with "a remote http server" do

0 commit comments

Comments
 (0)