Skip to content

feat: parse server response and react to error messages #281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Sep 6, 2022
99 changes: 80 additions & 19 deletions apmproxy/apmserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand All @@ -30,11 +31,20 @@ import (
"time"
)

// Types used to decode the JSON body of an APM server response.
type (
	// jsonResult is the top-level response document. PostToApmServer
	// decodes the response body into it for any status other than
	// 202 Accepted and 429 Too Many Requests.
	jsonResult struct {
		// Errors holds the entries of the "errors" array, if any.
		Errors []jsonError `json:"errors,omitempty"`
	}

	// jsonError is a single entry of the "errors" array.
	jsonError struct {
		// Message is the value of the "message" key.
		Message string `json:"message"`
		// Document is the value of the optional "document" key; it is
		// logged alongside Message when a request fails.
		Document string `json:"document,omitempty"`
	}
)

// ForwardApmData receives agent data as it comes in and posts it to the APM server.
// Stop checking for, and sending agent data when the function invocation
// has completed, signaled via a channel.
func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *MetadataContainer) error {
if c.Status == Failing {
if c.IsUnhealthy() {
return nil
}
for {
Expand All @@ -59,7 +69,7 @@ func (c *Client) ForwardApmData(ctx context.Context, metadataContainer *Metadata

// FlushAPMData reads all the apm data in the apm data channel and sends it to the APM server.
func (c *Client) FlushAPMData(ctx context.Context) {
if c.Status == Failing {
if c.IsUnhealthy() {
c.logger.Debug("Flush skipped - Transport failing")
return
}
Expand All @@ -86,7 +96,7 @@ func (c *Client) FlushAPMData(ctx context.Context) {
func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error {
// todo: can this be a streaming or streaming style call that keeps the
// connection open across invocations?
if c.Status == Failing {
if c.IsUnhealthy() {
return errors.New("transport status is unhealthy")
}

Expand Down Expand Up @@ -131,61 +141,112 @@ func (c *Client) PostToApmServer(ctx context.Context, agentData AgentData) error
c.logger.Debug("Sending data chunk to APM server")
resp, err := c.client.Do(req)
if err != nil {
c.SetApmServerTransportState(ctx, Failing)
c.UpdateStatus(ctx, Failing)
return fmt.Errorf("failed to post to APM server: %v", err)
}

//Read the response body
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
c.SetApmServerTransportState(ctx, Failing)
return fmt.Errorf("failed to read the response body after posting to the APM server")

// On success, the server will respond with a 202 Accepted status code and no body.
if resp.StatusCode == http.StatusAccepted {
c.UpdateStatus(ctx, Healthy)
return nil
}

if resp.StatusCode == http.StatusUnauthorized {
// RateLimited
if resp.StatusCode == http.StatusTooManyRequests {
c.logger.Warnf("Transport has been rate limited: response status code: %d", resp.StatusCode)
c.UpdateStatus(ctx, RateLimited)
return nil
}

jErr := jsonResult{}
if err := json.NewDecoder(resp.Body).Decode(&jErr); err != nil {
// non critical error.
// Log a warning and continue.
c.logger.Warnf("failed to decode response body: %v", err)
}

// Auth errors
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
c.logger.Warnf("Authentication with the APM server failed: response status code: %d", resp.StatusCode)
c.logger.Debugf("APM server response body: %v", string(body))
for _, err := range jErr.Errors {
c.logger.Warnf("failed to authenticate: document %s: message: %s", err.Document, err.Message)
}
c.UpdateStatus(ctx, Failing)
return nil
}

// ClientErrors
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
c.logger.Warnf("client error: response status code: %d", resp.StatusCode)
for _, err := range jErr.Errors {
c.logger.Warnf("client error: document %s: message: %s", err.Document, err.Message)
}
c.UpdateStatus(ctx, ClientFailing)
return nil
}

// critical errors
if resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusServiceUnavailable {
c.logger.Warnf("failed to post data to APM server: response status code: %d", resp.StatusCode)
for _, err := range jErr.Errors {
c.logger.Warnf("critical error: document %s: message: %s", err.Document, err.Message)
}
c.UpdateStatus(ctx, Failing)
return nil
}

c.SetApmServerTransportState(ctx, Healthy)
c.logger.Debug("Transport status set to healthy")
c.logger.Debugf("APM server response body: %v", string(body))
c.logger.Debugf("APM server response status code: %v", resp.StatusCode)
c.logger.Warnf("unhandled status code: %d", resp.StatusCode)
return nil
}

// SetApmServerTransportState takes a state of the APM server transport and updates
// IsUnhealthy reports whether the client's current status is Failing.
// The status is read while holding the client's mutex.
func (c *Client) IsUnhealthy() bool {
	c.mu.Lock()
	failing := c.Status == Failing
	c.mu.Unlock()
	return failing
}

// UpdateStatus takes a state of the APM server transport and updates
// the current state of the transport. For a change to a failing state, the grace period
// is calculated and a go routine is started that waits for that period to complete
// before changing the status to "Started". This would allow a subsequent send attempt
// to the APM server.
//
// This function is public for use in tests.
func (c *Client) SetApmServerTransportState(ctx context.Context, status Status) {
func (c *Client) UpdateStatus(ctx context.Context, status Status) {
switch status {
case Healthy:
c.mu.Lock()
c.Status = status
c.logger.Debugf("APM server Transport status set to %s", c.Status)
c.ReconnectionCount = -1
c.mu.Unlock()
case RateLimited, ClientFailing:
// No need to start backoff, this is a temporary status. It usually
// means we went over the limit of events/s.
Comment on lines +238 to +239
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would ClientFailing be temporary? From what I can see above, it's either due to auth failure (probably not temporary without user intervention?) or some validation error (probably implies a bug in the agent or server?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can see above, it's either due to auth failure (probably not temporary without user intervention?)

Ah, good point! I've made auth errors a critical failure

When would ClientFailing be temporary?

From what I could see from the middlewares in the APM server repository, this would happen on data decoding/validation errors, request body too large or invalid query. Those are errors tied to a specific request, I don't think we should trigger a backoff and associated delay.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I could see from the middlewares in the APM server repository, this would happen on data decoding/validation errors, request body too large or invalid query. Those are errors tied to a specific request, I don't think we should trigger a backoff and associated delay.

good point, I think this is fine as is now.

c.mu.Lock()
c.Status = status
c.logger.Debugf("APM server Transport status set to %s", c.Status)
c.mu.Unlock()
case Failing:
c.mu.Lock()
c.Status = status
c.logger.Debugf("APM server Transport status set to %s", c.Status)
c.ReconnectionCount++
gracePeriodTimer := time.NewTimer(c.ComputeGracePeriod())
c.logger.Debugf("Grace period entered, reconnection count : %d", c.ReconnectionCount)
c.mu.Unlock()

go func() {
select {
case <-gracePeriodTimer.C:
c.logger.Debug("Grace period over - timer timed out")
case <-ctx.Done():
c.logger.Debug("Grace period over - context done")
}
c.Status = Pending
c.mu.Lock()
c.Status = Started
c.logger.Debugf("APM server Transport status set to %s", c.Status)
c.mu.Unlock()
}()
Expand Down
64 changes: 32 additions & 32 deletions apmproxy/apmserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestSetHealthyTransport(t *testing.T) {
apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()),
)
require.NoError(t, err)
apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy)
apmClient.UpdateStatus(context.Background(), apmproxy.Healthy)
assert.True(t, apmClient.Status == apmproxy.Healthy)
assert.Equal(t, apmClient.ReconnectionCount, -1)
}
Expand All @@ -184,7 +184,7 @@ func TestSetFailingTransport(t *testing.T) {
)
require.NoError(t, err)
apmClient.ReconnectionCount = 0
apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing)
apmClient.UpdateStatus(context.Background(), apmproxy.Failing)
assert.True(t, apmClient.Status == apmproxy.Failing)
assert.Equal(t, apmClient.ReconnectionCount, 1)
}
Expand All @@ -195,12 +195,12 @@ func TestSetPendingTransport(t *testing.T) {
apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()),
)
require.NoError(t, err)
apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy)
apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing)
apmClient.UpdateStatus(context.Background(), apmproxy.Healthy)
apmClient.UpdateStatus(context.Background(), apmproxy.Failing)
require.Eventually(t, func() bool {
return apmClient.Status != apmproxy.Failing
}, 5*time.Second, 50*time.Millisecond)
assert.True(t, apmClient.Status == apmproxy.Pending)
return !apmClient.IsUnhealthy()
}, 7*time.Second, 50*time.Millisecond)
assert.True(t, apmClient.Status == apmproxy.Started)
assert.Equal(t, apmClient.ReconnectionCount, 0)
}

