Skip to content

Commit c66a8e6

Browse files
authored
Merge pull request #69 from launchdarkly/eb/ch19217/http-tests
end-to-end tests for new SSE client [2 of 2]
2 parents 502e99b + dcb7af8 commit c66a8e6

File tree

5 files changed

+334
-19
lines changed

5 files changed

+334
-19
lines changed

lib/sse_client/sse_client.rb

+14-13
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def initialize(uri, options = {})
1919
@uri = URI(uri)
2020
@stopped = Concurrent::AtomicBoolean.new(false)
2121

22-
@headers = options[:headers].clone || {}
22+
@headers = options[:headers] ? options[:headers].clone : {}
2323
@connect_timeout = options[:connect_timeout] || DEFAULT_CONNECT_TIMEOUT
2424
@read_timeout = options[:read_timeout] || DEFAULT_READ_TIMEOUT
2525
@logger = options[:logger] || default_logger
@@ -40,7 +40,7 @@ def initialize(uri, options = {})
4040

4141
yield self if block_given?
4242

43-
@worker = Thread.new do
43+
Thread.new do
4444
run_stream
4545
end
4646
end
@@ -59,7 +59,7 @@ def on_error(&action)
5959

6060
def close
6161
if @stopped.make_true
62-
@worker.raise ShutdownSignal.new
62+
@cxn.close if !@cxn.nil?
6363
end
6464
end
6565

@@ -73,23 +73,24 @@ def default_logger
7373

7474
def run_stream
7575
while !@stopped.value
76-
cxn = nil
76+
@cxn = nil
7777
begin
78-
cxn = connect
79-
read_stream(cxn)
80-
rescue ShutdownSignal
81-
return
78+
@cxn = connect
79+
read_stream(@cxn)
80+
rescue Errno::EBADF
81+
# don't log this - it probably means we closed our own connection deliberately
8282
rescue StandardError => e
8383
@logger.error { "Unexpected error from event source: #{e.inspect}" }
8484
@logger.debug { "Exception trace: #{e.backtrace}" }
8585
end
86-
cxn.close if !cxn.nil?
86+
@cxn.close if !cxn.nil?
8787
end
8888
end
8989

9090
# Try to establish a streaming connection. Returns the StreamingHTTPConnection object if successful.
9191
def connect
9292
loop do
93+
return if @stopped.value
9394
interval = @backoff.next_interval
9495
if interval > 0
9596
@logger.warn { "Will retry connection after #{'%.3f' % interval} seconds" }
@@ -101,10 +102,13 @@ def connect
101102
body = cxn.read_all # grab the whole response body in case it has error details
102103
cxn.close
103104
@on[:error].call({status_code: cxn.status, body: body})
105+
next
104106
elsif cxn.headers["content-type"] && cxn.headers["content-type"].start_with?("text/event-stream")
105107
return cxn # we're good to proceed
106108
end
107109
@logger.error { "Event source returned unexpected content type '#{cxn.headers["content-type"]}'" }
110+
rescue Errno::EBADF
111+
raise
108112
rescue StandardError => e
109113
@logger.error { "Unexpected error from event source: #{e.inspect}" }
110114
@logger.debug { "Exception trace: #{e.backtrace}" }
@@ -124,6 +128,7 @@ def open_connection(headers)
124128
def read_stream(cxn)
125129
event_parser = EventParser.new(cxn.read_lines)
126130
event_parser.items.each do |item|
131+
return if @stopped.value
127132
case item
128133
when SSEEvent
129134
dispatch_event(item)
@@ -154,8 +159,4 @@ def build_headers
154159
h.merge(@headers)
155160
end
156161
end
157-
158-
# Custom exception that we use to tell the worker thread to stop
159-
class ShutdownSignal < StandardError
160-
end
161162
end

lib/sse_client/streaming_http.rb

+6
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def initialize(socket, read_timeout)
9191
@read_timeout = read_timeout
9292
@parser = HTTPTools::Parser.new
9393
@buffer = ""
94+
@done = false
9495
@lock = Mutex.new
9596

9697
# Provide callbacks for the Parser to give us the headers and body. This has to be done
@@ -102,6 +103,9 @@ def initialize(socket, read_timeout)
102103
@parser.on(:stream) do |data|
103104
@lock.synchronize { @buffer << data } # synchronize because we're called from another thread in Socketry
104105
end
106+
@parser.on(:finish) do
107+
@lock.synchronize { @done = true }
108+
end
105109

106110
# Block until the status code and headers have been successfully read.
107111
while !have_headers
@@ -132,6 +136,8 @@ def read_all
132136
# Attempt to read some more data from the socket. Return true if successful, false if EOF.
133137
# A read timeout will result in an exception from Socketry's readpartial method.
134138
def read_chunk_into_buffer
139+
# If @done is set, it means the Parser has signaled end of response body
140+
@lock.synchronize { return false if @done }
135141
data = @socket.readpartial(DEFAULT_CHUNK_SIZE, timeout: @read_timeout)
136142
return false if data == :eof
137143
@parser << data

