-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathamqp-base-client.ts
611 lines (588 loc) · 25.3 KB
/
amqp-base-client.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
import { AMQPChannel } from './amqp-channel.js'
import { AMQPError } from './amqp-error.js'
import { AMQPMessage } from './amqp-message.js'
import { AMQPView } from './amqp-view.js'
import type { Logger } from './types.js'
/** Client library version; sent to the server as the `version` client property in Connection.StartOk. */
const VERSION = '3.1.1'
/**
 * Base class for AMQPClients.
 * Implements everything except how to connect, send data and close the socket
 */
export abstract class AMQPBaseClient {
  /** Virtual host sent in Connection.Open */
  vhost: string
  /** Username used for PLAIN authentication */
  username: string
  /** Password used for PLAIN authentication, redefined as non-enumerable by the constructor */
  password: string
  /** Connection name, sent as the `connection_name` client property */
  name?: string
  /** Platform description, sent as the `platform` client property */
  platform?: string
  /** Open channels indexed by channel id; index 0 is the connection's own channel, closed channels leave a hole */
  channels: AMQPChannel[]
  /** [resolve, reject] of a pending connect, settled on Connection.OpenOk or on a connect failure */
  protected connectPromise?: [(conn: AMQPBaseClient) => void, (err: Error) => void]
  /** [resolve, reject] of a pending close(), resolved on Connection.CloseOk */
  protected closePromise?: [(value?: void) => void, (err: Error) => void]
  /** Resolver of a pending updateSecret(), called on Connection.UpdateSecretOk */
  protected onUpdateSecretOk?: (value?: void) => void
  /** True until Connection.OpenOk has been received, and again after close */
  closed = true
  /** Reason given by the server while the connection is blocked, otherwise unset */
  blocked?: string
  /** Max number of channels, 0 means no limit; negotiated with the server in Connection.Tune */
  channelMax = 0
  /** Max frame size in bytes; negotiated with the server in Connection.Tune */
  frameMax: number
  /** Heartbeat interval, 0 disables heartbeats; negotiated with the server in Connection.Tune */
  heartbeat: number
  /** Called when the server closes the connection */
  onerror: (error: AMQPError) => void
  /** Logger used for diagnostics, set to null to silence */
  logger: Logger | null | undefined = console
  /** Used for string -> arraybuffer when publishing */
  readonly textEncoder: InstanceType<typeof TextEncoder> = new TextEncoder()
  // Buffer pool for publishes, let multiple microtasks publish at the same time but save on allocations
  readonly bufferPool: AMQPView[] = []

  /**
   * @param vhost - virtual host to open
   * @param username - user for PLAIN authentication
   * @param password - password for PLAIN authentication
   * @param name - name of the connection, set in client properties
   * @param platform - used in client properties
   * @param frameMax - requested max frame size, must be 4096 or larger
   * @param heartbeat - requested heartbeat interval, must not be negative
   * @param channelMax - requested max number of channels, must not be negative, 0 means no limit
   * @throws Error if frameMax, heartbeat or channelMax is out of range
   */
  constructor(vhost: string, username: string, password: string, name?: string, platform?: string, frameMax = 4096, heartbeat = 0, channelMax = 0) {
    this.vhost = vhost
    this.username = username
    this.password = ""
    Object.defineProperty(this, 'password', {
      value: password,
      enumerable: false // hide it from console.log etc.
    })
    if (name) this.name = name // connection name
    if (platform) this.platform = platform
    this.channels = [new AMQPChannel(this, 0)]
    this.onerror = (error: AMQPError) => this.logger?.error("amqp-client connection closed", error.message)
    if (frameMax < 4096) throw new Error("frameMax must be 4096 or larger")
    this.frameMax = frameMax
    if (heartbeat < 0) throw new Error("heartbeat must be positive")
    this.heartbeat = heartbeat
    if (channelMax && channelMax < 0) throw new Error("channelMax must be positive")
    this.channelMax = channelMax
  }

