3
3
namespace BeyondCode \LaravelWebSockets \ChannelManagers ;
4
4
5
5
use BeyondCode \LaravelWebSockets \Channels \Channel ;
6
+ use BeyondCode \LaravelWebSockets \Helpers ;
7
+ use BeyondCode \LaravelWebSockets \Server \MockableConnection ;
8
+ use Carbon \Carbon ;
6
9
use Clue \React \Redis \Client ;
7
10
use Clue \React \Redis \Factory ;
11
+ use Illuminate \Cache \RedisLock ;
12
+ use Illuminate \Support \Facades \Redis ;
8
13
use Illuminate \Support \Str ;
9
14
use Ratchet \ConnectionInterface ;
15
+ use Ratchet \WebSocket \WsConnection ;
10
16
use React \EventLoop \LoopInterface ;
11
17
use React \Promise \PromiseInterface ;
12
18
use stdClass ;
@@ -41,6 +47,21 @@ class RedisChannelManager extends LocalChannelManager
41
47
*/
42
48
protected $ subscribeClient ;
43
49
50
+ /**
51
+ * The Redis manager instance.
52
+ *
53
+ * @var \Illuminate\Redis\RedisManager
54
+ */
55
+ protected $ redis ;
56
+
57
+ /**
58
+ * The lock name to use on Redis to avoid multiple
59
+ * actions that might lead to multiple processings.
60
+ *
61
+ * @var string
62
+ */
63
+ protected static $ redisLockName = 'laravel-websockets:channel-manager:lock ' ;
64
+
44
65
/**
45
66
* Create a new channel manager instance.
46
67
*
@@ -52,6 +73,10 @@ public function __construct(LoopInterface $loop, $factoryClass = null)
52
73
{
53
74
$ this ->loop = $ loop ;
54
75
76
+ $ this ->redis = Redis::connection (
77
+ config ('websockets.replication.modes.redis.connection ' , 'default ' )
78
+ );
79
+
55
80
$ connectionUri = $ this ->getConnectionUri ();
56
81
57
82
$ factoryClass = $ factoryClass ?: Factory::class;
@@ -141,6 +166,8 @@ public function subscribeToChannel(ConnectionInterface $connection, string $chan
141
166
}
142
167
});
143
168
169
+ $ this ->addConnectionToSet ($ connection );
170
+
144
171
$ this ->addChannelToSet (
145
172
$ connection ->app ->id , $ channelName
146
173
);
@@ -167,8 +194,14 @@ public function unsubscribeFromChannel(ConnectionInterface $connection, string $
167
194
if ($ count === 0 ) {
168
195
$ this ->unsubscribeFromTopic ($ connection ->app ->id , $ channelName );
169
196
197
+ $ this ->removeUserData (
198
+ $ connection ->app ->id , $ channelName , $ connection ->socketId
199
+ );
200
+
170
201
$ this ->removeChannelFromSet ($ connection ->app ->id , $ channelName );
171
202
203
+ $ this ->removeConnectionFromSet ($ connection );
204
+
172
205
return ;
173
206
}
174
207
@@ -179,7 +212,13 @@ public function unsubscribeFromChannel(ConnectionInterface $connection, string $
179
212
if ($ count < 1 ) {
180
213
$ this ->unsubscribeFromTopic ($ connection ->app ->id , $ channelName );
181
214
215
+ $ this ->removeUserData (
216
+ $ connection ->app ->id , $ channelName , $ connection ->socketId
217
+ );
218
+
182
219
$ this ->removeChannelFromSet ($ connection ->app ->id , $ channelName );
220
+
221
+ $ this ->removeConnectionFromSet ($ connection );
183
222
}
184
223
});
185
224
});
@@ -304,12 +343,8 @@ public function getChannelMembers($appId, string $channel): PromiseInterface
304
343
{
305
344
return $ this ->publishClient
306
345
->hgetall ($ this ->getRedisKey ($ appId , $ channel , ['users ' ]))
307
- ->then (function ($ members ) {
308
- [$ keys , $ values ] = collect ($ members )->partition (function ($ value , $ key ) {
309
- return $ key % 2 === 0 ;
310
- });
311
-
312
- return collect (array_combine ($ keys ->all (), $ values ->all ()))
346
+ ->then (function ($ list ) {
347
+ return collect (Helpers::redisListToArray ($ list ))
313
348
->map (function ($ user ) {
314
349
return json_decode ($ user );
315
350
})
@@ -355,6 +390,43 @@ public function getChannelsMembersCount($appId, array $channelNames): PromiseInt
355
390
});
356
391
}
357
392
393
+ /**
394
+ * Keep tracking the connections availability when they pong.
395
+ *
396
+ * @param \Ratchet\ConnectionInterface $connection
397
+ * @return bool
398
+ */
399
+ public function connectionPonged (ConnectionInterface $ connection ): bool
400
+ {
401
+ // This will update the score with the current timestamp.
402
+ $ this ->addConnectionToSet ($ connection );
403
+
404
+ return parent ::connectionPonged ($ connection );
405
+ }
406
+
407
+ /**
408
+ * Remove the obsolete connections that didn't ponged in a while.
409
+ *
410
+ * @return bool
411
+ */
412
+ public function removeObsoleteConnections (): bool
413
+ {
414
+ $ this ->lock ()->get (function () {
415
+ $ this ->getConnectionsFromSet (0 , now ()->subMinutes (2 )->format ('U ' ))
416
+ ->then (function ($ connections ) {
417
+ foreach ($ connections as $ connection => $ score ) {
418
+ [$ appId , $ socketId ] = explode (': ' , $ connection );
419
+
420
+ $ this ->unsubscribeFromAllChannels (
421
+ $ this ->fakeConnectionForApp ($ appId , $ socketId )
422
+ );
423
+ }
424
+ });
425
+ });
426
+
427
+ return parent ::removeObsoleteConnections ();
428
+ }
429
+
358
430
/**
359
431
* Handle a message received from Redis on a specific channel.
360
432
*
@@ -473,6 +545,57 @@ public function decrementSubscriptionsCount($appId, string $channel = null, int
473
545
return $ this ->incrementSubscriptionsCount ($ appId , $ channel , $ increment * -1 );
474
546
}
475
547
548
+ /**
549
+ * Add the connection to the sorted list.
550
+ *
551
+ * @param \Ratchet\ConnectionInterface $connection
552
+ * @param \DateTime|string|null $moment
553
+ * @return void
554
+ */
555
+ public function addConnectionToSet (ConnectionInterface $ connection , $ moment = null )
556
+ {
557
+ $ this ->getPublishClient ()
558
+ ->zadd (
559
+ $ this ->getRedisKey (null , null , ['sockets ' ]),
560
+ Carbon::parse ($ moment )->format ('U ' ), "{$ connection ->app ->id }: {$ connection ->socketId }"
561
+ );
562
+ }
563
+
564
+ /**
565
+ * Remove the connection from the sorted list.
566
+ *
567
+ * @param \Ratchet\ConnectionInterface $connection
568
+ * @return void
569
+ */
570
+ public function removeConnectionFromSet (ConnectionInterface $ connection )
571
+ {
572
+ $ this ->getPublishClient ()
573
+ ->zrem (
574
+ $ this ->getRedisKey (null , null , ['sockets ' ]),
575
+ "{$ connection ->app ->id }: {$ connection ->socketId }"
576
+ );
577
+ }
578
+
579
+ /**
580
+ * Get the connections from the sorted list, with last
581
+ * connection between certain timestamps.
582
+ *
583
+ * @param int $start
584
+ * @param int $stop
585
+ * @return PromiseInterface
586
+ */
587
+ public function getConnectionsFromSet (int $ start = 0 , int $ stop = 0 )
588
+ {
589
+ return $ this ->getPublishClient ()
590
+ ->zrange (
591
+ $ this ->getRedisKey (null , null , ['sockets ' ]),
592
+ $ start , $ stop , 'withscores '
593
+ )
594
+ ->then (function ($ list ) {
595
+ return Helpers::redisListToArray ($ list );
596
+ });
597
+ }
598
+
476
599
/**
477
600
* Add a channel to the set list.
478
601
*
@@ -566,11 +689,11 @@ public function unsubscribeFromTopic($appId, string $channel = null)
566
689
* Get the Redis Keyspace name to handle subscriptions
567
690
* and other key-value sets.
568
691
*
569
- * @param mixed $appId
692
+ * @param string|int|null $appId
570
693
* @param string|null $channel
571
694
* @return string
572
695
*/
573
- public function getRedisKey ($ appId , string $ channel = null , array $ suffixes = []): string
696
+ public function getRedisKey ($ appId = null , string $ channel = null , array $ suffixes = []): string
574
697
{
575
698
$ prefix = config ('database.redis.options.prefix ' , null );
576
699
@@ -588,4 +711,28 @@ public function getRedisKey($appId, string $channel = null, array $suffixes = []
588
711
589
712
return $ hash ;
590
713
}
714
+
715
+ /**
716
+ * Get a new RedisLock instance to avoid race conditions.
717
+ *
718
+ * @return \Illuminate\Cache\CacheLock
719
+ */
720
+ protected function lock ()
721
+ {
722
+ return new RedisLock ($ this ->redis , static ::$ redisLockName , 0 );
723
+ }
724
+
725
+ /**
726
+ * Create a fake connection for app that will mimick a connection
727
+ * by app ID and Socket ID to be able to be passed to the methods
728
+ * that accepts a connection class.
729
+ *
730
+ * @param string|int $appId
731
+ * @param string $socketId
732
+ * @return ConnectionInterface
733
+ */
734
+ public function fakeConnectionForApp ($ appId , string $ socketId )
735
+ {
736
+ return new MockableConnection ($ appId , $ socketId );
737
+ }
591
738
}
0 commit comments