Expand All @@ -210,8 +210,8 @@ func TestSetPendingTransportExplicitly(t *testing.T) {
apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()),
)
require.NoError(t, err)
apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy)
apmClient.SetApmServerTransportState(context.Background(), apmproxy.Pending)
apmClient.UpdateStatus(context.Background(), apmproxy.Healthy)
apmClient.UpdateStatus(context.Background(), apmproxy.Started)
assert.True(t, apmClient.Status == apmproxy.Healthy)
assert.Equal(t, apmClient.ReconnectionCount, -1)
}
Expand All @@ -222,8 +222,8 @@ func TestSetInvalidTransport(t *testing.T) {
apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()),
)
require.NoError(t, err)
apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy)
apmClient.SetApmServerTransportState(context.Background(), "Invalid")
apmClient.UpdateStatus(context.Background(), apmproxy.Healthy)
apmClient.UpdateStatus(context.Background(), "Invalid")
assert.True(t, apmClient.Status == apmproxy.Healthy)
assert.Equal(t, apmClient.ReconnectionCount, -1)
}
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestEnterBackoffFromHealthy(t *testing.T) {
apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()),
)
require.NoError(t, err)
apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy)
apmClient.UpdateStatus(context.Background(), apmproxy.Healthy)

// Close the APM server early so that POST requests fail and that backoff is enabled
apmServer.Close()
Expand Down Expand Up @@ -321,12 +321,12 @@ func TestEnterBackoffFromFailing(t *testing.T) {
)
require.NoError(t, err)

apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy)
apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing)
apmClient.UpdateStatus(context.Background(), apmproxy.Healthy)
apmClient.UpdateStatus(context.Background(), apmproxy.Failing)
require.Eventually(t, func() bool {
return apmClient.Status != apmproxy.Failing
}, 5*time.Second, 50*time.Millisecond)
assert.Equal(t, apmClient.Status, apmproxy.Pending)
return !apmClient.IsUnhealthy()
}, 7*time.Second, 50*time.Millisecond)
assert.Equal(t, apmClient.Status, apmproxy.Started)

assert.Error(t, apmClient.PostToApmServer(context.Background(), agentData))
assert.Equal(t, apmClient.Status, apmproxy.Failing)
Expand Down Expand Up @@ -375,12 +375,12 @@ func TestAPMServerRecovery(t *testing.T) {
)
require.NoError(t, err)

apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy)
apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing)
apmClient.UpdateStatus(context.Background(), apmproxy.Healthy)
apmClient.UpdateStatus(context.Background(), apmproxy.Failing)
require.Eventually(t, func() bool {
return apmClient.Status != apmproxy.Failing
}, 5*time.Second, 50*time.Millisecond)
assert.Equal(t, apmClient.Status, apmproxy.Pending)
return !apmClient.IsUnhealthy()
}, 7*time.Second, 50*time.Millisecond)
assert.Equal(t, apmClient.Status, apmproxy.Started)

assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData))
assert.Equal(t, apmClient.Status, apmproxy.Healthy)
Expand Down Expand Up @@ -420,12 +420,12 @@ func TestAPMServerAuthFails(t *testing.T) {
apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()),
)
require.NoError(t, err)
apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy)
apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing)
apmClient.UpdateStatus(context.Background(), apmproxy.Healthy)
apmClient.UpdateStatus(context.Background(), apmproxy.Failing)
require.Eventually(t, func() bool {
return apmClient.Status != apmproxy.Failing
}, 5*time.Second, 50*time.Millisecond)
assert.Equal(t, apmClient.Status, apmproxy.Pending)
return !apmClient.IsUnhealthy()
}, 7*time.Second, 50*time.Millisecond)
assert.Equal(t, apmClient.Status, apmproxy.Started)
assert.NoError(t, apmClient.PostToApmServer(context.Background(), agentData))
assert.NotEqual(t, apmClient.Status, apmproxy.Healthy)
}
Expand Down Expand Up @@ -469,12 +469,12 @@ func TestContinuedAPMServerFailure(t *testing.T) {
apmproxy.WithLogger(zaptest.NewLogger(t).Sugar()),
)
require.NoError(t, err)
apmClient.SetApmServerTransportState(context.Background(), apmproxy.Healthy)
apmClient.SetApmServerTransportState(context.Background(), apmproxy.Failing)
apmClient.UpdateStatus(context.Background(), apmproxy.Healthy)
apmClient.UpdateStatus(context.Background(), apmproxy.Failing)
require.Eventually(t, func() bool {
return apmClient.Status != apmproxy.Failing
}, 5*time.Second, 50*time.Millisecond)
assert.Equal(t, apmClient.Status, apmproxy.Pending)
return !apmClient.IsUnhealthy()
}, 7*time.Second, 50*time.Millisecond)
assert.Equal(t, apmClient.Status, apmproxy.Started)
assert.Error(t, apmClient.PostToApmServer(context.Background(), agentData))
assert.Equal(t, apmClient.Status, apmproxy.Failing)
}
Expand Down
28 changes: 14 additions & 14 deletions apmproxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,18 @@ const (

// Client is the client used to communicate with the apm server.
type Client struct {
mu sync.Mutex
bufferPool sync.Pool
DataChannel chan AgentData
client *http.Client
Status Status
ReconnectionCount int
ServerAPIKey string
ServerSecretToken string
serverURL string
receiver *http.Server
sendStrategy SendStrategy
logger *zap.SugaredLogger
mu sync.Mutex
bufferPool sync.Pool
DataChannel chan AgentData
client *http.Client
Status Status
ReconnectionCount int
ServerAPIKey string
ServerSecretToken string
serverURL string
receiver *http.Server
sendStrategy SendStrategy
logger *zap.SugaredLogger

flushMutex sync.Mutex
flushCh chan struct{}
Expand All @@ -76,8 +76,8 @@ func NewClient(opts ...Option) (*Client, error) {
client: &http.Client{
Transport: http.DefaultTransport.(*http.Transport).Clone(),
},
ReconnectionCount: -1,
Status: Healthy,
ReconnectionCount: -1,
Status: Started,
receiver: &http.Server{
Addr: defaultReceiverAddr,
ReadTimeout: defaultDataReceiverTimeout,
Expand Down
2 changes: 1 addition & 1 deletion apmproxy/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Reques
reverseProxy.Transport = customTransport

reverseProxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
c.SetApmServerTransportState(r.Context(), Failing)
c.UpdateStatus(r.Context(), Failing)
c.logger.Errorf("Error querying version from the APM server: %v", err)
}

Expand Down
21 changes: 19 additions & 2 deletions apmproxy/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,24 @@ package apmproxy
type Status string

const (
Failing Status = "Failing"
Pending Status = "Pending"
// The apmproxy started but no information can be
// inferred on the status of the transport.
// Either because the apmproxy just started and no
// request was forwarded or because it recovered
// from a failure.
Started Status = "Started"

// Last request completed successfully.
Healthy Status = "Healthy"

// Last request failed.
Failing Status = "Failing"

// The APM Server returned status 429 and the extension
// was rate limited.
RateLimited Status = "RateLimited"

// A failure on the client was observed. This does not
// trigger any backoff mechanism.
ClientFailing Status = "ClientFailing"
)