  /**
   * Open a channel
   * @param [id] - An existing or non existing specific channel
   * @returns the already open channel with that id, or a newly opened channel.
   * Rejects if the connection is closed, the id is not a valid channel id or the channel limit is reached.
   */
  channel(id?: number): Promise<AMQPChannel> {
    if (this.closed) return this.rejectClosed()
    // Channel ids are written as unsigned 16 bit integers, anything else can't be represented on the wire
    if (id && (!Number.isInteger(id) || id < 0 || id > 0xFFFF))
      return Promise.reject(new AMQPError(`Invalid channel id ${id}`, this))
    if (id && id > 0) {
      const existing = this.channels[id]
      if (existing) return Promise.resolve(existing)
    }
    // Channels are stored in an array indexed by id, the slot is deleted when a channel is closed.
    // Reuse the first free slot or append at the end
    if (!id)
      id = this.channels.findIndex((ch) => ch === undefined)
    if (id === -1) id = this.channels.length
    // Without a negotiated limit the 16 bit channel id field is the limit
    const maxId = this.channelMax > 0 ? this.channelMax : 0xFFFF
    if (id > maxId) return Promise.reject(new AMQPError("Max number of channels reached", this))
    const channel = new AMQPChannel(this, id)
    this.channels[id] = channel
    return channel.open()
  }

  /**
   * Gracefully close the AMQP connection
   * @param [reason] might be logged by the server
   * @param [code] reply code sent to the server
   * @returns resolved when the server has replied with Connection.CloseOk.
   * Rejects if the connection is already closed or if the close frame can't be sent.
   */
  close(reason = "", code = 200): Promise<void> {
    if (this.closed) return this.rejectClosed()
    this.closed = true
    let j = 0
    const frame = new AMQPView(new ArrayBuffer(512))
    frame.setUint8(j, 1); j += 1 // type: method
    frame.setUint16(j, 0); j += 2 // channel: 0
    frame.setUint32(j, 0); j += 4 // frameSize
    frame.setUint16(j, 10); j += 2 // class: connection
    frame.setUint16(j, 50); j += 2 // method: close
    frame.setUint16(j, code); j += 2 // reply code
    j += frame.setShortString(j, reason) // reply reason
    frame.setUint16(j, 0); j += 2 // failing-class-id
    frame.setUint16(j, 0); j += 2 // failing-method-id
    frame.setUint8(j, 206); j += 1 // frame end byte
    frame.setUint32(3, j - 8) // update frameSize
    return new Promise((resolve, reject) => {
      this.send(new Uint8Array(frame.buffer, 0, j))
        .then(() => this.closePromise = [resolve, reject])
        .catch(reject)
    })
  }

  /**
   * Send Connection.UpdateSecret to the server
   * @param newSecret - the new secret
   * @param reason - reason for the update, sent to the server
   * @returns resolved when the server has replied with Connection.UpdateSecretOk.
   * Rejects if the connection is closed or if the frame can't be sent.
   */
  updateSecret(newSecret: string, reason: string): Promise<void> {
    if (this.closed) return this.rejectClosed()
    let j = 0
    const frame = new AMQPView(new ArrayBuffer(4096))
    frame.setUint8(j, 1); j += 1 // type: method
    frame.setUint16(j, 0); j += 2 // channel: 0
    frame.setUint32(j, 0); j += 4 // frameSize
    frame.setUint16(j, 10); j += 2 // class: connection
    frame.setUint16(j, 70); j += 2 // method: update-secret
    j += frame.setLongString(j, newSecret) // new secret
    j += frame.setShortString(j, reason) // reason
    frame.setUint8(j, 206); j += 1 // frame end byte
    frame.setUint32(3, j - 8) // update frameSize
    return new Promise<void>((resolve, reject) => {
      this.send(new Uint8Array(frame.buffer, 0, j))
        .then(() => this.onUpdateSecretOk = resolve)
        .catch(reject)
    })
  }

  /**
   * Try establish a connection
   */
  abstract connect(): Promise<AMQPBaseClient>

  /**
   * @ignore
   * @param bytes to send
   * @return fulfilled when the data is enqueued
   */
  abstract send(bytes: Uint8Array): Promise<void>

  /** Close the underlying transport */
  protected abstract closeSocket(): void

  /** @returns a promise rejected with a "Connection closed" error */
  private rejectClosed() {
    return Promise.reject(new AMQPError("Connection closed", this))
  }

  /**
   * Fail a pending connect (if any), mark the connection as closed and close the socket
   * @param err - passed to the reject function of the pending connect
   */
  private rejectConnect(err: Error): void {
    if (this.connectPromise) {
      const [, reject] = this.connectPromise
      delete this.connectPromise
      reject(err)
    }
    this.closed = true
    this.closeSocket()
  }

  /**
   * Negotiate a Connection.Tune limit where 0 means "no limit":
   * if either side has no limit the other side's value applies, otherwise the lowest value.
   */
  private static negotiateLimit(client: number, server: number): number {
    if (client === 0 || server === 0) return Math.max(client, server)
    return Math.min(client, server)
  }

