Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<ProjectGuid>{FEA9F7D9-1B00-4535-AC4D-BF3934A7B913}</ProjectGuid>
<TargetFramework>net452</TargetFramework>
<TargetFramework>net10.0</TargetFramework>
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\</SolutionDir>
<AssemblyTitle>Rebus.Transports.Showdown.Core</AssemblyTitle>
<Product>Rebus.Transports.Showdown.Core</Product>
Expand All @@ -20,12 +20,8 @@
</PropertyGroup>
<Import Project="$(SolutionDir)\.nuget\NuGet.targets" Condition="Exists('$(SolutionDir)\.nuget\NuGet.targets')" />
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="9.0.1" />
<PackageReference Include="Rebus" Version="4.1.0" />
<PackageReference Include="Rebus" Version="8.9.0" />
</ItemGroup>
<ItemGroup>
<Reference Include="System.Configuration" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
</ItemGroup>
</Project>
</Project>
239 changes: 117 additions & 122 deletions Showdown/Rebus.Transports.Showdown.Core/ShowdownRunner.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
Expand All @@ -12,127 +12,122 @@

namespace Rebus.Transports.Showdown.Core
{
public class ShowdownRunner : IDisposable
{
const int MessageCount = 10000;
const int NumberOfWorkers = 5;

readonly BuiltinHandlerActivator _adapter = new BuiltinHandlerActivator();

public BuiltinHandlerActivator Adapter => _adapter;

public async Task Run(string showdownName)
{
try
{
using (var printTimer = new Timer())
{
var sentMessagesCount = 0;
var receivedMessagesCount = 0;
printTimer.Interval = 5000;
printTimer.Elapsed +=
delegate
{
Print("Sent {0} messages. Received {1} messages.", sentMessagesCount, receivedMessagesCount);
};
printTimer.Start();

Print(@"----------------------------------------------------------------------
public class ShowdownRunner : IDisposable
{
const int MessageCount = 10000;
const int NumberOfWorkers = 5;

readonly BuiltinHandlerActivator _adapter = new();

public BuiltinHandlerActivator Adapter => _adapter;

public async Task Run(string showdownName)
{
try
{
using var printTimer = new Timer();
var sentMessagesCount = 0;
var receivedMessagesCount = 0;
printTimer.Interval = 5000;
printTimer.Elapsed +=
delegate
{
Print("Sent {0} messages. Received {1} messages.", sentMessagesCount, receivedMessagesCount);
};
printTimer.Start();

Print(@"----------------------------------------------------------------------
Running showdown: {0}
----------------------------------------------------------------------",
showdownName);
// .GetName()
// .Name);

var receivedMessageIds = new ConcurrentDictionary<int, int>();
var receivedMessages = 0;

Print("Stopping all workers in receiver");
var receiverBus = (RebusBus)_adapter.Bus;
receiverBus.SetNumberOfWorkers(0);

Thread.Sleep(TimeSpan.FromSeconds(2));

Print("Sending {0} messages from sender to receiver", MessageCount);

var senderWatch = Stopwatch.StartNew();

await Task.WhenAll(Enumerable.Range(0, MessageCount)
.Select(async i =>
{
var message = new TestMessage { MessageId = i };
receivedMessageIds[message.MessageId] = 0;
await _adapter.Bus.SendLocal(message);
Interlocked.Increment(ref sentMessagesCount);
}));

var totalSecondsSending = senderWatch.Elapsed.TotalSeconds;
Print("Sending {0} messages took {1:0.0} s ({2:0.0} msg/s)",
MessageCount, totalSecondsSending, MessageCount / totalSecondsSending);

var resetEvent = new ManualResetEvent(false);

_adapter.Handle<TestMessage>(async message =>
{
var result = Interlocked.Increment(ref receivedMessages);
if (result == MessageCount)
{
resetEvent.Set();
}
Interlocked.Increment(ref receivedMessagesCount);
});


Print("Starting receiver with {0} workers", NumberOfWorkers);

var receiverWatch = Stopwatch.StartNew();
receiverBus.Advanced.Workers.SetNumberOfWorkers(NumberOfWorkers);

resetEvent.WaitOne();
var totalSecondsReceiving = receiverWatch.Elapsed.TotalSeconds;

Thread.Sleep(2000);
printTimer.Stop();
Print("Receiving {0} messages took {1:0.0} s ({2:0.0} msg/s)",
MessageCount, totalSecondsReceiving, MessageCount / totalSecondsReceiving);

}
}
catch (Exception e)
{
Print("Error: {0}", e);
}
}

void Print(string message, params object[] objs)
{
Console.WriteLine(message, objs);
}

public void Dispose()
{
if (_disposing || _disposed) return;

lock (this)
{
if (_disposing || _disposed) return;

try
{
_disposing = true;
_adapter.Dispose();
}
finally
{
_disposed = true;
_disposing = false;
}
}
}

bool _disposed;
bool _disposing;


}
showdownName);

var receivedMessageIds = new ConcurrentDictionary<int, int>();
var receivedMessages = 0;

Print("Stopping all workers in receiver");
var receiverBus = (RebusBus)_adapter.Bus;
receiverBus.SetNumberOfWorkers(0);

Thread.Sleep(TimeSpan.FromSeconds(2));

Print("Sending {0} messages from sender to receiver", MessageCount);

var senderWatch = Stopwatch.StartNew();

await Task.WhenAll(Enumerable.Range(0, MessageCount)
.Select(async i =>
{
var message = new TestMessage { MessageId = i };
receivedMessageIds[message.MessageId] = 0;
await _adapter.Bus.SendLocal(message);
Interlocked.Increment(ref sentMessagesCount);
}));

var totalSecondsSending = senderWatch.Elapsed.TotalSeconds;
Print("Sending {0} messages took {1:0.0} s ({2:0.0} msg/s)",
MessageCount, totalSecondsSending, MessageCount / totalSecondsSending);

var resetEvent = new ManualResetEvent(false);

_adapter.Handle<TestMessage>(async message =>
{
var result = Interlocked.Increment(ref receivedMessages);
if (result == MessageCount)
{
resetEvent.Set();
}
Interlocked.Increment(ref receivedMessagesCount);
});

Print("Starting receiver with {0} workers", NumberOfWorkers);

var receiverWatch = Stopwatch.StartNew();
receiverBus.Advanced.Workers.SetNumberOfWorkers(NumberOfWorkers);

resetEvent.WaitOne();
var totalSecondsReceiving = receiverWatch.Elapsed.TotalSeconds;

Thread.Sleep(2000);
printTimer.Stop();
var totalRate = MessageCount / totalSecondsReceiving;
var perWorkerRate = totalRate / NumberOfWorkers;
Print($"Receiving {MessageCount} messages took {totalSecondsReceiving} s ({totalRate:0.0} msg/s with {NumberOfWorkers} workers rate of {perWorkerRate:0.0} msg/s per worker");
}
catch (Exception e)
{
Print("Error: {0}", e);
}
}

static void Print(string message, params object[] objs)
{
Console.WriteLine(message, objs);
}

void IDisposable.Dispose()
{
if (_disposing || _disposed) return;

lock (this)
{
if (_disposing || _disposed) return;

try
{
_disposing = true;
_adapter.Dispose();
}
finally
{
_disposed = true;
_disposing = false;
}
}
}

bool _disposed;
bool _disposing;


}
}
6 changes: 0 additions & 6 deletions Showdown/Rebus.Transports.Showdown.Postgresql/App.config