spec/sse_client/sse_client_spec.rb

+139
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
require "spec_helper"
2+
require "socketry"
3+
require "sse_client/sse_shared"
4+
5+
#
6+
# End-to-end tests of SSEClient against a real server
7+
#
8+
describe SSE::SSEClient do
9+
subject { SSE::SSEClient }
10+
11+
def with_client(client)
12+
begin
13+
yield client
14+
ensure
15+
client.close
16+
end
17+
end
18+
19+
it "sends expected headers" do
20+
with_server do |server|
21+
requests = Queue.new
22+
server.setup_response("/") do |req,res|
23+
requests << req
24+
res.content_type = "text/event-stream"
25+
res.status = 200
26+
end
27+
28+
headers = {
29+
"Authorization" => "secret"
30+
}
31+
32+
with_client(subject.new(server.base_uri, headers: headers)) do |client|
33+
received_req = requests.pop
34+
expect(received_req.header).to eq({
35+
"accept" => ["text/event-stream"],
36+
"cache-control" => ["no-cache"],
37+
"host" => ["127.0.0.1"],
38+
"authorization" => ["secret"]
39+
})
40+
end
41+
end
42+
end
43+
44+
it "receives messages" do
45+
events_body = <<-EOT
46+
event: go
47+
data: foo
48+
id: 1
49+
50+
event: stop
51+
data: bar
52+
53+
EOT
54+
with_server do |server|
55+
server.setup_response("/") do |req,res|
56+
res.content_type = "text/event-stream"
57+
res.status = 200
58+
res.body = events_body
59+
end
60+
61+
event_sink = Queue.new
62+
client = subject.new(server.base_uri) do |c|
63+
c.on_event { |event| event_sink << event }
64+
end
65+
66+
with_client(client) do |client|
67+
expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "foo", "1"))
68+
expect(event_sink.pop).to eq(SSE::SSEEvent.new(:stop, "bar", nil))
69+
end
70+
end
71+
end
72+
73+
it "reconnects after error response" do
74+
events_body = <<-EOT
75+
event: go
76+
data: foo
77+
78+
EOT
79+
with_server do |server|
80+
attempt = 0
81+
server.setup_response("/") do |req,res|
82+
attempt += 1
83+
if attempt == 1
84+
res.status = 500
85+
res.body = "sorry"
86+
res.keep_alive = false
87+
else
88+
res.content_type = "text/event-stream"
89+
res.status = 200
90+
res.body = events_body
91+
end
92+
end
93+
94+
event_sink = Queue.new
95+
error_sink = Queue.new
96+
client = subject.new(server.base_uri, reconnect_time: 0.25) do |c|
97+
c.on_event { |event| event_sink << event }
98+
c.on_error { |error| error_sink << error }
99+
end
100+
101+
with_client(client) do |client|
102+
expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "foo", nil))
103+
expect(error_sink.pop).to eq({ status_code: 500, body: "sorry" })
104+
expect(attempt).to be >= 2
105+
end
106+
end
107+
end
108+
109+
it "reconnects after read timeout" do
110+
events_body = <<-EOT
111+
event: go
112+
data: foo
113+
114+
EOT
115+
with_server do |server|
116+
attempt = 0
117+
server.setup_response("/") do |req,res|
118+
attempt += 1
119+
if attempt == 1
120+
sleep(2)
121+
end
122+
res.content_type = "text/event-stream"
123+
res.status = 200
124+
res.body = events_body
125+
end
126+
127+
event_sink = Queue.new
128+
client = subject.new(server.base_uri,
129+
reconnect_time: 0.25, read_timeout: 0.25) do |c|
130+
c.on_event { |event| event_sink << event }
131+
end
132+
133+
with_client(client) do |client|
134+
expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "foo", nil))
135+
expect(attempt).to be >= 2
136+
end
137+
end
138+
end
139+
end

spec/sse_client/sse_shared.rb

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
require "spec_helper"
2+
require "webrick"
3+
4+
class StubHTTPServer
5+
def initialize
6+
@port = 50000
7+
begin
8+
@server = WEBrick::HTTPServer.new(
9+
BindAddress: '127.0.0.1',
10+
Port: @port,
11+
AccessLog: [],
12+
Logger: NullLogger.new
13+
)
14+
rescue Errno::EADDRINUSE
15+
@port += 1
16+
retry
17+
end
18+
end
19+
20+
def start
21+
Thread.new { @server.start }
22+
end
23+
24+
def stop
25+
@server.shutdown
26+
end
27+
28+
def base_uri
29+
URI("http://127.0.0.1:#{@port}")
30+
end
31+
32+
def setup_response(uri_path, &action)
33+
@server.mount_proc(uri_path, action)
34+
end
35+
end
36+
37+
class NullLogger
38+
def method_missing(*)
39+
self
40+
end
41+
end
42+
43+
def with_server
44+
server = StubHTTPServer.new
45+
begin
46+
server.start
47+
yield server
48+
ensure
49+
server.stop
50+
end
51+
end

0 commit comments

Comments
 (0)