Skip to content

Commit 02f4ef1

Browse files
authored
Allow metadata in register transaction (#384)
1 parent 8215472 commit 02f4ef1

File tree

4 files changed

+142
-51
lines changed

4 files changed

+142
-51
lines changed

CHANGELOG.asciidoc

+10
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,16 @@
1616
===== Bug fixes
1717
////
1818
19+
[float]
20+
[[lambda-unreleased]]
21+
=== Unreleased
22+
23+
https://github.com/elastic/apm-aws-lambda/compare/v1.3.1...main[View commits]
24+
25+
[float]
26+
===== Features
27+
- experimental:[] Allow metadata in register transaction {lambda-pull}384[384]
28+
1929
[float]
2030
[[lambda-1.3.1]]
2131
=== 1.3.1 - 2023/04/04

accumulator/batch.go

+51-16
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,16 @@ var (
4343
maxSizeThreshold = 0.9
4444
zeroTime = time.Time{}
4545
newLineSep = []byte("\n")
46-
transactionKey = []byte("transaction")
46+
transactionKey = "transaction"
47+
metadataKey = "metadata"
48+
)
49+
50+
type eventType int
51+
52+
const (
53+
metadataEvent = iota
54+
transactionEvent
55+
otherEvent
4756
)
4857

4958
// Batch manages the data that needs to be shipped to APM Server. It holds
@@ -112,22 +121,44 @@ func (b *Batch) RegisterInvocation(
112121
}
113122

114123
// OnAgentInit caches the transaction ID and the payload for the currently
115-
// executing invocation as reported by the agent. The agent payload will be
116-
// used to create a new transaction in an event the actual transaction is
117-
// not reported by the agent due to unexpected termination.
118-
func (b *Batch) OnAgentInit(reqID, txnID string, payload []byte) error {
119-
if !isTransactionEvent(payload) {
124+
// executing invocation as reported by the agent. The payload can contain
125+
// metadata along with partial transaction. Metadata, if available, will
126+
// be cached for all future invocation. The agent payload will be used to
127+
// create a new transaction in an event the actual transaction is not
128+
// reported by the agent due to unexpected termination.
129+
func (b *Batch) OnAgentInit(reqID, contentEncoding string, raw []byte) error {
130+
payload, err := GetUncompressedBytes(raw, contentEncoding)
131+
if err != nil {
132+
return fmt.Errorf("failed to decompress request body: %w", err)
133+
}
134+
135+
var metadata, txnData []byte
136+
switch findEventType(payload) {
137+
case metadataEvent:
138+
metadata, txnData, _ = bytes.Cut(payload, newLineSep)
139+
case transactionEvent:
140+
txnData = payload
141+
default:
120142
return errors.New("invalid payload")
121143
}
144+
145+
txnID := gjson.GetBytes(txnData, "transaction.id").String()
146+
if txnID == "" {
147+
return errors.New("failed to parse transaction id from registration body")
148+
}
149+
122150
b.mu.Lock()
123151
defer b.mu.Unlock()
152+
if b.metadataBytes == 0 && len(metadata) > 0 {
153+
b.metadataBytes, _ = b.buf.Write(metadata)
154+
}
124155
i, ok := b.invocations[reqID]
125156
if !ok {
126157
// It is possible that the invocation is registered at a later time
127158
i = &Invocation{}
128159
b.invocations[reqID] = i
129160
}
130-
i.TransactionID, i.AgentPayload = txnID, payload
161+
i.TransactionID, i.AgentPayload = txnID, txnData
131162
b.currentlyExecutingRequestID = reqID
132163
return nil
133164
}
@@ -167,7 +198,7 @@ func (b *Batch) AddAgentData(apmData APMData) error {
167198
}
168199
for {
169200
data, after, _ = bytes.Cut(after, newLineSep)
170-
if inc.NeedProxyTransaction() && isTransactionEvent(data) {
201+
if inc.NeedProxyTransaction() && findEventType(data) == transactionEvent {
171202
res := gjson.GetBytes(data, "transaction.id")
172203
if res.Str != "" && inc.TransactionID == res.Str {
173204
inc.TransactionObserved = true
@@ -313,21 +344,25 @@ func (b *Batch) addData(data []byte) error {
313344
return nil
314345
}
315346

316-
func isTransactionEvent(body []byte) bool {
347+
func findEventType(body []byte) eventType {
348+
var quote byte
317349
var key []byte
318350
for i, r := range body {
319351
if r == '"' || r == '\'' {
352+
quote = r
320353
key = body[i+1:]
321354
break
322355
}
323356
}
324-
if len(key) < len(transactionKey) {
325-
return false
357+
end := bytes.IndexByte(key, quote)
358+
if end == -1 {
359+
return otherEvent
326360
}
327-
for i := 0; i < len(transactionKey); i++ {
328-
if transactionKey[i] != key[i] {
329-
return false
330-
}
361+
switch string(key[:end]) {
362+
case transactionKey:
363+
return transactionEvent
364+
case metadataKey:
365+
return metadataEvent
331366
}
332-
return true
367+
return otherEvent
333368
}

accumulator/batch_test.go

+70-20
Original file line numberDiff line numberDiff line change
@@ -95,22 +95,26 @@ func TestShouldShip_ReasonAge(t *testing.T) {
9595
func TestLifecycle(t *testing.T) {
9696
reqID := "test-req-id"
9797
fnARN := "test-fn-arn"
98-
txnID := "023d90ff77f13b9f"
9998
lambdaData := `{"log":{"message":"this is log"}}`
100-
txnData := fmt.Sprintf(`{"transaction":{"id":"%s"}}`, txnID)
99+
txnData := fmt.Sprintf(`{"transaction":{"id":"%s"}}`, "023d90ff77f13b9f")
101100
ts := time.Date(2022, time.October, 1, 1, 1, 1, 0, time.UTC)
102101
txnDur := time.Second
103102

103+
type agentInit struct {
104+
init bool
105+
withMetadata bool
106+
}
107+
104108
for _, tc := range []struct {
105109
name string
106-
agentInit bool
110+
agentInit agentInit
107111
receiveAgentRootTxn bool
108112
receiveLambdaLogRuntime bool
109113
expected string
110114
}{
111115
{
112116
name: "without_agent_init_without_root_txn",
113-
agentInit: false,
117+
agentInit: agentInit{init: false, withMetadata: false},
114118
receiveAgentRootTxn: false,
115119
receiveLambdaLogRuntime: false,
116120
// Without agent init no proxy txn is created if root txn is not reported
@@ -122,7 +126,7 @@ func TestLifecycle(t *testing.T) {
122126
},
123127
{
124128
name: "without_agent_init_with_root_txn",
125-
agentInit: false,
129+
agentInit: agentInit{init: false, withMetadata: false},
126130
receiveAgentRootTxn: true,
127131
receiveLambdaLogRuntime: false,
128132
expected: fmt.Sprintf(
@@ -133,8 +137,8 @@ func TestLifecycle(t *testing.T) {
133137
),
134138
},
135139
{
136-
name: "with_agent_init_with_root_txn",
137-
agentInit: true,
140+
name: "with_no_meta_agent_init_with_root_txn",
141+
agentInit: agentInit{init: true, withMetadata: false},
138142
receiveAgentRootTxn: true,
139143
receiveLambdaLogRuntime: false,
140144
expected: fmt.Sprintf(
@@ -145,8 +149,20 @@ func TestLifecycle(t *testing.T) {
145149
),
146150
},
147151
{
148-
name: "with_agent_init_without_root_txn_with_runtimeDone",
149-
agentInit: true,
152+
name: "with_meta_agent_init_with_root_txn",
153+
agentInit: agentInit{init: true, withMetadata: true},
154+
receiveAgentRootTxn: true,
155+
receiveLambdaLogRuntime: false,
156+
expected: fmt.Sprintf(
157+
"%s\n%s\n%s",
158+
metadata,
159+
generateCompleteTxn(t, txnData, "success", "", txnDur),
160+
lambdaData,
161+
),
162+
},
163+
{
164+
name: "with_no_meta_agent_init_without_root_txn_with_runtimeDone",
165+
agentInit: agentInit{init: true, withMetadata: false},
150166
receiveAgentRootTxn: false,
151167
receiveLambdaLogRuntime: true,
152168
// With agent init proxy txn is created if root txn is not reported.
@@ -159,8 +175,37 @@ func TestLifecycle(t *testing.T) {
159175
),
160176
},
161177
{
162-
name: "with_agent_init_without_root_txn",
163-
agentInit: true,
178+
name: "with_meta_agent_init_without_root_txn_with_runtimeDone",
179+
agentInit: agentInit{init: true, withMetadata: true},
180+
receiveAgentRootTxn: false,
181+
receiveLambdaLogRuntime: true,
182+
// With agent init proxy txn is created if root txn is not reported.
183+
// Details in runtimeDone event is used to find the result of the txn.
184+
expected: fmt.Sprintf(
185+
"%s\n%s\n%s",
186+
metadata,
187+
lambdaData,
188+
generateCompleteTxn(t, txnData, "failure", "failure", txnDur),
189+
),
190+
},
191+
{
192+
name: "with_no_meta_agent_init_without_root_txn",
193+
agentInit: agentInit{init: true, withMetadata: false},
194+
receiveAgentRootTxn: false,
195+
receiveLambdaLogRuntime: false,
196+
// With agent init proxy txn is created if root txn is not reported.
197+
// If runtimeDone event is not available `timeout` is used as the
198+
// result of the transaction.
199+
expected: fmt.Sprintf(
200+
"%s\n%s\n%s",
201+
metadata,
202+
lambdaData,
203+
generateCompleteTxn(t, txnData, "timeout", "failure", txnDur),
204+
),
205+
},
206+
{
207+
name: "with_meta_agent_init_without_root_txn",
208+
agentInit: agentInit{init: true, withMetadata: true},
164209
receiveAgentRootTxn: false,
165210
receiveLambdaLogRuntime: false,
166211
// With agent init proxy txn is created if root txn is not reported.
@@ -179,8 +224,12 @@ func TestLifecycle(t *testing.T) {
179224
// NEXT API response creates a new invocation cache
180225
b.RegisterInvocation(reqID, fnARN, ts.Add(txnDur).UnixMilli(), ts)
181226
// Agent creates and registers a partial transaction in the extn
182-
if tc.agentInit {
183-
require.NoError(t, b.OnAgentInit(reqID, txnID, []byte(txnData)))
227+
if tc.agentInit.init {
228+
initData := txnData
229+
if tc.agentInit.withMetadata {
230+
initData = fmt.Sprintf("%s\n%s", metadata, txnData)
231+
}
232+
require.NoError(t, b.OnAgentInit(reqID, "", []byte(initData)))
184233
}
185234
// Agent sends a request with metadata
186235
require.NoError(t, b.AddAgentData(APMData{
@@ -209,17 +258,18 @@ func TestLifecycle(t *testing.T) {
209258
}
210259
}
211260

212-
func TestIsTransactionEvent(t *testing.T) {
261+
func TestFindEventType(t *testing.T) {
213262
for _, tc := range []struct {
214263
body []byte
215-
expected bool
264+
expected eventType
216265
}{
217-
{body: []byte(`{}`), expected: false},
218-
{body: []byte(`{"tran":{}}`), expected: false},
219-
{body: []byte(`{"span":{}}`), expected: false},
220-
{body: []byte(`{"transaction":{}}`), expected: true},
266+
{body: []byte(`{}`), expected: otherEvent},
267+
{body: []byte(`{"tran":{}}`), expected: otherEvent},
268+
{body: []byte(`{"span":{}}`), expected: otherEvent},
269+
{body: []byte(`{"metadata":{}}\n{"transaction":{}}`), expected: metadataEvent},
270+
{body: []byte(`{"transaction":{}}`), expected: transactionEvent},
221271
} {
222-
assert.Equal(t, tc.expected, isTransactionEvent(tc.body))
272+
assert.Equal(t, tc.expected, findEventType(tc.body))
223273
}
224274
}
225275

apmproxy/receiver.go

+11-15
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,9 @@ import (
2929
"time"
3030

3131
"github.com/elastic/apm-aws-lambda/accumulator"
32-
"github.com/tidwall/gjson"
3332
)
3433

35-
const txnRegistrationContentType = "application/vnd.elastic.apm.transaction+json"
34+
const txnRegistrationContentType = "application/vnd.elastic.apm.transaction+ndjson"
3635

3736
// StartReceiver starts the server listening for APM agent data.
3837
func (c *Client) StartReceiver() error {
@@ -76,12 +75,12 @@ func (c *Client) Shutdown() error {
7675
// URL: http://server/
7776
func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Request), error) {
7877
// Init reverse proxy
79-
parsedApmServerUrl, err := url.Parse(c.serverURL)
78+
parsedApmServerURL, err := url.Parse(c.serverURL)
8079
if err != nil {
8180
return nil, fmt.Errorf("could not parse APM server URL: %w", err)
8281
}
8382

84-
reverseProxy := httputil.NewSingleHostReverseProxy(parsedApmServerUrl)
83+
reverseProxy := httputil.NewSingleHostReverseProxy(parsedApmServerURL)
8584

8685
reverseProxy.Transport = c.client.Transport.(*http.Transport).Clone()
8786

@@ -103,10 +102,10 @@ func (c *Client) handleInfoRequest() (func(w http.ResponseWriter, r *http.Reques
103102
r.Header.Del("X-Forwarded-For")
104103

105104
// Update headers to allow for SSL redirection
106-
r.URL.Host = parsedApmServerUrl.Host
107-
r.URL.Scheme = parsedApmServerUrl.Scheme
105+
r.URL.Host = parsedApmServerURL.Host
106+
r.URL.Scheme = parsedApmServerURL.Scheme
108107
r.Header.Set("X-Forwarded-Host", r.Header.Get("Host"))
109-
r.Host = parsedApmServerUrl.Host
108+
r.Host = parsedApmServerURL.Host
110109

111110
// Forward request to the APM server
112111
reverseProxy.ServeHTTP(w, r)
@@ -185,14 +184,11 @@ func (c *Client) handleTransactionRegistration() func(w http.ResponseWriter, r *
185184
w.WriteHeader(http.StatusBadRequest)
186185
return
187186
}
188-
txnID := gjson.GetBytes(rawBytes, "transaction.id").String()
189-
if txnID == "" {
190-
c.logger.Warn("Could not parse transaction id from transaction registration body")
191-
w.WriteHeader(http.StatusUnprocessableEntity)
192-
return
193-
}
194-
if err := c.batch.OnAgentInit(reqID, txnID, rawBytes); err != nil {
195-
c.logger.Warnf("Failed to update invocation for transaction ID %s: %v", txnID, err)
187+
188+
if err := c.batch.OnAgentInit(
189+
reqID, r.Header.Get("Content-Encoding"), rawBytes,
190+
); err != nil {
191+
c.logger.Warnf("Failed to update invocation: %w", err)
196192
w.WriteHeader(http.StatusUnprocessableEntity)
197193
return
198194
}

0 commit comments

Comments
 (0)