This file was deleted.

67 changes: 27 additions & 40 deletions Showdown/Rebus.Transports.Showdown.Postgresql/Program.cs
Original file line number Diff line number Diff line change
@@ -1,52 +1,39 @@
using Npgsql;
using Npgsql;
using NpgsqlTypes;
using Rebus.Config;
using Rebus.PostgreSql.Transport;
using Rebus.Transports.Showdown.Core;

namespace Rebus.Transports.Showdown.PostgreSql
{
public class Program
{
const string QueueName = "test_showdown";
const string TableName = QueueName;
const string PostgresqlConnectionString = "server=localhost;database=rebus2_test;user id=postgres; password=postgres; Maximum Pool Size=30";
const string TableNotFound = "42P01";
public class Program
{
const string QueueName = "test_showdown";
const string TableName = QueueName;
const string PostgresqlConnectionString = "server=localhost;port=55432;database=showdowndb;user id=showdown;password=showdownpw;";

public static void Main()
{
using (var runner = new ShowdownRunner())
{
PurgeInputQueue(QueueName);
public static void Main()
{
using var runner = new ShowdownRunner();
PurgeInputQueue(QueueName);

Configure.With(runner.Adapter)
.Logging(l => l.None())
.Transport(t => t.UsePostgreSql(PostgresqlConnectionString, TableName, QueueName))
.Options(o => o.SetMaxParallelism(20))
.Start();
Configure.With(runner.Adapter)
.Logging(l => l.None())
.Transport(t => t.UsePostgreSql(PostgresqlConnectionString, TableName, QueueName))
.Options(o => o.SetMaxParallelism(20))
.Start();

runner.Run(typeof(Program).Namespace).Wait();
}
}
runner.Run(typeof(Program).Namespace).Wait();
}

static void PurgeInputQueue(string queueName)
{
using (var connection = new NpgsqlConnection(PostgresqlConnectionString))
{
connection.Open();
try
{
using (var command = connection.CreateCommand())
{
command.CommandText = $@"DELETE FROM ""{TableName}"" WHERE ""recipient"" = @recipient;";
command.Parameters.Add("recipient", NpgsqlDbType.Text, 200).Value = queueName;
command.ExecuteNonQuery();
}
}
catch (PostgresException postgresException) when (postgresException.SqlState == TableNotFound)
{
}
}
}
}
static void PurgeInputQueue(string queueName)
{
using var connection = new NpgsqlConnection(PostgresqlConnectionString);
connection.Open();
using var command = connection.CreateCommand();
command.CommandText = $"DROP TABLE IF EXISTS \"{TableName}\"";
command.ExecuteNonQuery();

}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<ProjectGuid>{BAA972E0-27AE-450D-A45D-098801082A10}</ProjectGuid>
<OutputType>Exe</OutputType>
<TargetFramework>net452</TargetFramework>
<TargetFramework>net10.0</TargetFramework>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
<AssemblyTitle>Rebus.Transports.Showdown.Postgresql</AssemblyTitle>
<Product>Rebus.Transports.Showdown.Postgresql</Product>
Expand All @@ -20,19 +20,13 @@
<DebugType>pdbonly</DebugType>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="9.0.1" />
<PackageReference Include="Npgsql" Version="3.2.2" />
<PackageReference Include="Rebus" Version="4.1.0" />
<PackageReference Include="Rebus.PostgreSql" Version="4.0.0" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.3.0" />
<PackageReference Include="Npgsql" Version="10.0.1" />
<PackageReference Include="Rebus" Version="8.9.0" />
<PackageReference Include="Rebus.PostgreSql" Version="9.1.1" />
</ItemGroup>
<ItemGroup>
<Reference Include="System.Configuration" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Net.Http" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Rebus.Transports.Showdown.Core\Rebus.Transports.Showdown.Core.csproj" />
</ItemGroup>
</Project>
</Project>
Loading