Implementing a Distributed Event Bus with Socket in C# Without Relying on Third-party MQ

CodeWF.EventBus.Socket is a lightweight, Socket-based distributed event bus system designed to simplify event communication in distributed architectures. It allows process-to-process communication via publish/subscribe patterns without relying on external message queue services.

Last updated 7/28/2024 10:25 AM
沙漠尽头的狼
Category: .NET
Tags: .NET C# Architecture Design Distributed EventBus

A distributed event bus implemented with Socket. It supports CQRS and requires no third-party MQ.

[Figure: Command]

[Figure: Query]

Features

  • Lightweight: No external MQ service is required, which reduces system complexity.

  • High Performance: Direct communication based on Socket, providing low-latency, high-throughput message delivery.

  • Flexibility: Supports custom event types and message handlers, easy to integrate into existing systems.

  • Scalability: Supports multiple client connections, suitable for distributed system environments.

Communication Protocol

Data is exchanged via the TCP protocol, and the protocol packet structure is as follows:

[Figure: protocol packet structure]

Installation

Install CodeWF.EventBus.Socket via the NuGet package manager:

Install-Package CodeWF.EventBus.Socket

Server Usage

Running the Event Service

In the server-side code, create and start an EventServer instance to listen for client connections and events:

using CodeWF.EventBus.Socket;

// Create an event server instance
IEventServer eventServer = new EventServer();

// Start the event server, listening on the specified IP and port
eventServer.Start("127.0.0.1", 9100);

Stopping the Event Service

When the event service is no longer needed, call the Stop method to gracefully shut down the server:

eventServer.Stop();
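
One way to tie the start and stop calls together is a small host helper that starts the server, blocks until a shutdown signal arrives, and always calls stop on the way out. The sketch below is generic: `ServerHost` and `RunUntil` are hypothetical names, not part of CodeWF.EventBus.Socket, and it assumes that `Start` returns without blocking the calling thread.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of CodeWF.EventBus.Socket.
public static class ServerHost
{
    // Runs start(), waits for the shutdown signal, then always runs stop().
    // Assumes start() returns immediately rather than blocking.
    public static void RunUntil(WaitHandle shutdown, Action start, Action stop)
    {
        start();
        try
        {
            shutdown.WaitOne();
        }
        finally
        {
            // Runs even if waiting throws, so the server is not left listening.
            stop();
        }
    }
}
```

With the server from the snippets above, this might be wired up as `ServerHost.RunUntil(done, () => eventServer.Start("127.0.0.1", 9100), () => eventServer.Stop())`, where `done` is a `ManualResetEvent` that a `Console.CancelKeyPress` handler sets.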

Client Usage

Connecting to the Event Service

In the client code, create an EventClient instance and connect to the event server:

using CodeWF.EventBus.Socket;

// Create an event client instance
IEventClient eventClient = new EventClient();

// Connect to the event server; afterwards, eventClient.ConnectStatus reports the connection status
eventClient.Connect("127.0.0.1", 9100);

Subscribing to Events

Subscribe to specific types of events and specify the event handler:

eventClient.Subscribe<NewEmailCommand>("event.email.new", ReceiveNewEmail);

private void ReceiveNewEmail(NewEmailCommand command)
{
    // Handle the new email notification
    Console.WriteLine($"New email received, subject is {command.Subject}");
}

Publishing Commands

Publish an event to a specific topic for subscribed clients to handle:

// Publish a new email notification event
eventClient.Publish("event.email.new", new NewEmailCommand
{
    Subject = "Congratulations, you won first prize on GitHub",
    Content = "We are happy, you in July 2024...",
    SendTime = new DateTime(2024, 7, 27)
});
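
The `NewEmailCommand` type is not shown in the snippets above. A minimal sketch, assuming it is a plain class whose public properties match the ones set in the publish call; how the library serializes payloads, and whether it requires attributes or a particular constructor, is not covered here:

```csharp
using System;

// Hypothetical payload type inferred from the publish example;
// the real definition may differ.
public class NewEmailCommand
{
    public string? Subject { get; set; }
    public string? Content { get; set; }
    public DateTime SendTime { get; set; }
}
```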

Querying

Queries use the same publish/subscribe mechanism. The endpoint that answers queries (the producer) subscribes to the query topic and, when a request arrives, publishes the result to that same topic:

eventClient.Subscribe<EmailQuery>("event.email.query", ReceiveEmailQuery);

private void ReceiveEmailQuery(EmailQuery query)
{
    // Execute the query request and prepare the query result
    var response = new EmailQueryResponse { Emails = EmailManager.QueryEmail(query.Subject) };

    // Publish the query result with the same topic
    if (_eventClient!.Publish("event.email.query", response,
        out var errorMessage))
    {
        Logger.Info($"Response query result: {response}");
    }
    else
    {
        Logger.Error($"Response query failed: {errorMessage}");
    }
}

Other endpoints (the consumers) send a query on the same topic and receive the response:

var response = _eventClient!.Query<EmailQuery, EmailQueryResponse>("event.email.query",
    new EmailQuery() { Subject = "Account" },
    out var errorMessage);
if (string.IsNullOrWhiteSpace(errorMessage) && response != null)
{
    Logger.Info($"Query event.email.query, result: {response}");
}
else
{
    Logger.Error(
        $"Query event.email.query failed: [{errorMessage}]");
}
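
The query example relies on `EmailQuery`, `EmailQueryResponse`, and `EmailManager.QueryEmail`, none of which are shown. A minimal in-memory sketch, assuming `QueryEmail` returns the emails whose subject contains the search text; the `Email` type, the matching rule, and the seed data are made up for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// All types below are hypothetical stand-ins inferred from the snippets above.
public class Email
{
    public string Subject { get; set; } = "";
    public string Content { get; set; } = "";
}

public class EmailQuery
{
    public string Subject { get; set; } = "";
}

public class EmailQueryResponse
{
    public List<Email> Emails { get; set; } = new List<Email>();

    // Keeps interpolated log lines readable instead of printing the type name.
    public override string ToString() => $"{Emails.Count} email(s)";
}

public static class EmailManager
{
    // Illustrative seed data only.
    private static readonly List<Email> Store = new List<Email>
    {
        new Email { Subject = "Account activated", Content = "..." },
        new Email { Subject = "Weekly digest", Content = "..." },
    };

    // Case-insensitive substring match on the subject.
    public static List<Email> QueryEmail(string subject) =>
        Store.Where(e => e.Subject.IndexOf(subject, StringComparison.OrdinalIgnoreCase) >= 0)
             .ToList();
}
```

Overriding `ToString` on the response is a design choice for the logging calls above, which interpolate the response object directly.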

Unsubscribing from Events

When you no longer need to receive a certain type of event, you can unsubscribe:

eventClient.Unsubscribe<NewEmailCommand>("event.email.new", ReceiveNewEmail);

Disconnecting from the Event Service

When event handling is complete or you need to disconnect from the server, call the Disconnect method:

eventClient.Disconnect();
Console.WriteLine("Disconnected from the event service");

Notes

  • Make sure the server and client use the same address and port, and that no other service is already using that port.

  • In a production environment, the server should be configured to listen on a public IP address or appropriate network interface.

  • Because networks fail and services restart, clients may need to implement reconnection logic.

  • Depending on your requirements, the EventServer and EventClient classes can be extended to support more complex features such as message encryption, authentication, and authorization.
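
The reconnection note can be sketched as a retry loop with exponential backoff. This is a generic illustration, not something the library provides: `Reconnect` and `WithBackoff` are hypothetical names, and the connection attempt is passed in as a delegate so the sketch makes no assumption about how `EventClient` reports success.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of CodeWF.EventBus.Socket.
public static class Reconnect
{
    // Calls tryConnect until it returns true or maxAttempts is exhausted.
    // The wait doubles after each failure and is capped at maxDelay.
    // The sleep delegate is injectable so the loop can be exercised without real waits.
    public static bool WithBackoff(
        Func<bool> tryConnect,
        int maxAttempts,
        TimeSpan initialDelay,
        TimeSpan maxDelay,
        Action<TimeSpan>? sleep = null)
    {
        if (sleep == null)
        {
            sleep = Thread.Sleep;
        }

        var delay = initialDelay;
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            if (tryConnect())
            {
                return true;
            }

            if (attempt == maxAttempts)
            {
                break; // No point waiting after the final attempt.
            }

            sleep(delay);
            var doubled = TimeSpan.FromTicks(delay.Ticks * 2);
            delay = doubled < maxDelay ? doubled : maxDelay;
        }

        return false;
    }
}
```

A caller would pass a delegate that invokes `eventClient.Connect("127.0.0.1", 9100)` and returns whether the connection succeeded. How success is detected (for example by inspecting `eventClient.ConnectStatus`) depends on the library and is left open here.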
