42
42
43
43
using System ;
44
44
using System . Collections . Generic ;
45
- using System . Diagnostics ;
46
- using System . IO ;
47
- using System . Linq ;
48
45
using System . Text ;
49
- using System . Text . RegularExpressions ;
50
46
using System . Threading ;
51
47
52
48
using NUnit . Framework ;
53
49
54
- using RabbitMQ . Client . Framing ;
55
50
using RabbitMQ . Client . Framing . Impl ;
51
+ using static RabbitMQ . Client . Unit . RabbitMQCtl ;
56
52
57
53
namespace RabbitMQ . Client . Unit
58
54
{
@@ -62,7 +58,6 @@ public class IntegrationFixture
62
58
internal IConnectionFactory ConnFactory ;
63
59
internal IConnection Conn ;
64
60
internal IModel Model ;
65
-
66
61
internal Encoding encoding = new UTF8Encoding ( ) ;
67
62
public static TimeSpan RECOVERY_INTERVAL = TimeSpan . FromSeconds ( 2 ) ;
68
63
@@ -420,276 +415,62 @@ internal void WaitOn(object o)
420
415
}
421
416
}
422
417
423
- //
424
- // Shelling Out
425
- //
426
-
427
- internal Process ExecRabbitMQCtl ( string args )
428
- {
429
- // Allow the path to the rabbitmqctl.bat to be set per machine
430
- string envVariable = Environment . GetEnvironmentVariable ( "RABBITMQ_RABBITMQCTL_PATH" ) ;
431
- string rabbitmqctlPath ;
432
-
433
- if ( envVariable != null )
434
- {
435
- var regex = new Regex ( @"^DOCKER:(?<dockerMachine>.+)$" ) ;
436
- Match match = regex . Match ( envVariable ) ;
437
-
438
- if ( match . Success )
439
- {
440
- return ExecRabbitMqCtlUsingDocker ( args , match . Groups [ "dockerMachine" ] . Value ) ;
441
- } else {
442
- rabbitmqctlPath = envVariable ;
443
- }
444
- }
445
- else
446
- {
447
- // provided by the umbrella
448
- string umbrellaRabbitmqctlPath ;
449
- // provided in PATH by a RabbitMQ installation
450
- string providedRabbitmqctlPath ;
451
-
452
- if ( IsRunningOnMonoOrDotNetCore ( ) )
453
- {
454
- umbrellaRabbitmqctlPath = "../../../../../../rabbit/scripts/rabbitmqctl" ;
455
- providedRabbitmqctlPath = "rabbitmqctl" ;
456
- } else {
457
- umbrellaRabbitmqctlPath = @"..\..\..\..\..\..\rabbit\scripts\rabbitmqctl.bat" ;
458
- providedRabbitmqctlPath = "rabbitmqctl.bat" ;
459
- }
460
-
461
- if ( File . Exists ( umbrellaRabbitmqctlPath ) ) {
462
- rabbitmqctlPath = umbrellaRabbitmqctlPath ;
463
- } else {
464
- rabbitmqctlPath = providedRabbitmqctlPath ;
465
- }
466
- }
467
-
468
- return ExecCommand ( rabbitmqctlPath , args ) ;
469
- }
470
-
471
- private Process ExecRabbitMqCtlUsingDocker ( string args , string dockerMachineName )
472
- {
473
- var proc = new Process
474
- {
475
- StartInfo =
476
- {
477
- CreateNoWindow = true ,
478
- UseShellExecute = false
479
- }
480
- } ;
481
-
482
- try {
483
- proc . StartInfo . FileName = "docker" ;
484
- proc . StartInfo . Arguments = $ "exec { dockerMachineName } rabbitmqctl { args } ";
485
- proc . StartInfo . RedirectStandardError = true ;
486
- proc . StartInfo . RedirectStandardOutput = true ;
487
-
488
- proc . Start ( ) ;
489
- string stderr = proc . StandardError . ReadToEnd ( ) ;
490
- proc . WaitForExit ( ) ;
491
- if ( stderr . Length > 0 || proc . ExitCode > 0 )
492
- {
493
- string stdout = proc . StandardOutput . ReadToEnd ( ) ;
494
- ReportExecFailure ( "rabbitmqctl" , args , $ "{ stderr } \n { stdout } ") ;
495
- }
496
-
497
- return proc ;
498
- }
499
- catch ( Exception e )
500
- {
501
- ReportExecFailure ( "rabbitmqctl" , args , e . Message ) ;
502
- throw ;
503
- }
504
- }
505
-
506
- internal Process ExecCommand ( string command )
507
- {
508
- return ExecCommand ( command , "" ) ;
509
- }
510
-
511
- internal Process ExecCommand ( string command , string args )
512
- {
513
- return ExecCommand ( command , args , null ) ;
514
- }
515
-
516
- internal Process ExecCommand ( string ctl , string args , string changeDirTo )
517
- {
518
- var proc = new Process
519
- {
520
- StartInfo =
521
- {
522
- CreateNoWindow = true ,
523
- UseShellExecute = false
524
- }
525
- } ;
526
- if ( changeDirTo != null )
527
- {
528
- proc . StartInfo . WorkingDirectory = changeDirTo ;
529
- }
530
-
531
- string cmd ;
532
- if ( IsRunningOnMonoOrDotNetCore ( ) ) {
533
- cmd = ctl ;
534
- } else {
535
- cmd = "cmd.exe" ;
536
- args = $ "/c \" \" { ctl } \" { args } \" ";
537
- }
538
-
539
- try {
540
- proc . StartInfo . FileName = cmd ;
541
- proc . StartInfo . Arguments = args ;
542
- proc . StartInfo . RedirectStandardError = true ;
543
- proc . StartInfo . RedirectStandardOutput = true ;
544
-
545
- proc . Start ( ) ;
546
- string stderr = proc . StandardError . ReadToEnd ( ) ;
547
- proc . WaitForExit ( ) ;
548
- if ( stderr . Length > 0 || proc . ExitCode > 0 )
549
- {
550
- string stdout = proc . StandardOutput . ReadToEnd ( ) ;
551
- ReportExecFailure ( cmd , args , $ "{ stderr } \n { stdout } ") ;
552
- }
553
-
554
- return proc ;
555
- }
556
- catch ( Exception e )
557
- {
558
- ReportExecFailure ( cmd , args , e . Message ) ;
559
- throw ;
560
- }
561
- }
562
-
563
- internal void ReportExecFailure ( string cmd , string args , string msg )
564
- {
565
- Console . WriteLine ( $ "Failure while running { cmd } { args } :\n { msg } ") ;
566
- }
567
-
568
- public static bool IsRunningOnMonoOrDotNetCore ( )
569
- {
570
- #if NETCOREAPP
571
- return true ;
572
- #else
573
- return Type . GetType ( "Mono.Runtime" ) != null ;
574
- #endif
575
- }
576
-
577
418
//
578
419
// Flow Control
579
420
//
580
421
581
422
internal void Block ( )
582
423
{
583
- ExecRabbitMQCtl ( "set_vm_memory_high_watermark 0.000000001" ) ;
584
- // give rabbitmqctl some time to do its job
585
- Thread . Sleep ( 1200 ) ;
586
- Publish ( Conn ) ;
424
+ RabbitMQCtl . Block ( Conn , encoding ) ;
587
425
}
588
426
589
427
internal void Unblock ( )
590
428
{
591
- ExecRabbitMQCtl ( "set_vm_memory_high_watermark 0.4" ) ;
429
+ RabbitMQCtl . Unblock ( ) ;
592
430
}
593
431
594
432
internal void Publish ( IConnection conn )
595
433
{
596
- IModel ch = conn . CreateModel ( ) ;
597
- ch . BasicPublish ( "amq.fanout" , "" , null , encoding . GetBytes ( "message" ) ) ;
434
+ RabbitMQCtl . Publish ( conn , encoding ) ;
598
435
}
599
436
600
437
//
601
438
// Connection Closure
602
439
//
603
440
604
- public class ConnectionInfo
605
- {
606
- public string Pid
607
- {
608
- get ; set ;
609
- }
610
-
611
- public string Name
612
- {
613
- get ; set ;
614
- }
615
-
616
- public ConnectionInfo ( string pid , string name )
617
- {
618
- Pid = pid ;
619
- Name = name ;
620
- }
621
-
622
- public override string ToString ( )
623
- {
624
- return $ "pid = { Pid } , name: { Name } ";
625
- }
626
- }
627
-
628
- private static readonly Regex GetConnectionName = new Regex ( @"\{""connection_name"",""(?<connection_name>[^""]+)""\}" ) ;
629
-
630
441
internal List < ConnectionInfo > ListConnections ( )
631
442
{
632
- Process proc = ExecRabbitMQCtl ( "list_connections --silent pid client_properties" ) ;
633
- string stdout = proc . StandardOutput . ReadToEnd ( ) ;
634
-
635
- try
636
- {
637
- // {Environment.NewLine} is not sufficient
638
- string [ ] splitOn = new string [ ] { "\r \n " , "\n " } ;
639
- string [ ] lines = stdout . Split ( splitOn , StringSplitOptions . RemoveEmptyEntries ) ;
640
- // line: <rabbit@mercurio.1.11491.0> {.../*client_properties*/...}
641
- return lines . Select ( s =>
642
- {
643
- string [ ] columns = s . Split ( '\t ' ) ;
644
- Debug . Assert ( ! string . IsNullOrEmpty ( columns [ 0 ] ) , "columns[0] is null or empty!" ) ;
645
- Debug . Assert ( ! string . IsNullOrEmpty ( columns [ 1 ] ) , "columns[1] is null or empty!" ) ;
646
- Match match = GetConnectionName . Match ( columns [ 1 ] ) ;
647
- Debug . Assert ( match . Success , "columns[1] is not in expected format." ) ;
648
- return new ConnectionInfo ( columns [ 0 ] , match . Groups [ "connection_name" ] . Value ) ;
649
- } ) . ToList ( ) ;
650
- }
651
- catch ( Exception )
652
- {
653
- Console . WriteLine ( $ "Bad response from rabbitmqctl list_connections --silent pid client_properties{ Environment . NewLine } { stdout } ") ;
654
- throw ;
655
- }
443
+ return RabbitMQCtl . ListConnections ( ) ;
656
444
}
657
445
658
446
internal void CloseConnection ( IConnection conn )
659
447
{
660
- ConnectionInfo ci = ListConnections ( ) . First ( x => conn . ClientProvidedName == x . Name ) ;
661
- CloseConnection ( ci . Pid ) ;
448
+ RabbitMQCtl . CloseConnection ( conn ) ;
662
449
}
663
450
664
451
internal void CloseAllConnections ( )
665
452
{
666
- List < ConnectionInfo > cs = ListConnections ( ) ;
667
- foreach ( ConnectionInfo c in cs )
668
- {
669
- CloseConnection ( c . Pid ) ;
670
- }
453
+ RabbitMQCtl . CloseAllConnections ( ) ;
671
454
}
672
455
673
456
internal void CloseConnection ( string pid )
674
457
{
675
- ExecRabbitMQCtl ( $ "close_connection \" { pid } \" \" Closed via rabbitmqctl \" " ) ;
458
+ RabbitMQCtl . CloseConnection ( pid ) ;
676
459
}
677
460
678
461
internal void RestartRabbitMQ ( )
679
462
{
680
- StopRabbitMQ ( ) ;
681
- Thread . Sleep ( 500 ) ;
682
- StartRabbitMQ ( ) ;
463
+ RabbitMQCtl . RestartRabbitMQ ( ) ;
683
464
}
684
465
685
466
internal void StopRabbitMQ ( )
686
467
{
687
- ExecRabbitMQCtl ( "stop_app" ) ;
468
+ RabbitMQCtl . StopRabbitMQ ( ) ;
688
469
}
689
470
690
471
internal void StartRabbitMQ ( )
691
472
{
692
- ExecRabbitMQCtl ( "start_app" ) ;
473
+ RabbitMQCtl . StartRabbitMQ ( ) ;
693
474
}
694
475
695
476
//
0 commit comments