  /**
   * Parse and act on frames in an AMQPView
   * @ignore
   * @param view - must contain one or more complete frames
   * @throws AMQPError if a frame doesn't end with the frame end byte (206) inside the view
   */
  protected parseFrames(view: AMQPView): void {
    // Called through a closure so that `this` is preserved when a handshake send fails
    const failConnect = (err: Error) => this.rejectConnect(err)
    // Can possibly be multiple AMQP frames in a single WS frame
    for (let i = 0; i < view.byteLength;) {
      let j = 0 // position in outgoing frame
      const type = view.getUint8(i); i += 1
      const channelId = view.getUint16(i); i += 2
      const frameSize = view.getUint32(i); i += 4
      let frameEnd = 0
      try {
        frameEnd = view.getUint8(i + frameSize)
      } catch {
        throw (new AMQPError(`Frame end out of range, frameSize=${frameSize}, pos=${i}, byteLength=${view.byteLength}`, this))
      }
      if (frameEnd !== 206)
        throw (new AMQPError(`Invalid frame end ${frameEnd}, expected 206`, this))
      // Start of the next frame (payload + frame end byte). The cursor is moved here after
      // every frame, so partly parsed, skipped or unsupported frames can't desync the parser
      const nextFrame = i + frameSize + 1
      const channel = this.channels[channelId]
      if (!channel) {
        this.logger?.warn("AMQP channel", channelId, "not open")
        i = nextFrame
        continue
      }
      switch (type) {
        case 1: { // method
          const classId = view.getUint16(i); i += 2
          const methodId = view.getUint16(i); i += 2
          switch (classId) {
            case 10: { // connection
              switch (methodId) {
                case 10: { // start
                  // ignore the content of the start frame, just reply startok
                  const startOk = new AMQPView(new ArrayBuffer(4096))
                  startOk.setUint8(j, 1); j += 1 // type: method
                  startOk.setUint16(j, 0); j += 2 // channel: 0
                  startOk.setUint32(j, 0); j += 4 // frameSize: to be updated
                  startOk.setUint16(j, 10); j += 2 // class: connection
                  startOk.setUint16(j, 11); j += 2 // method: startok
                  const clientProps = {
                    connection_name: this.name || undefined,
                    product: "amqp-client.js",
                    information: "https://github.com/cloudamqp/amqp-client.js",
                    version: VERSION,
                    platform: this.platform,
                    capabilities: {
                      "authentication_failure_close": true,
                      "basic.nack": true,
                      "connection.blocked": true,
                      "consumer_cancel_notify": true,
                      "exchange_exchange_bindings": true,
                      "per_consumer_qos": true,
                      "publisher_confirms": true,
                    }
                  }
                  j += startOk.setTable(j, clientProps) // client properties
                  j += startOk.setShortString(j, "PLAIN") // mechanism
                  const response = `\u0000${this.username}\u0000${this.password}`
                  j += startOk.setLongString(j, response) // response
                  j += startOk.setShortString(j, "") // locale
                  startOk.setUint8(j, 206); j += 1 // frame end byte
                  startOk.setUint32(3, j - 8) // update frameSize
                  this.send(new Uint8Array(startOk.buffer, 0, j)).catch(failConnect)
                  break
                }
                case 30: { // tune
                  const channelMax = view.getUint16(i); i += 2
                  const frameMax = view.getUint32(i); i += 4
                  const heartbeat = view.getUint16(i); i += 2
                  // 0 from the server means "no limit" and must not override the client's own limit
                  this.channelMax = AMQPBaseClient.negotiateLimit(this.channelMax, channelMax)
                  this.frameMax = AMQPBaseClient.negotiateLimit(this.frameMax, frameMax)
                  // heartbeats stay disabled if the client asked for 0
                  this.heartbeat = this.heartbeat === 0 ? 0 : Math.min(this.heartbeat, heartbeat)
                  const tuneOk = new AMQPView(new ArrayBuffer(20))
                  tuneOk.setUint8(j, 1); j += 1 // type: method
                  tuneOk.setUint16(j, 0); j += 2 // channel: 0
                  tuneOk.setUint32(j, 12); j += 4 // frameSize: 12
                  tuneOk.setUint16(j, 10); j += 2 // class: connection
                  tuneOk.setUint16(j, 31); j += 2 // method: tuneok
                  tuneOk.setUint16(j, this.channelMax); j += 2 // channel max
                  tuneOk.setUint32(j, this.frameMax); j += 4 // frame max
                  tuneOk.setUint16(j, this.heartbeat); j += 2 // heartbeat
                  tuneOk.setUint8(j, 206); j += 1 // frame end byte
                  this.send(new Uint8Array(tuneOk.buffer, 0, j)).catch(failConnect)
                  j = 0
                  const open = new AMQPView(new ArrayBuffer(512))
                  open.setUint8(j, 1); j += 1 // type: method
                  open.setUint16(j, 0); j += 2 // channel: 0
                  open.setUint32(j, 0); j += 4 // frameSize: to be updated
                  open.setUint16(j, 10); j += 2 // class: connection
                  open.setUint16(j, 40); j += 2 // method: open
                  j += open.setShortString(j, this.vhost) // vhost
                  open.setUint8(j, 0); j += 1 // reserved1
                  open.setUint8(j, 0); j += 1 // reserved2
                  open.setUint8(j, 206); j += 1 // frame end byte
                  open.setUint32(3, j - 8) // update frameSize
                  this.send(new Uint8Array(open.buffer, 0, j)).catch(failConnect)
                  break
                }
                case 41: { // openok
                  i += 1 // reserved1
                  this.closed = false
                  const promise = this.connectPromise
                  if (promise) {
                    const [resolve,] = promise
                    delete this.connectPromise
                    resolve(this)
                  }
                  break
                }
                case 50: { // close
                  const code = view.getUint16(i); i += 2
                  const [text, strLen] = view.getShortString(i); i += strLen
                  const failingClassId = view.getUint16(i); i += 2
                  const failingMethodId = view.getUint16(i); i += 2
                  this.logger?.debug("connection closed by server", code, text, failingClassId, failingMethodId)
                  const msg = `connection closed: ${text} (${code})`
                  const err = new AMQPError(msg, this)
                  this.channels.forEach((ch) => ch.setClosed(err))
                  this.channels = [new AMQPChannel(this, 0)]
                  const closeOk = new AMQPView(new ArrayBuffer(12))
                  closeOk.setUint8(j, 1); j += 1 // type: method
                  closeOk.setUint16(j, 0); j += 2 // channel: 0
                  closeOk.setUint32(j, 4); j += 4 // frameSize
                  closeOk.setUint16(j, 10); j += 2 // class: connection
                  closeOk.setUint16(j, 51); j += 2 // method: closeok
                  closeOk.setUint8(j, 206); j += 1 // frame end byte
                  this.send(new Uint8Array(closeOk.buffer, 0, j))
                    .catch(sendErr => this.logger?.warn("Error while sending Connection#CloseOk", sendErr))
                  this.onerror(err)
                  this.rejectConnect(err)
                  // settle a pending updateSecret() so that it doesn't wait forever
                  this.onUpdateSecretOk?.()
                  break
                }
                case 51: { // closeOk
                  this.channels.forEach((ch) => ch.setClosed())
                  this.channels = [new AMQPChannel(this, 0)]
                  const promise = this.closePromise
                  if (promise) {
                    const [resolve,] = promise
                    delete this.closePromise
                    resolve()
                    this.closeSocket()
                  }
                  break
                }
                case 60: { // blocked
                  const [reason, len] = view.getShortString(i); i += len
                  this.logger?.warn("AMQP connection blocked:", reason)
                  this.blocked = reason
                  break
                }
                case 61: { // unblocked
                  this.logger?.info("AMQP connection unblocked")
                  delete this.blocked
                  break
                }
                case 71: { // update-secret-ok
                  // goes through the configurable logger like every other message
                  this.logger?.info("AMQP connection update secret ok")
                  this.onUpdateSecretOk?.()
                  delete this.onUpdateSecretOk
                  break
                }
                default:
                  this.logger?.error("unsupported class/method id", classId, methodId)
              }
              break
            }
            case 20: { // channel
              switch (methodId) {
                case 11: { // openok
                  i += 4 // reserved1 (long string)
                  channel.resolveRPC(channel)
                  break
                }
                case 21: { // flowOk
                  const active = view.getUint8(i) !== 0; i += 1
                  channel.resolveRPC(active)
                  break
                }
                case 40: { // close
                  const code = view.getUint16(i); i += 2
                  const [text, strLen] = view.getShortString(i); i += strLen
                  const failingClassId = view.getUint16(i); i += 2
                  const failingMethodId = view.getUint16(i); i += 2
                  this.logger?.debug("channel", channelId, "closed", code, text, failingClassId, failingMethodId)
                  const msg = `channel ${channelId} closed: ${text} (${code})`
                  const err = new AMQPError(msg, this)
                  channel.setClosed(err)
                  delete this.channels[channelId]
                  const closeOk = new AMQPView(new ArrayBuffer(12))
                  closeOk.setUint8(j, 1); j += 1 // type: method
                  closeOk.setUint16(j, channelId); j += 2 // channel
                  closeOk.setUint32(j, 4); j += 4 // frameSize
                  closeOk.setUint16(j, 20); j += 2 // class: channel
                  closeOk.setUint16(j, 41); j += 2 // method: closeok
                  closeOk.setUint8(j, 206); j += 1 // frame end byte
                  this.send(new Uint8Array(closeOk.buffer, 0, j))
                    .catch(sendErr => this.logger?.error("Error while sending Channel#closeOk", sendErr))
                  break
                }
                case 41: { // closeOk
                  channel.setClosed()
                  delete this.channels[channelId]
                  channel.resolveRPC()
                  break
                }
                default:
                  this.logger?.error("unsupported class/method id", classId, methodId)
              }
              break
            }
            case 40: { // exchange
              switch (methodId) {
                case 11: // declareOk
                case 21: // deleteOk
                case 31: // bindOk
                case 51: { // unbindOk
                  channel.resolveRPC()
                  break
                }
                default:
                  this.logger?.error("unsupported class/method id", classId, methodId)
              }
              break
            }
            case 50: { // queue
              switch (methodId) {
                case 11: { // declareOk
                  const [name, strLen] = view.getShortString(i); i += strLen
                  const messageCount = view.getUint32(i); i += 4
                  const consumerCount = view.getUint32(i); i += 4
                  channel.resolveRPC({ name, messageCount, consumerCount })
                  break
                }
                case 21: { // bindOk
                  channel.resolveRPC()
                  break
                }
                case 31: { // purgeOk
                  const messageCount = view.getUint32(i); i += 4
                  channel.resolveRPC({ messageCount })
                  break
                }
                case 41: { // deleteOk
                  const messageCount = view.getUint32(i); i += 4
                  channel.resolveRPC({ messageCount })
                  break
                }
                case 51: { // unbindOk
                  channel.resolveRPC()
                  break
                }
                default:
                  this.logger?.error("unsupported class/method id", classId, methodId)
              }
              break
            }
            case 60: { // basic
              switch (methodId) {
                case 11: { // qosOk
                  channel.resolveRPC()
                  break
                }
                case 21: { // consumeOk
                  const [consumerTag, len] = view.getShortString(i); i += len
                  channel.resolveRPC(consumerTag)
                  break
                }
                case 30: { // cancel
                  const [consumerTag, len] = view.getShortString(i); i += len
                  const noWait = view.getUint8(i) === 1; i += 1
                  const consumer = channel.consumers.get(consumerTag)
                  if (consumer) {
                    consumer.setClosed(new AMQPError("Consumer cancelled by the server", this))
                    channel.consumers.delete(consumerTag)
                  }
                  if (!noWait) {
                    const frame = new AMQPView(new ArrayBuffer(512))
                    frame.setUint8(j, 1); j += 1 // type: method
                    frame.setUint16(j, channel.id); j += 2 // channel
                    frame.setUint32(j, 0); j += 4 // frameSize
                    frame.setUint16(j, 60); j += 2 // class: basic
                    frame.setUint16(j, 31); j += 2 // method: cancelOk
                    j += frame.setShortString(j, consumerTag) // tag
                    frame.setUint8(j, 206); j += 1 // frame end byte
                    frame.setUint32(3, j - 8) // update frameSize
                    // best effort reply, but a failed send must not become an unhandled rejection
                    this.send(new Uint8Array(frame.buffer, 0, j))
                      .catch(sendErr => this.logger?.warn("Error while sending Basic#CancelOk", sendErr))
                  }
                  break
                }
                case 31: { // cancelOk
                  const [consumerTag, len] = view.getShortString(i); i += len
                  channel.resolveRPC(consumerTag)
                  break
                }
                case 50: { // return
                  const code = view.getUint16(i); i += 2
                  const [text, len] = view.getShortString(i); i += len
                  const [exchange, exchangeLen] = view.getShortString(i); i += exchangeLen
                  const [routingKey, routingKeyLen] = view.getShortString(i); i += routingKeyLen
                  const message = new AMQPMessage(channel)
                  message.exchange = exchange
                  message.routingKey = routingKey
                  message.replyCode = code
                  message.replyText = text
                  channel.returned = message
                  break
                }
                case 60: { // deliver
                  const [consumerTag, consumerTagLen] = view.getShortString(i); i += consumerTagLen
                  const deliveryTag = view.getUint64(i); i += 8
                  const redelivered = view.getUint8(i) === 1; i += 1
                  const [exchange, exchangeLen] = view.getShortString(i); i += exchangeLen
                  const [routingKey, routingKeyLen] = view.getShortString(i); i += routingKeyLen
                  const message = new AMQPMessage(channel)
                  message.consumerTag = consumerTag
                  message.deliveryTag = deliveryTag
                  message.exchange = exchange
                  message.routingKey = routingKey
                  message.redelivered = redelivered
                  channel.delivery = message
                  break
                }
                case 71: { // getOk
                  const deliveryTag = view.getUint64(i); i += 8
                  const redelivered = view.getUint8(i) === 1; i += 1
                  const [exchange, exchangeLen] = view.getShortString(i); i += exchangeLen
                  const [routingKey, routingKeyLen] = view.getShortString(i); i += routingKeyLen
                  const messageCount = view.getUint32(i); i += 4
                  const message = new AMQPMessage(channel)
                  message.deliveryTag = deliveryTag
                  message.redelivered = redelivered
                  message.exchange = exchange
                  message.routingKey = routingKey
                  message.messageCount = messageCount
                  channel.getMessage = message
                  break
                }
                case 72: { // getEmpty
                  const [, len] = view.getShortString(i); i += len // reserved1
                  channel.resolveRPC(null)
                  break
                }
                case 80: { // confirm ack
                  const deliveryTag = view.getUint64(i); i += 8
                  const multiple = view.getUint8(i) === 1; i += 1
                  channel.publishConfirmed(deliveryTag, multiple, false)
                  break
                }
                case 111: { // recoverOk
                  channel.resolveRPC()
                  break
                }
                case 120: { // confirm nack
                  const deliveryTag = view.getUint64(i); i += 8
                  const multiple = view.getUint8(i) === 1; i += 1
                  channel.publishConfirmed(deliveryTag, multiple, true)
                  break
                }
                default:
                  this.logger?.error("unsupported class/method id", classId, methodId)
              }
              break
            }
            case 85: { // confirm
              switch (methodId) {
                case 11: { // selectOk
                  channel.confirmId = 1
                  channel.resolveRPC()
                  break
                }
                default:
                  this.logger?.error("unsupported class/method id", classId, methodId)
              }
              break
            }
            case 90: { // tx / transaction
              switch (methodId) {
                case 11: // selectOk
                case 21: // commitOk
                case 31: { // rollbackOk
                  channel.resolveRPC()
                  break
                }
                default:
                  this.logger?.error("unsupported class/method id", classId, methodId)
              }
              break
            }
            default:
              this.logger?.error("unsupported class id", classId)
          }
          break
        }
        case 2: { // header
          i += 4 // ignoring class id and weight
          const bodySize = view.getUint64(i); i += 8
          const [properties, propLen] = view.getProperties(i); i += propLen
          const message = channel.delivery || channel.getMessage || channel.returned
          if (message) {
            message.bodySize = bodySize
            message.properties = properties
            message.body = new Uint8Array(bodySize)
            if (bodySize === 0)
              channel.onMessageReady(message)
          } else {
            this.logger?.warn("Header frame but no message")
          }
          break
        }
        case 3: { // body
          const message = channel.delivery || channel.getMessage || channel.returned
          if (message && message.body) {
            const bodyPart = new Uint8Array(view.buffer, view.byteOffset + i, frameSize)
            message.body.set(bodyPart, message.bodyPos)
            message.bodyPos += frameSize
            if (message.bodyPos === message.bodySize)
              channel.onMessageReady(message)
          } else {
            // the payload is dropped, the cursor still moves past it below
            this.logger?.warn("Body frame but no message")
          }
          break
        }
        case 8: { // heartbeat
          const heartbeat = new Uint8Array([8, 0, 0, 0, 0, 0, 0, 206])
          this.send(heartbeat).catch(err => this.logger?.warn("Error while sending heartbeat", err))
          break
        }
        default:
          this.logger?.error("invalid frame type:", type)
      }
      i = nextFrame // skip whatever is left of the payload and the frame end byte
    }
  }
}