Skip to content

Commit 93cce05

Browse files
feat: add support for inter-server communication
Syntax: ```js // server A io.serverSideEmit("hello", "world"); // server B io.on("hello", (arg) => { console.log(arg); // prints "world" }); ``` With acknowledgements: ```js // server A io.serverSideEmit("hello", "world", (err, responses) => { console.log(responses); // prints ["hi"] }); // server B io.on("hello", (arg, callback) => { callback("hi"); }); ``` This feature replaces the customHook/customRequest API from the Redis adapter: socketio/socket.io-redis-adapter#370
1 parent dc381b7 commit 93cce05

File tree

6 files changed

+176
-43
lines changed

6 files changed

+176
-43
lines changed

lib/client.ts

+21-7
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,23 @@ interface WriteOptions {
1818

1919
export class Client<
2020
ListenEvents extends EventsMap,
21-
EmitEvents extends EventsMap
21+
EmitEvents extends EventsMap,
22+
ServerSideEvents extends EventsMap
2223
> {
2324
public readonly conn;
2425

2526
private readonly id: string;
26-
private readonly server: Server<ListenEvents, EmitEvents>;
27+
private readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;
2728
private readonly encoder: Encoder;
2829
private readonly decoder: Decoder;
29-
private sockets: Map<SocketId, Socket<ListenEvents, EmitEvents>> = new Map();
30-
private nsps: Map<string, Socket<ListenEvents, EmitEvents>> = new Map();
30+
private sockets: Map<
31+
SocketId,
32+
Socket<ListenEvents, EmitEvents, ServerSideEvents>
33+
> = new Map();
34+
private nsps: Map<
35+
string,
36+
Socket<ListenEvents, EmitEvents, ServerSideEvents>
37+
> = new Map();
3138
private connectTimeout?: NodeJS.Timeout;
3239

3340
/**
@@ -37,7 +44,10 @@ export class Client<
3744
* @param conn
3845
* @package
3946
*/
40-
constructor(server: Server<ListenEvents, EmitEvents>, conn: any) {
47+
constructor(
48+
server: Server<ListenEvents, EmitEvents, ServerSideEvents>,
49+
conn: any
50+
) {
4151
this.server = server;
4252
this.conn = conn;
4353
this.encoder = server.encoder;
@@ -98,7 +108,11 @@ export class Client<
98108
this.server._checkNamespace(
99109
name,
100110
auth,
101-
(dynamicNspName: Namespace<ListenEvents, EmitEvents> | false) => {
111+
(
112+
dynamicNspName:
113+
| Namespace<ListenEvents, EmitEvents, ServerSideEvents>
114+
| false
115+
) => {
102116
if (dynamicNspName) {
103117
debug("dynamic namespace %s was created", dynamicNspName);
104118
this.doConnect(name, auth);
@@ -156,7 +170,7 @@ export class Client<
156170
*
157171
* @private
158172
*/
159-
_remove(socket: Socket<ListenEvents, EmitEvents>): void {
173+
_remove(socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>): void {
160174
if (this.sockets.has(socket.id)) {
161175
const nsp = this.sockets.get(socket.id)!.nsp.name;
162176
this.sockets.delete(socket.id);

lib/index.ts

+38-11
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import {
2626
DefaultEventsMap,
2727
EventParams,
2828
StrictEventEmitter,
29+
EventNames,
2930
} from "./typed-events";
3031

3132
const debug = debugModule("socket.io:server");
@@ -170,13 +171,18 @@ interface ServerOptions extends EngineAttachOptions {
170171

171172
export class Server<
172173
ListenEvents extends EventsMap = DefaultEventsMap,
173-
EmitEvents extends EventsMap = ListenEvents
174+
EmitEvents extends EventsMap = ListenEvents,
175+
ServerSideEvents extends EventsMap = {}
174176
> extends StrictEventEmitter<
175-
{},
177+
ServerSideEvents,
176178
EmitEvents,
177-
NamespaceReservedEventsMap<ListenEvents, EmitEvents>
179+
NamespaceReservedEventsMap<ListenEvents, EmitEvents, ServerSideEvents>
178180
> {
179-
public readonly sockets: Namespace<ListenEvents, EmitEvents>;
181+
public readonly sockets: Namespace<
182+
ListenEvents,
183+
EmitEvents,
184+
ServerSideEvents
185+
>;
180186
/**
181187
* A reference to the underlying Engine.IO server.
182188
*
@@ -197,10 +203,13 @@ export class Server<
197203
/**
198204
* @private
199205
*/
200-
_nsps: Map<string, Namespace<ListenEvents, EmitEvents>> = new Map();
206+
_nsps: Map<
207+
string,
208+
Namespace<ListenEvents, EmitEvents, ServerSideEvents>
209+
> = new Map();
201210
private parentNsps: Map<
202211
ParentNspNameMatchFn,
203-
ParentNamespace<ListenEvents, EmitEvents>
212+
ParentNamespace<ListenEvents, EmitEvents, ServerSideEvents>
204213
> = new Map();
205214
private _adapter?: typeof Adapter;
206215
private _serveClient: boolean;
@@ -280,7 +289,9 @@ export class Server<
280289
_checkNamespace(
281290
name: string,
282291
auth: { [key: string]: any },
283-
fn: (nsp: Namespace<ListenEvents, EmitEvents> | false) => void
292+
fn: (
293+
nsp: Namespace<ListenEvents, EmitEvents, ServerSideEvents> | false
294+
) => void
284295
): void {
285296
if (this.parentNsps.size === 0) return fn(false);
286297

@@ -589,8 +600,8 @@ export class Server<
589600
*/
590601
public of(
591602
name: string | RegExp | ParentNspNameMatchFn,
592-
fn?: (socket: Socket<ListenEvents, EmitEvents>) => void
593-
): Namespace<ListenEvents, EmitEvents> {
603+
fn?: (socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>) => void
604+
): Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
594605
if (typeof name === "function" || name instanceof RegExp) {
595606
const parentNsp = new ParentNamespace(this);
596607
debug("initializing parent namespace %s", parentNsp.name);
@@ -649,7 +660,7 @@ export class Server<
649660
*/
650661
public use(
651662
fn: (
652-
socket: Socket<ListenEvents, EmitEvents>,
663+
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
653664
next: (err?: ExtendedError) => void
654665
) => void
655666
): this {
@@ -686,7 +697,9 @@ export class Server<
686697
* @return self
687698
* @public
688699
*/
689-
public except(name: Room | Room[]): Server<ListenEvents, EmitEvents> {
700+
public except(
701+
name: Room | Room[]
702+
): Server<ListenEvents, EmitEvents, ServerSideEvents> {
690703
this.sockets.except(name);
691704
return this;
692705
}
@@ -713,6 +726,20 @@ export class Server<
713726
return this;
714727
}
715728

729+
/**
730+
* Emit a packet to other Socket.IO servers
731+
*
732+
* @param ev - the event name
733+
* @param args - an array of arguments, which may include an acknowledgement callback at the end
734+
* @public
735+
*/
736+
public serverSideEmit<Ev extends EventNames<ServerSideEvents>>(
737+
ev: Ev,
738+
...args: EventParams<ServerSideEvents, Ev>
739+
): boolean {
740+
return this.sockets.serverSideEmit(ev, ...args);
741+
}
742+
716743
/**
717744
* Gets a list of socket ids.
718745
*

lib/namespace.ts

+57-15
Original file line numberDiff line numberDiff line change
@@ -20,35 +20,43 @@ export interface ExtendedError extends Error {
2020

2121
export interface NamespaceReservedEventsMap<
2222
ListenEvents extends EventsMap,
23-
EmitEvents extends EventsMap
23+
EmitEvents extends EventsMap,
24+
ServerSideEvents extends EventsMap
2425
> {
25-
connect: (socket: Socket<ListenEvents, EmitEvents>) => void;
26-
connection: (socket: Socket<ListenEvents, EmitEvents>) => void;
26+
connect: (socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>) => void;
27+
connection: (
28+
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>
29+
) => void;
2730
}
2831

32+
export const RESERVED_EVENTS: ReadonlySet<string | Symbol> = new Set<
33+
keyof NamespaceReservedEventsMap<never, never, never>
34+
>(<const>["connect", "connection"]);
35+
2936
export class Namespace<
3037
ListenEvents extends EventsMap = DefaultEventsMap,
31-
EmitEvents extends EventsMap = ListenEvents
38+
EmitEvents extends EventsMap = ListenEvents,
39+
ServerSideEvents extends EventsMap = {}
3240
> extends StrictEventEmitter<
33-
{},
41+
ServerSideEvents,
3442
EmitEvents,
35-
NamespaceReservedEventsMap<ListenEvents, EmitEvents>
43+
NamespaceReservedEventsMap<ListenEvents, EmitEvents, ServerSideEvents>
3644
> {
3745
public readonly name: string;
3846
public readonly sockets: Map<
3947
SocketId,
40-
Socket<ListenEvents, EmitEvents>
48+
Socket<ListenEvents, EmitEvents, ServerSideEvents>
4149
> = new Map();
4250

4351
public adapter: Adapter;
4452

4553
/** @private */
46-
readonly server: Server<ListenEvents, EmitEvents>;
54+
readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;
4755

4856
/** @private */
4957
_fns: Array<
5058
(
51-
socket: Socket<ListenEvents, EmitEvents>,
59+
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
5260
next: (err?: ExtendedError) => void
5361
) => void
5462
> = [];
@@ -62,7 +70,10 @@ export class Namespace<
6270
* @param server instance
6371
* @param name
6472
*/
65-
constructor(server: Server<ListenEvents, EmitEvents>, name: string) {
73+
constructor(
74+
server: Server<ListenEvents, EmitEvents, ServerSideEvents>,
75+
name: string
76+
) {
6677
super();
6778
this.server = server;
6879
this.name = name;
@@ -88,7 +99,7 @@ export class Namespace<
8899
*/
89100
public use(
90101
fn: (
91-
socket: Socket<ListenEvents, EmitEvents>,
102+
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
92103
next: (err?: ExtendedError) => void
93104
) => void
94105
): this {
@@ -104,7 +115,7 @@ export class Namespace<
104115
* @private
105116
*/
106117
private run(
107-
socket: Socket<ListenEvents, EmitEvents>,
118+
socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>,
108119
fn: (err: ExtendedError | null) => void
109120
) {
110121
const fns = this._fns.slice(0);
@@ -166,10 +177,10 @@ export class Namespace<
166177
* @private
167178
*/
168179
_add(
169-
client: Client<ListenEvents, EmitEvents>,
180+
client: Client<ListenEvents, EmitEvents, ServerSideEvents>,
170181
query,
171182
fn?: () => void
172-
): Socket<ListenEvents, EmitEvents> {
183+
): Socket<ListenEvents, EmitEvents, ServerSideEvents> {
173184
debug("adding socket to nsp %s", this.name);
174185
const socket = new Socket(this, client, query);
175186
this.run(socket, (err) => {
@@ -212,7 +223,7 @@ export class Namespace<
212223
*
213224
* @private
214225
*/
215-
_remove(socket: Socket<ListenEvents, EmitEvents>): void {
226+
_remove(socket: Socket<ListenEvents, EmitEvents, ServerSideEvents>): void {
216227
if (this.sockets.has(socket.id)) {
217228
this.sockets.delete(socket.id);
218229
} else {
@@ -255,6 +266,37 @@ export class Namespace<
255266
return this;
256267
}
257268

269+
/**
270+
* Emit a packet to other Socket.IO servers
271+
*
272+
* @param ev - the event name
273+
* @param args - an array of arguments, which may include an acknowledgement callback at the end
274+
* @public
275+
*/
276+
public serverSideEmit<Ev extends EventNames<ServerSideEvents>>(
277+
ev: Ev,
278+
...args: EventParams<ServerSideEvents, Ev>
279+
): boolean {
280+
if (RESERVED_EVENTS.has(ev)) {
281+
throw new Error(`"${ev}" is a reserved event name`);
282+
}
283+
args.unshift(ev);
284+
this.adapter.serverSideEmit(args);
285+
return true;
286+
}
287+
288+
/**
289+
* Called when a packet is received from another Socket.IO server
290+
*
291+
* @param args - an array of arguments, which may include an acknowledgement callback at the end
292+
*
293+
* @private
294+
*/
295+
_onServerSideEmit(args: any[]) {
296+
const event = args.shift();
297+
this.emitUntyped(event, args);
298+
}
299+
258300
/**
259301
* Gets a list of clients.
260302
*

lib/parent-namespace.ts

+10-5
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,15 @@ import type { BroadcastOptions } from "socket.io-adapter";
1010

1111
export class ParentNamespace<
1212
ListenEvents extends EventsMap = DefaultEventsMap,
13-
EmitEvents extends EventsMap = ListenEvents
14-
> extends Namespace<ListenEvents, EmitEvents> {
13+
EmitEvents extends EventsMap = ListenEvents,
14+
ServerSideEvents extends EventsMap = {}
15+
> extends Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
1516
private static count: number = 0;
16-
private children: Set<Namespace<ListenEvents, EmitEvents>> = new Set();
17+
private children: Set<
18+
Namespace<ListenEvents, EmitEvents, ServerSideEvents>
19+
> = new Set();
1720

18-
constructor(server: Server<ListenEvents, EmitEvents>) {
21+
constructor(server: Server<ListenEvents, EmitEvents, ServerSideEvents>) {
1922
super(server, "/_" + ParentNamespace.count++);
2023
}
2124

@@ -43,7 +46,9 @@ export class ParentNamespace<
4346
return true;
4447
}
4548

46-
createChild(name: string): Namespace<ListenEvents, EmitEvents> {
49+
createChild(
50+
name: string
51+
): Namespace<ListenEvents, EmitEvents, ServerSideEvents> {
4752
const namespace = new Namespace(this.server, name);
4853
namespace._fns = this._fns.slice(0);
4954
this.listeners("connect").forEach((listener) =>

lib/socket.ts

+6-5
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ export interface EventEmitterReservedEventsMap {
4646

4747
export const RESERVED_EVENTS: ReadonlySet<string | Symbol> = new Set<
4848
| ClientReservedEvents
49-
| keyof NamespaceReservedEventsMap<never, never>
49+
| keyof NamespaceReservedEventsMap<never, never, never>
5050
| keyof SocketReservedEventsMap
5151
| keyof EventEmitterReservedEventsMap
5252
>(<const>[
@@ -110,7 +110,8 @@ export interface Handshake {
110110

111111
export class Socket<
112112
ListenEvents extends EventsMap = DefaultEventsMap,
113-
EmitEvents extends EventsMap = ListenEvents
113+
EmitEvents extends EventsMap = ListenEvents,
114+
ServerSideEvents extends EventsMap = {}
114115
> extends StrictEventEmitter<
115116
ListenEvents,
116117
EmitEvents,
@@ -126,7 +127,7 @@ export class Socket<
126127
public connected: boolean;
127128
public disconnected: boolean;
128129

129-
private readonly server: Server<ListenEvents, EmitEvents>;
130+
private readonly server: Server<ListenEvents, EmitEvents, ServerSideEvents>;
130131
private readonly adapter: Adapter;
131132
private acks: Map<number, () => void> = new Map();
132133
private fns: Array<
@@ -144,8 +145,8 @@ export class Socket<
144145
* @package
145146
*/
146147
constructor(
147-
readonly nsp: Namespace<ListenEvents, EmitEvents>,
148-
readonly client: Client<ListenEvents, EmitEvents>,
148+
readonly nsp: Namespace<ListenEvents, EmitEvents, ServerSideEvents>,
149+
readonly client: Client<ListenEvents, EmitEvents, ServerSideEvents>,
149150
auth: object
150151
) {
151152
super();

0 commit comments

Comments
 (0)