diff --git a/.circleci/config.yml b/.circleci/config.yml index c94d18e4..df9dac51 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -8,7 +8,8 @@ workflows: - test-2.2 - test-2.3 - test-2.4 - - test-jruby-9.1 + - test-2.5 + - test-jruby-9.2 ruby-docker-template: &ruby-docker-template steps: @@ -17,6 +18,7 @@ ruby-docker-template: &ruby-docker-template if [[ $CIRCLE_JOB == test-jruby* ]]; then gem install jruby-openssl; # required by bundler, no effect on Ruby MRI fi + - run: ruby -v - run: gem install bundler - run: bundle install - run: mkdir ./rspec @@ -40,9 +42,14 @@ jobs: test-2.4: <<: *ruby-docker-template docker: - - image: circleci/ruby:2.4.3-jessie + - image: circleci/ruby:2.4.4-stretch - image: redis - test-jruby-9.1: + test-2.5: + <<: *ruby-docker-template + docker: + - image: circleci/ruby:2.5.1-stretch + - image: redis + test-jruby-9.2: <<: *ruby-docker-template docker: - image: circleci/jruby:9-jdk @@ -54,7 +61,7 @@ jobs: machine: image: circleci/classic:latest environment: - - RUBIES: "ruby-2.1.9 ruby-2.0.0 ruby-1.9.3 jruby-9.0.5.0" + - RUBIES: "jruby-9.1.17.0" steps: - run: sudo apt-get -q update - run: sudo apt-get -qy install redis-server diff --git a/README.md b/README.md index 69908087..1790b2d4 100644 --- a/README.md +++ b/README.md @@ -7,15 +7,19 @@ LaunchDarkly SDK for Ruby [![Test Coverage](https://codeclimate.com/github/launchdarkly/ruby-client/badges/coverage.svg)](https://codeclimate.com/github/launchdarkly/ruby-client/coverage) [![security](https://hakiri.io/github/launchdarkly/ruby-client/master.svg)](https://hakiri.io/github/launchdarkly/ruby-client/master) +Supported Ruby versions +----------------------- + +This version of the LaunchDarkly SDK has a minimum Ruby version of 2.2.6, or 9.1.6 for JRuby. + Quick setup ----------- 0. 
Install the Ruby SDK with `gem` ```shell -gem install ldclient-rb --prerelease +gem install ldclient-rb ``` -Note: The `--prerelease` flag is there to satisfy the dependency of celluloid 0.18pre which we have tested extensively and have found stable in our use case. Unfortunately, the upstream provider has not promoted this version to stable yet. See [here](https://github.com/celluloid/celluloid/issues/762) This is not required for use in a Gemfile. 1. Require the LaunchDarkly client: @@ -79,7 +83,7 @@ Note that this gem will automatically switch to using the Rails logger it is det HTTPS proxy ------------ -The Ruby SDK uses Faraday to handle all of its network traffic. Faraday provides built-in support for the use of an HTTPS proxy. If the HTTPS_PROXY environment variable is present then the SDK will proxy all network requests through the URL provided. +The Ruby SDK uses Faraday and Socketry to handle its network traffic. Both of these provide built-in support for the use of an HTTPS proxy. If the HTTPS_PROXY environment variable is present then the SDK will proxy all network requests through the URL provided. 
How to set the HTTPS_PROXY environment variable on Mac/Linux systems: ``` diff --git a/ldclient-rb.gemspec b/ldclient-rb.gemspec index 4c6e8eeb..a9bbfb23 100644 --- a/ldclient-rb.gemspec +++ b/ldclient-rb.gemspec @@ -26,36 +26,18 @@ Gem::Specification.new do |spec| spec.add_development_dependency "codeclimate-test-reporter", "~> 0" spec.add_development_dependency "redis", "~> 3.3.5" spec.add_development_dependency "connection_pool", ">= 2.1.2" - if RUBY_VERSION >= "2.0.0" - spec.add_development_dependency "rake", "~> 10.0" - spec.add_development_dependency "rspec_junit_formatter", "~> 0.3.0" - else - spec.add_development_dependency "rake", "12.1.0" - # higher versions of rake fail to install in JRuby 1.7 - end + spec.add_development_dependency "rake", "~> 10.0" + spec.add_development_dependency "rspec_junit_formatter", "~> 0.3.0" spec.add_development_dependency "timecop", "~> 0.9.1" spec.add_runtime_dependency "json", [">= 1.8", "< 3"] - if RUBY_VERSION >= "2.1.0" - spec.add_runtime_dependency "faraday", [">= 0.9", "< 2"] - spec.add_runtime_dependency "faraday-http-cache", [">= 1.3.0", "< 3"] - else - spec.add_runtime_dependency "faraday", [">= 0.9", "< 0.14.0"] - spec.add_runtime_dependency "faraday-http-cache", [">= 1.3.0", "< 2"] - end + spec.add_runtime_dependency "faraday", [">= 0.9", "< 2"] + spec.add_runtime_dependency "faraday-http-cache", [">= 1.3.0", "< 3"] spec.add_runtime_dependency "semantic", "~> 1.6.0" spec.add_runtime_dependency "thread_safe", "~> 0.3" spec.add_runtime_dependency "net-http-persistent", "~> 2.9" spec.add_runtime_dependency "concurrent-ruby", "~> 1.0.4" spec.add_runtime_dependency "hashdiff", "~> 0.2" - spec.add_runtime_dependency "ld-celluloid-eventsource", "~> 0.11.0" - spec.add_runtime_dependency "celluloid", "~> 0.18.0.pre" # transitive dep; specified here for more control - - if RUBY_VERSION >= "2.2.2" - spec.add_runtime_dependency "nio4r", "< 3" # for maximum ruby version compatibility. 
- else - spec.add_runtime_dependency "nio4r", "~> 1.1" # for maximum ruby version compatibility. - end - - spec.add_runtime_dependency "waitutil", "0.2" + spec.add_runtime_dependency "http_tools", '~> 0.4.5' + spec.add_runtime_dependency "socketry", "~> 0.5.1" end diff --git a/lib/ldclient-rb.rb b/lib/ldclient-rb.rb index 541cf4d7..ce9d0307 100644 --- a/lib/ldclient-rb.rb +++ b/lib/ldclient-rb.rb @@ -1,4 +1,5 @@ require "ldclient-rb/version" +require "ldclient-rb/util" require "ldclient-rb/evaluation" require "ldclient-rb/ldclient" require "ldclient-rb/cache_store" diff --git a/lib/ldclient-rb/events.rb b/lib/ldclient-rb/events.rb index 96db3f46..0c9a0ece 100644 --- a/lib/ldclient-rb/events.rb +++ b/lib/ldclient-rb/events.rb @@ -222,17 +222,24 @@ def trigger_flush(buffer, flush_workers) if !payload.events.empty? || !payload.summary.counters.empty? # If all available worker threads are busy, success will be false and no job will be queued. success = flush_workers.post do - resp = EventPayloadSendTask.new.run(@sdk_key, @config, @client, payload, @formatter) - handle_response(resp) if !resp.nil? + begin + resp = EventPayloadSendTask.new.run(@sdk_key, @config, @client, payload, @formatter) + handle_response(resp) if !resp.nil? + rescue => e + @config.logger.warn { "[LDClient] Unexpected error in event processor: #{e.inspect}. \nTrace: #{e.backtrace}" } + end end buffer.clear if success # Reset our internal state, these events now belong to the flush worker end end def handle_response(res) - if res.status == 401 - @config.logger.error { "[LDClient] Received 401 error, no further events will be posted since SDK key is invalid" } - @disabled.value = true + if res.status >= 400 + message = Util.http_error_message(res.status, "event delivery", "some events were dropped") + @config.logger.error { "[LDClient] #{message}" } + if !Util.http_error_recoverable?(res.status) + @disabled.value = true + end else if !res.headers.nil? 
&& res.headers.has_key?("Date") begin @@ -309,8 +316,7 @@ def run(sdk_key, config, client, payload, formatter) next end if res.status < 200 || res.status >= 300 - config.logger.error { "[LDClient] Unexpected status code while processing events: #{res.status}" } - if res.status >= 500 + if Util.http_error_recoverable?(res.status) next end end diff --git a/lib/ldclient-rb/ldclient.rb b/lib/ldclient-rb/ldclient.rb index ece7c4ec..5c0e872d 100644 --- a/lib/ldclient-rb/ldclient.rb +++ b/lib/ldclient-rb/ldclient.rb @@ -1,7 +1,7 @@ +require "concurrent/atomics" require "digest/sha1" require "logger" require "benchmark" -require "waitutil" require "json" require "openssl" @@ -41,7 +41,9 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) requestor = Requestor.new(sdk_key, config) - if !@config.offline? + if @config.offline? + @update_processor = NullUpdateProcessor.new + else if @config.update_processor.nil? if @config.stream? @update_processor = StreamProcessor.new(sdk_key, config, requestor) @@ -53,16 +55,15 @@ def initialize(sdk_key, config = Config.default, wait_for_sec = 5) else @update_processor = @config.update_processor end - @update_processor.start end - if !@config.offline? && wait_for_sec > 0 - begin - WaitUtil.wait_for_condition("LaunchDarkly client initialization", timeout_sec: wait_for_sec, delay_sec: 0.1) do - initialized? - end - rescue WaitUtil::TimeoutError + ready = @update_processor.start + if wait_for_sec > 0 + ok = ready.wait(wait_for_sec) + if !ok @config.logger.error { "[LDClient] Timeout encountered waiting for LaunchDarkly client initialization" } + elsif !@update_processor.initialized? + @config.logger.error { "[LDClient] LaunchDarkly client initialization failed" } end end end @@ -220,9 +221,7 @@ def all_flags(user) # @return [void] def close @config.logger.info { "[LDClient] Closing LaunchDarkly client..." } - if not @config.offline? 
- @update_processor.stop - end + @update_processor.stop @event_processor.stop @store.stop end @@ -255,4 +254,22 @@ def make_feature_event(flag, user, variation, value, default) private :evaluate, :log_exception, :sanitize_user, :make_feature_event end + + # + # Used internally when the client is offline. + # + class NullUpdateProcessor + def start + e = Concurrent::Event.new + e.set + e + end + + def initialized? + true + end + + def stop + end + end end diff --git a/lib/ldclient-rb/polling.rb b/lib/ldclient-rb/polling.rb index cc391bca..15965201 100644 --- a/lib/ldclient-rb/polling.rb +++ b/lib/ldclient-rb/polling.rb @@ -9,6 +9,7 @@ def initialize(config, requestor) @initialized = Concurrent::AtomicBoolean.new(false) @started = Concurrent::AtomicBoolean.new(false) @stopped = Concurrent::AtomicBoolean.new(false) + @ready = Concurrent::Event.new end def initialized? @@ -16,9 +17,10 @@ def initialized? end def start - return unless @started.make_true + return @ready unless @started.make_true @config.logger.info { "[LDClient] Initializing polling connection" } create_worker + @ready end def stop @@ -39,6 +41,7 @@ def poll }) if @initialized.make_true @config.logger.info { "[LDClient] Polling connection initialized" } + @ready.set end end end @@ -47,20 +50,24 @@ def create_worker @worker = Thread.new do @config.logger.debug { "[LDClient] Starting polling worker" } while !@stopped.value do + started_at = Time.now begin - started_at = Time.now poll - delta = @config.poll_interval - (Time.now - started_at) - if delta > 0 - sleep(delta) + rescue UnexpectedResponseError => e + message = Util.http_error_message(e.status, "polling request", "will retry") + @config.logger.error { "[LDClient] #{message}" }; + if !Util.http_error_recoverable?(e.status) + @ready.set # if client was waiting on us, make it stop waiting - has no effect if already set + stop end - rescue InvalidSDKKeyError - @config.logger.error { "[LDClient] Received 401 error, no further polling requests will be 
made since SDK key is invalid" }; - stop rescue StandardError => exn @config.logger.error { "[LDClient] Exception while polling: #{exn.inspect}" } # TODO: log_exception(__method__.to_s, exn) end + delta = @config.poll_interval - (Time.now - started_at) + if delta > 0 + sleep(delta) + end end end end diff --git a/lib/ldclient-rb/requestor.rb b/lib/ldclient-rb/requestor.rb index abaab854..25cce121 100644 --- a/lib/ldclient-rb/requestor.rb +++ b/lib/ldclient-rb/requestor.rb @@ -4,7 +4,14 @@ module LaunchDarkly - class InvalidSDKKeyError < StandardError + class UnexpectedResponseError < StandardError + def initialize(status) + @status = status + end + + def status + @status + end end class Requestor @@ -13,7 +20,7 @@ def initialize(sdk_key, config) @config = config @client = Faraday.new do |builder| builder.use :http_cache, store: @config.cache_store - + builder.adapter :net_http_persistent end end @@ -44,19 +51,8 @@ def make_request(path) @config.logger.debug { "[LDClient] Got response from uri: #{uri}\n\tstatus code: #{res.status}\n\theaders: #{res.headers}\n\tbody: #{res.body}" } - if res.status == 401 - @config.logger.error { "[LDClient] Invalid SDK key" } - raise InvalidSDKKeyError - end - - if res.status == 404 - @config.logger.error { "[LDClient] Resource not found" } - return nil - end - if res.status < 200 || res.status >= 300 - @config.logger.error { "[LDClient] Unexpected status code #{res.status}" } - return nil + raise UnexpectedResponseError.new(res.status) end JSON.parse(res.body, symbolize_names: true) diff --git a/lib/ldclient-rb/stream.rb b/lib/ldclient-rb/stream.rb index 4ec1052a..2151e945 100644 --- a/lib/ldclient-rb/stream.rb +++ b/lib/ldclient-rb/stream.rb @@ -1,6 +1,6 @@ require "concurrent/atomics" require "json" -require "celluloid/eventsource" +require "sse_client" module LaunchDarkly PUT = :put @@ -24,6 +24,7 @@ def initialize(sdk_key, config, requestor) @initialized = Concurrent::AtomicBoolean.new(false) @started = 
Concurrent::AtomicBoolean.new(false) @stopped = Concurrent::AtomicBoolean.new(false) + @ready = Concurrent::Event.new end def initialized? @@ -31,37 +32,34 @@ def initialized? end def start - return unless @started.make_true + return @ready unless @started.make_true @config.logger.info { "[LDClient] Initializing stream connection" } - headers = - { + headers = { 'Authorization' => @sdk_key, 'User-Agent' => 'RubyClient/' + LaunchDarkly::VERSION } - opts = {:headers => headers, :with_credentials => true, :proxy => @config.proxy, :read_timeout => READ_TIMEOUT_SECONDS} - @es = Celluloid::EventSource.new(@config.stream_uri + "/all", opts) do |conn| - conn.on(PUT) { |message| process_message(message, PUT) } - conn.on(PATCH) { |message| process_message(message, PATCH) } - conn.on(DELETE) { |message| process_message(message, DELETE) } - conn.on(INDIRECT_PUT) { |message| process_message(message, INDIRECT_PUT) } - conn.on(INDIRECT_PATCH) { |message| process_message(message, INDIRECT_PATCH) } + opts = { + headers: headers, + proxy: @config.proxy, + read_timeout: READ_TIMEOUT_SECONDS, + logger: @config.logger + } + @es = SSE::SSEClient.new(@config.stream_uri + "/all", opts) do |conn| + conn.on_event { |event| process_message(event, event.type) } conn.on_error { |err| - @config.logger.error { "[LDClient] Unexpected status code #{err[:status_code]} from streaming connection" } - if err[:status_code] == 401 - @config.logger.error { "[LDClient] Received 401 error, no further streaming connection will be made since SDK key is invalid" } + status = err[:status_code] + message = Util.http_error_message(status, "streaming connection", "will retry") + @config.logger.error { "[LDClient] #{message}" } + if !Util.http_error_recoverable?(status) + @ready.set # if client was waiting on us, make it stop waiting - has no effect if already set stop end } end - end - - def stop - if @stopped.make_true - @es.close - @config.logger.info { "[LDClient] Stream connection stopped" } - end + + @ready 
end def stop @@ -83,21 +81,22 @@ def process_message(message, method) }) @initialized.make_true @config.logger.info { "[LDClient] Stream initialized" } + @ready.set elsif method == PATCH - message = JSON.parse(message.data, symbolize_names: true) + data = JSON.parse(message.data, symbolize_names: true) for kind in [FEATURES, SEGMENTS] - key = key_for_path(kind, message[:path]) + key = key_for_path(kind, data[:path]) if key - @feature_store.upsert(kind, message[:data]) + @feature_store.upsert(kind, data[:data]) break end end elsif method == DELETE - message = JSON.parse(message.data, symbolize_names: true) + data = JSON.parse(message.data, symbolize_names: true) for kind in [FEATURES, SEGMENTS] - key = key_for_path(kind, message[:path]) + key = key_for_path(kind, data[:path]) if key - @feature_store.delete(kind, key, message[:version]) + @feature_store.delete(kind, key, data[:version]) break end end diff --git a/lib/ldclient-rb/user_filter.rb b/lib/ldclient-rb/user_filter.rb index 9f4bce82..449d8d2e 100644 --- a/lib/ldclient-rb/user_filter.rb +++ b/lib/ldclient-rb/user_filter.rb @@ -1,4 +1,5 @@ require "json" +require "set" module LaunchDarkly class UserFilter diff --git a/lib/ldclient-rb/util.rb b/lib/ldclient-rb/util.rb new file mode 100644 index 00000000..6ba70dbc --- /dev/null +++ b/lib/ldclient-rb/util.rb @@ -0,0 +1,18 @@ + +module LaunchDarkly + module Util + def self.http_error_recoverable?(status) + if status >= 400 && status < 500 + status == 400 || status == 408 || status == 429 + else + true + end + end + + def self.http_error_message(status, context, recoverable_message) + desc = (status == 401 || status == 403) ? " (invalid SDK key)" : "" + message = Util.http_error_recoverable?(status) ? 
recoverable_message : "giving up permanently" + "HTTP error #{status}#{desc} for #{context} - #{message}" + end + end +end diff --git a/lib/sse_client.rb b/lib/sse_client.rb new file mode 100644 index 00000000..dd24c3a6 --- /dev/null +++ b/lib/sse_client.rb @@ -0,0 +1,4 @@ +require "sse_client/streaming_http" +require "sse_client/sse_events" +require "sse_client/backoff" +require "sse_client/sse_client" diff --git a/lib/sse_client/backoff.rb b/lib/sse_client/backoff.rb new file mode 100644 index 00000000..73e0754f --- /dev/null +++ b/lib/sse_client/backoff.rb @@ -0,0 +1,38 @@ + +module SSE + # + # A simple backoff algorithm that can be reset at any time, or reset itself after a given + # interval has passed without errors. + # + class Backoff + def initialize(base_interval, max_interval, auto_reset_interval = 60) + @base_interval = base_interval + @max_interval = max_interval + @auto_reset_interval = auto_reset_interval + @attempts = 0 + @last_good_time = nil + @jitter_rand = Random.new + end + + attr_accessor :base_interval + + def next_interval + if !@last_good_time.nil? && (Time.now.to_i - @last_good_time) >= @auto_reset_interval + @attempts = 0 + end + @last_good_time = nil + if @attempts == 0 + @attempts += 1 + return 0 + end + @last_good_time = nil + target = ([@base_interval * (2 ** @attempts), @max_interval].min).to_f + @attempts += 1 + (target / 2) + @jitter_rand.rand(target / 2) + end + + def mark_success + @last_good_time = Time.now.to_i if @last_good_time.nil? + end + end +end diff --git a/lib/sse_client/sse_client.rb b/lib/sse_client/sse_client.rb new file mode 100644 index 00000000..e31b8607 --- /dev/null +++ b/lib/sse_client/sse_client.rb @@ -0,0 +1,162 @@ +require "concurrent/atomics" +require "logger" +require "thread" +require "uri" + +module SSE + # + # A lightweight Server-Sent Events implementation, relying on two gems: socketry for sockets with + # read timeouts, and http_tools for HTTP response parsing. 
The overall logic is based on
+  # [https://github.com/Tonkpils/celluloid-eventsource].
+  #
+  class SSEClient
+    DEFAULT_CONNECT_TIMEOUT = 10
+    DEFAULT_READ_TIMEOUT = 300
+    DEFAULT_RECONNECT_TIME = 1
+    MAX_RECONNECT_TIME = 30
+
+    def initialize(uri, options = {})
+      @uri = URI(uri)
+      @stopped = Concurrent::AtomicBoolean.new(false)
+
+      @headers = options[:headers] ? options[:headers].clone : {}
+      @connect_timeout = options[:connect_timeout] || DEFAULT_CONNECT_TIMEOUT
+      @read_timeout = options[:read_timeout] || DEFAULT_READ_TIMEOUT
+      @logger = options[:logger] || default_logger
+
+      if options[:proxy]
+        @proxy = options[:proxy]
+      else
+        proxyUri = @uri.find_proxy
+        if !proxyUri.nil? && (proxyUri.scheme == 'http' || proxyUri.scheme == 'https')
+          @proxy = proxyUri
+        end
+      end
+
+      reconnect_time = options[:reconnect_time] || DEFAULT_RECONNECT_TIME
+      @backoff = Backoff.new(reconnect_time, MAX_RECONNECT_TIME)
+
+      @on = { event: ->(_) {}, error: ->(_) {} }
+      @last_id = nil
+
+      yield self if block_given?
+
+      Thread.new do
+        run_stream
+      end
+    end
+
+    def on(event_name, &action)
+      @on[event_name.to_sym] = action
+    end
+
+    def on_event(&action)
+      @on[:event] = action
+    end
+
+    def on_error(&action)
+      @on[:error] = action
+    end
+
+    def close
+      if @stopped.make_true
+        @cxn.close if !@cxn.nil?
+      end
+    end
+
+    private
+
+    def default_logger
+      log = ::Logger.new($stdout)
+      log.level = ::Logger::WARN
+      log
+    end
+
+    def run_stream
+      while !@stopped.value
+        @cxn = nil
+        begin
+          @cxn = connect
+          read_stream(@cxn) if !@cxn.nil?
+        rescue Errno::EBADF
+          # don't log this - it probably means we closed our own connection deliberately
+        rescue StandardError => e
+          @logger.error { "Unexpected error from event source: #{e.inspect}" }
+          @logger.debug { "Exception trace: #{e.backtrace}" }
+        end
+        @cxn.close if !@cxn.nil?
+      end
+    end
+
+    # Try to establish a streaming connection. Returns the StreamingHTTPConnection object if successful.
+    def connect
+      loop do
+        return if @stopped.value
+        interval = @backoff.next_interval
+        if interval > 0
+          @logger.warn { "Will retry connection after #{'%.3f' % interval} seconds" }
+          sleep(interval)
+        end
+        begin
+          cxn = open_connection(build_headers)
+          if cxn.status != 200
+            body = cxn.read_all # grab the whole response body in case it has error details
+            cxn.close
+            @on[:error].call({status_code: cxn.status, body: body})
+            next
+          elsif cxn.headers["content-type"] && cxn.headers["content-type"].start_with?("text/event-stream")
+            return cxn # we're good to proceed
+          end
+          @logger.error { "Event source returned unexpected content type '#{cxn.headers["content-type"]}'" }
+        rescue Errno::EBADF
+          raise
+        rescue StandardError => e
+          @logger.error { "Unexpected error from event source: #{e.inspect}" }
+          @logger.debug { "Exception trace: #{e.backtrace}" }
+          cxn.close if !cxn.nil?
+        end
+        # if unsuccessful, continue the loop to connect again
+      end
+    end
+
+    # Just calls the StreamingHTTPConnection constructor - factored out for test purposes
+    def open_connection(headers)
+      StreamingHTTPConnection.new(@uri, @proxy, headers, @connect_timeout, @read_timeout)
+    end
+
+    # Pipe the output of the StreamingHTTPConnection into the EventParser, and dispatch events as
+    # they arrive.
+    def read_stream(cxn)
+      event_parser = EventParser.new(cxn.read_lines)
+      event_parser.items.each do |item|
+        return if @stopped.value
+        case item
+        when SSEEvent
+          dispatch_event(item)
+        when SSESetRetryInterval
+          @backoff.base_interval = item.milliseconds.to_f / 1000
+        end
+      end
+    end
+
+    def dispatch_event(event)
+      @last_id = event.id
+
+      # Tell the Backoff object that as of the current time, we have succeeded in getting some data. It
+      # uses that information so it can automatically reset itself if enough time passes between failures.
+      @backoff.mark_success
+
+      # Pass the event to the caller
+      @on[:event].call(event)
+    end
+
+    def build_headers
+      h = {
+        'Accept' => 'text/event-stream',
+        'Cache-Control' => 'no-cache'
+      }
+      h['Last-Event-Id'] = @last_id if !@last_id.nil?
+      h.merge(@headers)
+    end
+  end
+end
diff --git a/lib/sse_client/sse_events.rb b/lib/sse_client/sse_events.rb
new file mode 100644
index 00000000..762cc2b0
--- /dev/null
+++ b/lib/sse_client/sse_events.rb
@@ -0,0 +1,67 @@
+
+module SSE
+  # Server-Sent Event type used by SSEClient and EventParser.
+  SSEEvent = Struct.new(:type, :data, :id)
+
+  SSESetRetryInterval = Struct.new(:milliseconds)
+
+  #
+  # Accepts lines of text via an iterator, and parses them into SSE messages.
+  #
+  class EventParser
+    def initialize(lines)
+      @lines = lines
+      reset_buffers
+    end
+
+    # Generator that parses the input iterator and returns instances of SSEEvent or SSERetryInterval.
+    def items
+      Enumerator.new do |gen|
+        @lines.each do |line|
+          line.chomp!
+          if line.empty?
+            event = maybe_create_event
+            reset_buffers
+            gen.yield event if !event.nil?
+          else
+            case line
+              when /^(\w+): ?(.*)$/
+                item = process_field($1, $2)
+                gen.yield item if !item.nil?
+            end
+          end
+        end
+      end
+    end
+
+    private
+
+    def reset_buffers
+      @id = nil
+      @type = nil
+      @data = ""
+    end
+
+    def process_field(name, value)
+      case name
+        when "event"
+          @type = value.to_sym
+        when "data"
+          @data << "\n" if !@data.empty?
+          @data << value
+        when "id"
+          @id = value
+        when "retry"
+          if /^(?<num>\d+)$/ =~ value
+            return SSESetRetryInterval.new(num.to_i)
+          end
+      end
+      nil
+    end
+
+    def maybe_create_event
+      return nil if @data.empty?
+ SSEEvent.new(@type || :message, @data, @id) + end + end +end diff --git a/lib/sse_client/streaming_http.rb b/lib/sse_client/streaming_http.rb new file mode 100644 index 00000000..1c0ed52b --- /dev/null +++ b/lib/sse_client/streaming_http.rb @@ -0,0 +1,195 @@ +require "http_tools" +require "socketry" + +module SSE + # + # Wrapper around a socket providing a simplified HTTP request-response cycle including streaming. + # The socket is created and managed by Socketry, which we use so that we can have a read timeout. + # + class StreamingHTTPConnection + attr_reader :status, :headers + + def initialize(uri, proxy, headers, connect_timeout, read_timeout) + @socket = HTTPConnectionFactory.connect(uri, proxy, connect_timeout, read_timeout) + @socket.write(build_request(uri, headers)) + @reader = HTTPResponseReader.new(@socket, read_timeout) + @status = @reader.status + @headers = @reader.headers + end + + def close + @socket.close if @socket + @socket = nil + end + + # Generator that returns one line of the response body at a time (delimited by \r, \n, + # or \r\n) until the response is fully consumed or the socket is closed. + def read_lines + @reader.read_lines + end + + # Consumes the entire response body and returns it. + def read_all + @reader.read_all + end + + private + + # Build an HTTP request line and headers. + def build_request(uri, headers) + ret = "GET #{uri.request_uri} HTTP/1.1\r\n" + ret << "Host: #{uri.host}\r\n" + headers.each { |k, v| + ret << "#{k}: #{v}\r\n" + } + ret + "\r\n" + end + end + + # + # Used internally to send the HTTP request, including the proxy dialogue if necessary. 
+ # + class HTTPConnectionFactory + def self.connect(uri, proxy, connect_timeout, read_timeout) + if !proxy + return open_socket(uri, connect_timeout) + end + + socket = open_socket(proxy, connect_timeout) + socket.write(build_proxy_request(uri, proxy)) + + # temporarily create a reader just for the proxy connect response + proxy_reader = HTTPResponseReader.new(socket, read_timeout) + if proxy_reader.status != 200 + raise ProxyError, "proxy connection refused, status #{proxy_reader.status}" + end + + # start using TLS at this point if appropriate + if uri.scheme.downcase == 'https' + wrap_socket_in_ssl_socket(socket) + else + socket + end + end + + private + + def self.open_socket(uri, connect_timeout) + if uri.scheme.downcase == 'https' + Socketry::SSL::Socket.connect(uri.host, uri.port, timeout: connect_timeout) + else + Socketry::TCP::Socket.connect(uri.host, uri.port, timeout: connect_timeout) + end + end + + # Build a proxy connection header. + def self.build_proxy_request(uri, proxy) + ret = "CONNECT #{uri.host}:#{uri.port} HTTP/1.1\r\n" + ret << "Host: #{uri.host}:#{uri.port}\r\n" + if proxy.user || proxy.password + encoded_credentials = Base64.strict_encode64([proxy.user || '', proxy.password || ''].join(":")) + ret << "Proxy-Authorization: Basic #{encoded_credentials}\r\n" + end + ret << "\r\n" + ret + end + + def self.wrap_socket_in_ssl_socket(socket) + io = IO.try_convert(socket) + ssl_sock = OpenSSL::SSL::SSLSocket.new(io, OpenSSL::SSL::SSLContext.new) + ssl_sock.connect + Socketry::SSL::Socket.new.from_socket(ssl_sock) + end + end + + class ProxyError < StandardError + def initialize(message) + super + end + end + + # + # Used internally to read the HTTP response, either all at once or as a stream of text lines. + # Incoming data is fed into an instance of HTTPTools::Parser, which gives us the header and + # chunks of the body via callbacks. 
+ # + class HTTPResponseReader + DEFAULT_CHUNK_SIZE = 10000 + + attr_reader :status, :headers + + def initialize(socket, read_timeout) + @socket = socket + @read_timeout = read_timeout + @parser = HTTPTools::Parser.new + @buffer = "" + @done = false + @lock = Mutex.new + + # Provide callbacks for the Parser to give us the headers and body. This has to be done + # before we start piping any data into the parser. + have_headers = false + @parser.on(:header) do + have_headers = true + end + @parser.on(:stream) do |data| + @lock.synchronize { @buffer << data } # synchronize because we're called from another thread in Socketry + end + @parser.on(:finish) do + @lock.synchronize { @done = true } + end + + # Block until the status code and headers have been successfully read. + while !have_headers + raise EOFError if !read_chunk_into_buffer + end + @headers = Hash[@parser.header.map { |k,v| [k.downcase, v] }] + @status = @parser.status_code + end + + def read_lines + Enumerator.new do |gen| + loop do + line = read_line + break if line.nil? + gen.yield line + end + end + end + + def read_all + while read_chunk_into_buffer + end + @buffer + end + + private + + # Attempt to read some more data from the socket. Return true if successful, false if EOF. + # A read timeout will result in an exception from Socketry's readpartial method. + def read_chunk_into_buffer + # If @done is set, it means the Parser has signaled end of response body + @lock.synchronize { return false if @done } + data = @socket.readpartial(DEFAULT_CHUNK_SIZE, timeout: @read_timeout) + return false if data == :eof + @parser << data + # We are piping the content through the parser so that it can handle things like chunked + # encoding for us. The content ends up being appended to @buffer via our callback. + true + end + + # Extract the next line of text from the read buffer, refilling the buffer as needed. + def read_line + loop do + @lock.synchronize do + i = @buffer.index(/[\r\n]/) + if !i.nil? 
+ i += 1 if (@buffer[i] == "\r" && i < @buffer.length - 1 && @buffer[i + 1] == "\n") + return @buffer.slice!(0, i + 1).force_encoding(Encoding::UTF_8) + end + end + return nil if !read_chunk_into_buffer + end + end + end +end diff --git a/spec/events_spec.rb b/spec/events_spec.rb index cbce1fbe..56bd14a2 100644 --- a/spec/events_spec.rb +++ b/spec/events_spec.rb @@ -351,12 +351,12 @@ expect(hc.get_request.headers["Authorization"]).to eq "sdk_key" end - it "stops posting events after getting a 401 error" do + def verify_unrecoverable_http_error(status) @ep = subject.new("sdk_key", default_config, hc) e = { kind: "identify", user: user } @ep.add_event(e) - hc.set_response_status(401) + hc.set_response_status(status) @ep.flush @ep.wait_until_inactive expect(hc.get_request).not_to be_nil @@ -368,7 +368,7 @@ expect(hc.get_request).to be_nil end - it "retries flush once after 5xx error" do + def verify_recoverable_http_error(status) @ep = subject.new("sdk_key", default_config, hc) e = { kind: "identify", user: user } @ep.add_event(e) @@ -380,6 +380,33 @@ expect(hc.get_request).not_to be_nil expect(hc.get_request).not_to be_nil expect(hc.get_request).to be_nil # no 3rd request + + # now verify that a subsequent flush still generates a request + hc.reset + @ep.add_event(e) + @ep.flush + @ep.wait_until_inactive + expect(hc.get_request).not_to be_nil + end + + it "stops posting events after getting a 401 error" do + verify_unrecoverable_http_error(401) + end + + it "stops posting events after getting a 403 error" do + verify_unrecoverable_http_error(403) + end + + it "retries after 408 error" do + verify_recoverable_http_error(408) + end + + it "retries after 429 error" do + verify_recoverable_http_error(429) + end + + it "retries after 503 error" do + verify_recoverable_http_error(503) end it "retries flush once after connection error" do diff --git a/spec/ldclient_spec.rb b/spec/ldclient_spec.rb index 405e0d53..8e4b5eb5 100644 --- a/spec/ldclient_spec.rb +++ 
b/spec/ldclient_spec.rb @@ -7,7 +7,7 @@ let(:offline_client) do subject.new("secret", offline_config) end - let(:update_processor) { NullUpdateProcessor.new } + let(:update_processor) { LaunchDarkly::NullUpdateProcessor.new } let(:config) { LaunchDarkly::Config.new({send_events: false, update_processor: update_processor}) } let(:client) do subject.new("secret", config) @@ -160,13 +160,4 @@ def event_processor expect(ep).not_to be_a(LaunchDarkly::NullEventProcessor) end end - - class NullUpdateProcessor - def start - end - - def initialized? - true - end - end end \ No newline at end of file diff --git a/spec/polling_spec.rb b/spec/polling_spec.rb new file mode 100644 index 00000000..8183b8c3 --- /dev/null +++ b/spec/polling_spec.rb @@ -0,0 +1,89 @@ +require "spec_helper" +require 'ostruct' + +describe LaunchDarkly::PollingProcessor do + subject { LaunchDarkly::PollingProcessor } + let(:store) { LaunchDarkly::InMemoryFeatureStore.new } + let(:config) { LaunchDarkly::Config.new(feature_store: store) } + let(:requestor) { double() } + let(:processor) { subject.new(config, requestor) } + + describe 'successful request' do + flag = { key: 'flagkey', version: 1 } + segment = { key: 'segkey', version: 1 } + all_data = { + flags: { + flagkey: flag + }, + segments: { + segkey: segment + } + } + + it 'puts feature data in store' do + allow(requestor).to receive(:request_all_data).and_return(all_data) + ready = processor.start + ready.wait + expect(store.get(LaunchDarkly::FEATURES, "flagkey")).to eq(flag) + expect(store.get(LaunchDarkly::SEGMENTS, "segkey")).to eq(segment) + end + + it 'sets initialized to true' do + allow(requestor).to receive(:request_all_data).and_return(all_data) + ready = processor.start + ready.wait + expect(processor.initialized?).to be true + expect(store.initialized?).to be true + end + end + + describe 'connection error' do + it 'does not cause immediate failure, does not set initialized' do + allow(requestor).to 
receive(:request_all_data).and_raise(StandardError.new("test error")) + ready = processor.start + finished = ready.wait(0.2) + expect(finished).to be false + expect(processor.initialized?).to be false + expect(store.initialized?).to be false + end + end + + describe 'HTTP errors' do + def verify_unrecoverable_http_error(status) + allow(requestor).to receive(:request_all_data).and_raise(LaunchDarkly::UnexpectedResponseError.new(status)) + ready = processor.start + finished = ready.wait(0.2) + expect(finished).to be true + expect(processor.initialized?).to be false + end + + def verify_recoverable_http_error(status) + allow(requestor).to receive(:request_all_data).and_raise(LaunchDarkly::UnexpectedResponseError.new(status)) + ready = processor.start + finished = ready.wait(0.2) + expect(finished).to be false + expect(processor.initialized?).to be false + end + + it 'stops immediately for error 401' do + verify_unrecoverable_http_error(401) + end + + it 'stops immediately for error 403' do + verify_unrecoverable_http_error(403) + end + + it 'does not stop immediately for error 408' do + verify_recoverable_http_error(408) + end + + it 'does not stop immediately for error 429' do + verify_recoverable_http_error(429) + end + + it 'does not stop immediately for error 503' do + verify_recoverable_http_error(503) + end + end +end + diff --git a/spec/sse_client/sse_client_spec.rb b/spec/sse_client/sse_client_spec.rb new file mode 100644 index 00000000..ecb7c4e4 --- /dev/null +++ b/spec/sse_client/sse_client_spec.rb @@ -0,0 +1,139 @@ +require "spec_helper" +require "socketry" +require "sse_client/sse_shared" + +# +# End-to-end tests of SSEClient against a real server +# +describe SSE::SSEClient do + subject { SSE::SSEClient } + + def with_client(client) + begin + yield client + ensure + client.close + end + end + + it "sends expected headers" do + with_server do |server| + requests = Queue.new + server.setup_response("/") do |req,res| + requests << req + res.content_type = 
"text/event-stream" + res.status = 200 + end + + headers = { + "Authorization" => "secret" + } + + with_client(subject.new(server.base_uri, headers: headers)) do |client| + received_req = requests.pop + expect(received_req.header).to eq({ + "accept" => ["text/event-stream"], + "cache-control" => ["no-cache"], + "host" => ["127.0.0.1"], + "authorization" => ["secret"] + }) + end + end + end + + it "receives messages" do + events_body = <<-EOT +event: go +data: foo +id: 1 + +event: stop +data: bar + +EOT + with_server do |server| + server.setup_response("/") do |req,res| + res.content_type = "text/event-stream" + res.status = 200 + res.body = events_body + end + + event_sink = Queue.new + client = subject.new(server.base_uri) do |c| + c.on_event { |event| event_sink << event } + end + + with_client(client) do |client| + expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "foo", "1")) + expect(event_sink.pop).to eq(SSE::SSEEvent.new(:stop, "bar", nil)) + end + end + end + + it "reconnects after error response" do + events_body = <<-EOT +event: go +data: foo + +EOT + with_server do |server| + attempt = 0 + server.setup_response("/") do |req,res| + attempt += 1 + if attempt == 1 + res.status = 500 + res.body = "sorry" + res.keep_alive = false + else + res.content_type = "text/event-stream" + res.status = 200 + res.body = events_body + end + end + + event_sink = Queue.new + error_sink = Queue.new + client = subject.new(server.base_uri, reconnect_time: 0.25) do |c| + c.on_event { |event| event_sink << event } + c.on_error { |error| error_sink << error } + end + + with_client(client) do |client| + expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "foo", nil)) + expect(error_sink.pop).to eq({ status_code: 500, body: "sorry" }) + expect(attempt).to be >= 2 + end + end + end + + it "reconnects after read timeout" do + events_body = <<-EOT +event: go +data: foo + +EOT + with_server do |server| + attempt = 0 + server.setup_response("/") do |req,res| + attempt += 1 + if 
attempt == 1 + sleep(2) + end + res.content_type = "text/event-stream" + res.status = 200 + res.body = events_body + end + + event_sink = Queue.new + client = subject.new(server.base_uri, + reconnect_time: 0.25, read_timeout: 0.25) do |c| + c.on_event { |event| event_sink << event } + end + + with_client(client) do |client| + expect(event_sink.pop).to eq(SSE::SSEEvent.new(:go, "foo", nil)) + expect(attempt).to be >= 2 + end + end + end +end diff --git a/spec/sse_client/sse_events_spec.rb b/spec/sse_client/sse_events_spec.rb new file mode 100644 index 00000000..438cfa7a --- /dev/null +++ b/spec/sse_client/sse_events_spec.rb @@ -0,0 +1,100 @@ +require "spec_helper" + +describe SSE::EventParser do + subject { SSE::EventParser } + + it "parses an event with all fields" do + lines = [ + "event: abc\r\n", + "data: def\r\n", + "id: 1\r\n", + "\r\n" + ] + ep = subject.new(lines) + + expected_event = SSE::SSEEvent.new(:abc, "def", "1") + output = ep.items.to_a + expect(output).to eq([ expected_event ]) + end + + it "parses an event with only data" do + lines = [ + "data: def\r\n", + "\r\n" + ] + ep = subject.new(lines) + + expected_event = SSE::SSEEvent.new(:message, "def", nil) + output = ep.items.to_a + expect(output).to eq([ expected_event ]) + end + + it "parses an event with multi-line data" do + lines = [ + "data: def\r\n", + "data: ghi\r\n", + "\r\n" + ] + ep = subject.new(lines) + + expected_event = SSE::SSEEvent.new(:message, "def\nghi", nil) + output = ep.items.to_a + expect(output).to eq([ expected_event ]) + end + + it "ignores comments" do + lines = [ + ":", + "data: def\r\n", + ":", + "\r\n" + ] + ep = subject.new(lines) + + expected_event = SSE::SSEEvent.new(:message, "def", nil) + output = ep.items.to_a + expect(output).to eq([ expected_event ]) + end + + it "parses reconnect interval" do + lines = [ + "retry: 2500\r\n", + "\r\n" + ] + ep = subject.new(lines) + + expected_item = SSE::SSESetRetryInterval.new(2500) + output = ep.items.to_a + expect(output).to 
eq([ expected_item ]) + end + + it "parses multiple events" do + lines = [ + "event: abc\r\n", + "data: def\r\n", + "id: 1\r\n", + "\r\n", + "data: ghi\r\n", + "\r\n" + ] + ep = subject.new(lines) + + expected_event_1 = SSE::SSEEvent.new(:abc, "def", "1") + expected_event_2 = SSE::SSEEvent.new(:message, "ghi", nil) + output = ep.items.to_a + expect(output).to eq([ expected_event_1, expected_event_2 ]) + end + + it "ignores events with no data" do + lines = [ + "event: nothing\r\n", + "\r\n", + "event: nada\r\n", + "\r\n" + ] + ep = subject.new(lines) + + output = ep.items.to_a + expect(output).to eq([]) + end +end diff --git a/spec/sse_client/sse_shared.rb b/spec/sse_client/sse_shared.rb new file mode 100644 index 00000000..3ecabb57 --- /dev/null +++ b/spec/sse_client/sse_shared.rb @@ -0,0 +1,82 @@ +require "spec_helper" +require "webrick" +require "webrick/httpproxy" +require "webrick/https" + +class StubHTTPServer + def initialize + @port = 50000 + begin + @server = create_server(@port) + rescue Errno::EADDRINUSE + @port += 1 + retry + end + end + + def create_server(port) + WEBrick::HTTPServer.new( + BindAddress: '127.0.0.1', + Port: port, + AccessLog: [], + Logger: NullLogger.new + ) + end + + def start + Thread.new { @server.start } + end + + def stop + @server.shutdown + end + + def base_uri + URI("http://127.0.0.1:#{@port}") + end + + def setup_response(uri_path, &action) + @server.mount_proc(uri_path, action) + end +end + +class StubProxyServer < StubHTTPServer + attr_reader :request_count + attr_accessor :connect_status + + def initialize + super + @request_count = 0 + end + + def create_server(port) + WEBrick::HTTPProxyServer.new( + BindAddress: '127.0.0.1', + Port: port, + AccessLog: [], + Logger: NullLogger.new, + ProxyContentHandler: proc do |req,res| + if !@connect_status.nil? 
+ res.status = @connect_status + end + @request_count += 1 + end + ) + end +end + +class NullLogger + def method_missing(*) + self + end +end + +def with_server(server = nil) + server = StubHTTPServer.new if server.nil? + begin + server.start + yield server + ensure + server.stop + end +end diff --git a/spec/sse_client/streaming_http_spec.rb b/spec/sse_client/streaming_http_spec.rb new file mode 100644 index 00000000..7dfac9bd --- /dev/null +++ b/spec/sse_client/streaming_http_spec.rb @@ -0,0 +1,263 @@ +require "spec_helper" +require "socketry" +require "sse_client/sse_shared" + +# +# End-to-end tests of HTTP requests against a real server +# +describe SSE::StreamingHTTPConnection do + subject { SSE::StreamingHTTPConnection } + + def with_connection(cxn) + begin + yield cxn + ensure + cxn.close + end + end + + it "makes HTTP connection and sends request" do + with_server do |server| + requests = Queue.new + server.setup_response("/foo") do |req,res| + requests << req + res.status = 200 + end + headers = { + "Accept" => "text/plain" + } + with_connection(subject.new(server.base_uri.merge("/foo?bar"), nil, headers, 30, 30)) do + received_req = requests.pop + expect(received_req.unparsed_uri).to eq("/foo?bar") + expect(received_req.header).to eq({ + "accept" => ["text/plain"], + "host" => [server.base_uri.host] + }) + end + end + end + + it "receives response status" do + with_server do |server| + server.setup_response("/foo") do |req,res| + res.status = 204 + end + with_connection(subject.new(server.base_uri.merge("/foo"), nil, {}, 30, 30)) do |cxn| + expect(cxn.status).to eq(204) + end + end + end + + it "receives response headers" do + with_server do |server| + server.setup_response("/foo") do |req,res| + res["Content-Type"] = "application/json" + end + with_connection(subject.new(server.base_uri.merge("/foo"), nil, {}, 30, 30)) do |cxn| + expect(cxn.headers["content-type"]).to eq("application/json") + end + end + end + + it "can read response as lines" do + body = 
<<-EOT +This is +a response +EOT + with_server do |server| + server.setup_response("/foo") do |req,res| + res.body = body + end + with_connection(subject.new(server.base_uri.merge("/foo"), nil, {}, 30, 30)) do |cxn| + lines = cxn.read_lines + expect(lines.next).to eq("This is\n") + expect(lines.next).to eq("a response\n") + end + end + end + + it "can read entire response body" do + body = <<-EOT +This is +a response +EOT + with_server do |server| + server.setup_response("/foo") do |req,res| + res.body = body + end + with_connection(subject.new(server.base_uri.merge("/foo"), nil, {}, 30, 30)) do |cxn| + read_body = cxn.read_all + expect(read_body).to eq("This is\na response\n") + end + end + end + + it "enforces read timeout" do + with_server do |server| + server.setup_response("/") do |req,res| + sleep(2) + res.status = 200 + end + expect { subject.new(server.base_uri, nil, {}, 30, 0.25) }.to raise_error(Socketry::TimeoutError) + end + end + + it "connects to HTTP server through proxy" do + body = "hi" + with_server do |server| + server.setup_response("/") do |req,res| + res.body = body + end + with_server(StubProxyServer.new) do |proxy| + with_connection(subject.new(server.base_uri, proxy.base_uri, {}, 30, 30)) do |cxn| + read_body = cxn.read_all + expect(read_body).to eq("hi") + expect(proxy.request_count).to eq(1) + end + end + end + end + + it "throws error if proxy responds with error status" do + with_server do |server| + server.setup_response("/") do |req,res| + res.body = body + end + with_server(StubProxyServer.new) do |proxy| + proxy.connect_status = 403 + expect { subject.new(server.base_uri, proxy.base_uri, {}, 30, 30) }.to raise_error(SSE::ProxyError) + end + end + end + + # The following 2 tests were originally written to connect to an embedded HTTPS server made with + # WEBrick. Unfortunately, some unknown problem prevents WEBrick's self-signed certificate feature + # from working in JRuby 9.1 (but not in any other Ruby version). 
Therefore these tests currently + # hit an external URL. + + it "connects to HTTPS server" do + with_connection(subject.new(URI("https://app.launchdarkly.com"), nil, {}, 30, 30)) do |cxn| + expect(cxn.status).to eq 200 + end + end + + it "connects to HTTPS server through proxy" do + with_server(StubProxyServer.new) do |proxy| + with_connection(subject.new(URI("https://app.launchdarkly.com"), proxy.base_uri, {}, 30, 30)) do |cxn| + expect(cxn.status).to eq 200 + expect(proxy.request_count).to eq(1) + end + end + end +end + +# +# Tests of response parsing functionality without a real HTTP request +# +describe SSE::HTTPResponseReader do + subject { SSE::HTTPResponseReader } + + let(:simple_response) { <<-EOT +HTTP/1.1 200 OK +Cache-Control: no-cache +Content-Type: text/event-stream + +line1\r +line2 +\r +EOT + } + + def make_chunks(str) + # arbitrarily split content into 5-character blocks + str.scan(/.{1,5}/m).to_enum + end + + def mock_socket_without_timeout(chunks) + mock_socket(chunks) { :eof } + end + + def mock_socket_with_timeout(chunks) + mock_socket(chunks) { raise Socketry::TimeoutError } + end + + def mock_socket(chunks) + sock = double + allow(sock).to receive(:readpartial) do + begin + chunks.next + rescue StopIteration + yield + end + end + sock + end + + it "parses status code" do + socket = mock_socket_without_timeout(make_chunks(simple_response)) + reader = subject.new(socket, 0) + expect(reader.status).to eq(200) + end + + it "parses headers" do + socket = mock_socket_without_timeout(make_chunks(simple_response)) + reader = subject.new(socket, 0) + expect(reader.headers).to eq({ + 'cache-control' => 'no-cache', + 'content-type' => 'text/event-stream' + }) + end + + it "can read entire response body" do + socket = mock_socket_without_timeout(make_chunks(simple_response)) + reader = subject.new(socket, 0) + expect(reader.read_all).to eq("line1\r\nline2\n\r\n") + end + + it "can read response body as lines" do + socket = 
mock_socket_without_timeout(make_chunks(simple_response)) + reader = subject.new(socket, 0) + expect(reader.read_lines.to_a).to eq([ + "line1\r\n", + "line2\n", + "\r\n" + ]) + end + + it "handles chunked encoding" do + chunked_response = <<-EOT +HTTP/1.1 200 OK +Content-Type: text/plain +Transfer-Encoding: chunked + +6\r +things\r +A\r + and stuff\r +0\r +\r +EOT + socket = mock_socket_without_timeout(make_chunks(chunked_response)) + reader = subject.new(socket, 0) + expect(reader.read_all).to eq("things and stuff") + end + + it "raises error if response ends without complete headers" do + malformed_response = <<-EOT +HTTP/1.1 200 OK +Cache-Control: no-cache +EOT + socket = mock_socket_without_timeout(make_chunks(malformed_response)) + expect { subject.new(socket, 0) }.to raise_error(EOFError) + end + + it "throws timeout if thrown by socket read" do + socket = mock_socket_with_timeout(make_chunks(simple_response)) + reader = subject.new(socket, 0) + lines = reader.read_lines + lines.next + lines.next + lines.next + expect { lines.next }.to raise_error(Socketry::TimeoutError) + end +end