
Recently, I added WebSocket support to FakeRPC. Because WebSocket is full-duplex, the client and server can now exchange multiple messages over a single connection. My biggest regret with FakeRPC so far is that it is built on top of HTTP rather than directly on TCP/IP, so adopting WebSocket is mostly about verifying the feasibility of JSON-RPC and paving the way for the upcoming TCP/IP support. Perhaps you have never noticed how closely these concepts are connected, but if we think of each RPC call as a set of messages, wouldn't that give you a deeper understanding of the somewhat old-fashioned concept of RPC? While writing FakeRPC, I used Channel, a relatively new .NET data structure, to forward messages. Taking the server side as an example: after CallInvoker processes an RPC request, the response is not sent back to the client immediately; instead, a background thread takes the message from the Channel and sends it back to the client. So why take the long way around? I hope this article gives you the answer.
Getting Started with Channel
Channel is a collection type that has been available out of the box since .NET Core 3.0. It resides in the System.Threading.Channels namespace and offers an async API, high performance, and thread safety. Currently, the main application scenario for Channel is the producer-consumer model. As shown in the diagram below, the producer is responsible for writing data to the queue, and the consumer is responsible for reading data from the queue. On this basis, the model can be extended by increasing the number of producers or consumers. Tools like RabbitMQ or Kafka can be considered applications of the producer-consumer model in specific domains, and we can even sense a bit of generalized read-write separation in it.
Producer-Consumer Model Diagram
Romain Rolland once said, There is only one kind of true heroism in the world: to see life as it is, and still love it. At this moment, looking at this diagram and pondering, you might think of the following approach:
class Producer<T>
{
    private readonly Queue<T> _queue;
    public Producer(Queue<T> queue) { _queue = queue; }
}
class Consumer<T>
{
    private readonly Queue<T> _queue;
    public Consumer(Queue<T> queue) { _queue = queue; }
}
I admit this idea is theoretically fine, but in practice it's full of pitfalls. For example, producers should only write, consumers should only read, but when you pass a queue to them directly, maintaining this separation of duties is difficult. Not to mention that during queue usage, producers worry about the queue being "full," and consumers worry about it being "empty." If you add multiple producers, multiple consumers, multithreading/locks, etc., it's clearly not a simple problem. To solve this, Microsoft introduced BlockingCollection and BufferBlock earlier. Taking the former as an example, here is a typical producer-consumer model:
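// Requires: using System.Collections.Concurrent;
const int Count = 100; // Example value; any positive item count works
var bc = new BlockingCollection<int>();
// Producer
var producer = Task.Run(() => {
    for (var i = 0; i < Count; i++) {
        bc.Add(i);
        Console.WriteLine("Producer Write Item: {0}", i);
    }
    // Tell the consumer that no more items will be added
    bc.CompleteAdding();
});
// Consumer
var consumer = Task.Run(() => {
    // Blocks while the collection is empty instead of spinning on TryTake,
    // and ends once adding is complete and every item has been taken
    foreach (var item in bc.GetConsumingEnumerable()) {
        Console.WriteLine("Consumer Read Item: {0}", item);
    }
});
await Task.WhenAll(producer, consumer);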
Notice that implementing a producer-consumer model has become almost effortless. Both BlockingCollection<T> and BufferBlock<T> are thread-safe collections, which makes them much easier to use in multithreaded environments. Looking back on my own experience, whenever I had to use thread synchronization primitives directly, I was walking carefully along the edge of bugs. So you might ask: since we already have data structures like BlockingCollection<T> and BufferBlock<T>, why do we still need Channel? For an ordinary programmer, countless bugs fade away over time, and the problems that once troubled us keep being answered anew.
BlockingCollection, BufferBlock, Channel Performance Comparison
As shown in the figure, we tested the performance of these three data structures when reading and writing 10,000 pieces of data. Obviously, Channel has the best performance. So, tell me, isn't that a good enough reason? Is there anything more exciting than performance? When your computer's graphics card cannot handle the "Myth Trilogy" of Assassin's Creed, and even deploying Stable Diffusion locally becomes a luxury, you have to admit that this small performance optimization is one of the few ingenious achievements left in an era where Moore's Law is predicted to fail.
// Create a bounded Channel
var boundedChannel = Channel.CreateBounded<int>(100);
// Create an unbounded Channel
var unboundedChannel = Channel.CreateUnbounded<string>();
So, where should we start with Channel? You may notice that creating a Channel is very simple, unless you intend to create a bounded-capacity Channel. Remember the problem we raised at the beginning? In the producer-consumer model, a fixed-capacity queue will inevitably encounter a "full" queue, and we need a strategy or mechanism to handle it. Channel's solution is BoundedChannelFullMode:
var boundedChannel = Channel.CreateBounded<string>(
    new BoundedChannelOptions(100) {
        FullMode = BoundedChannelFullMode.Wait
    });
Note that this is an enumeration type with four values: Wait, DropNewest, DropOldest, and DropWrite. Wait is the default.
- Wait: When the queue is full, TryWrite returns false, and WriteAsync waits until space becomes available.
- DropNewest: Remove the newest item in the queue, i.e., the element at the tail, to make room for the item being written.
- DropOldest: Remove the oldest item in the queue, i.e., the element at the head, to make room for the item being written.
- DropWrite: The write is reported as successful, but the item being written is discarded.
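To make the four modes concrete, here is a minimal sketch that pushes five integers into a Channel with a capacity of three and then drains whatever is left. The helper name Fill and the capacity are assumptions chosen for illustration; only TryWrite is exercised, so the waiting behavior of WriteAsync under Wait is not shown.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical helper: write 1..5 into a capacity-3 channel under the given mode,
// record what TryWrite returned each time, and return the items that survived.
static List<int> Fill(BoundedChannelFullMode mode, out List<bool> writeResults)
{
    var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3) { FullMode = mode });
    writeResults = new List<bool>();
    for (var i = 1; i <= 5; i++)
    {
        writeResults.Add(channel.Writer.TryWrite(i));
    }
    channel.Writer.Complete();

    // Items already buffered can still be read after the writer completes.
    var remaining = new List<int>();
    while (channel.Reader.TryRead(out var item))
    {
        remaining.Add(item);
    }
    return remaining;
}

foreach (var mode in Enum.GetValues<BoundedChannelFullMode>())
{
    var remaining = Fill(mode, out var results);
    Console.WriteLine($"{mode,-10} kept [{string.Join(", ", remaining)}], TryWrite => [{string.Join(", ", results)}]");
}
```

With this input, Wait keeps 1, 2, 3 and rejects the last two writes; DropNewest keeps 1, 2, 5; DropOldest keeps 3, 4, 5; and DropWrite keeps 1, 2, 3 while still reporting every write as successful.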
Besides the "full" or "empty" queue problem, we also consider issues that may arise in multithreaded producer-consumer models. Fortunately, Channel inherently supports multithreading. We can use SingleWriter and SingleReader in ChannelOptions to specify whether the Channel has a single consumer or producer. By default, both values are false:
var boundedChannel = Channel.CreateBounded<string>(
    new BoundedChannelOptions(100) {
        SingleWriter = true,
        SingleReader = false,
        FullMode = BoundedChannelFullMode.Wait
    });
For example, the code snippet above creates a Channel with a single producer and multiple consumers. For Channel, the two most important members are Writer and Reader. Writer corresponds to the producer, with type ChannelWriter<T>; Reader corresponds to the consumer, with type ChannelReader<T>. This time, we achieve true read-write separation:
// Producer produces data
channel.Writer.TryWrite("The desert is lonely, the sun sets over the long river.");
// Consumer consumes data
// Mode 1: Wait for data, then read items one at a time
while (await channel.Reader.WaitToReadAsync())
{
    while (channel.Reader.TryRead(out var item))
    {
        // Write specific processing logic here
    }
}
// Mode 2: Stream every item until the writer completes the Channel
// (ReadAllAsync already waits for data, so no outer WaitToReadAsync loop is needed)
await foreach (var item in channel.Reader.ReadAllAsync())
{
    // Write specific processing logic here
}
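The read-write separation mentioned above becomes clearer when each side receives only its own half of the Channel. The following is a minimal sketch, not code from FakeRPC: the names ProduceAsync and ConsumeAsync, the capacity of 4, and the item count are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// The producer only ever sees the write half of the channel.
static async Task ProduceAsync(ChannelWriter<int> writer, int count)
{
    for (var i = 0; i < count; i++)
    {
        // With FullMode.Wait, WriteAsync waits asynchronously while the channel is full.
        await writer.WriteAsync(i);
    }
    // Signal that no more items will arrive so the reader can finish.
    writer.Complete();
}

// The consumer only ever sees the read half.
static async Task<List<int>> ConsumeAsync(ChannelReader<int> reader)
{
    var received = new List<int>();
    await foreach (var item in reader.ReadAllAsync())
    {
        received.Add(item);
    }
    return received;
}

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(4)
{
    SingleWriter = true,
    SingleReader = true,
    FullMode = BoundedChannelFullMode.Wait
});

var producing = ProduceAsync(channel.Writer, 10);
var consuming = ConsumeAsync(channel.Reader);
await Task.WhenAll(producing, consuming);
Console.WriteLine($"Received {consuming.Result.Count} items");
```

Because the producer cannot read and the consumer cannot write, the compiler enforces the separation of duties that was hard to maintain when both sides shared a plain Queue<T>.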
Perhaps you are still eager to hear more, but I must regretfully tell you that this is the core usage of Channel. Surprised that it's so simple? This is indeed consistent with Microsoft's style: making complex things simple and easy to use. I won't delve into more details about Channel here; you can refer to the official documentation. I swear MSDN and MDN are the best documentation I've ever encountered.
Channel Applications
OK, after having a basic impression of Channel, let's look at its application in specific scenarios. Returning to FakeRPC, which served as the lead-in at the beginning of this article, when I considered adding WebSocket support, the first problem I faced was: how to map a method call to WebSocket communication? For ordinary HTTP, it naturally follows the request-response model, so it can easily link to a method call. But when we switch to a full-duplex WebSocket protocol, the problem becomes interesting. As we know, for WebSocket, the first connection is regular HTTP, but after the connection is established, HTTP is no longer needed. From then on, communication is back and forth. Ultimately, FakeRPC adopted the following approach to support WebSocket:
How FakeRPC Supports WebSocket Protocol
In this approach, CallInvoker is the core component responsible for processing requests. On the client side, the main job is to assemble a FakeRpcRequest based on the method and parameters, then send the message to the server by calling the SendAsync() method of the ClientWebSocket instance. Additionally, it needs to receive messages from the server. Since each message carries an Id, we can easily identify which message is a reply to us. On this basis, the author used a background thread to read messages from the Channel, so sending and receiving messages actually operate on two different threads. On the server side, message processing is similar, except that the server reads messages from the Channel to send to the client, while the client reads messages from the Channel to pass results to the proxy class. The code below shows some implementation details of the client mentioned above:
// Client sends a message
private async Task SendMessage(FakeRpcRequest request) {
    var payload = await _messageSerializer.SerializeAsync<FakeRpcRequest>(request);
    await _webSocket.SendAsync(new ArraySegment<byte>(payload), WebSocketMessageType.Binary, true, CancellationToken.None);
    OnMessageSent?.Invoke(_webSocket, request);
}
// Client reads messages from Channel
private async Task ReadMessagesFromQueue() {
    try {
        while (await _messagesToReadQueue.Reader.WaitToReadAsync()) {
            while (_messagesToReadQueue.Reader.TryRead(out var message)) {
                try {
                    var response = await _messageSerializer.DeserializeAsync<FakeRpcResponse>(message.Array);
                    OnMessageReceived?.Invoke(_webSocket, response);
                } catch (Exception e) {
                    // One bad message should not stop the read loop
                    _logger.LogError(e, $"Failed to process message due to {e.Message}");
                }
            }
        }
    }
    catch (TaskCanceledException) { }
    catch (OperationCanceledException) { }
    catch (Exception e) {
        _logger.LogError(e, $"Restart listen message queue due to {e.Message}");
        ListenMessageQueue();
    }
}
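The idea that each message carries an Id, so a reply can be matched to the call that is waiting for it, can be sketched as follows. This is not FakeRPC's actual implementation: the tuple message shape, the names pending, incoming, CallAsync, and DispatchAsync, and the in-process "echo server" that stands in for the WebSocket round trip are all assumptions made so the example runs on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical message shape: (Id, Payload) tuples stand in for the real request/response types.
var pending = new ConcurrentDictionary<string, TaskCompletionSource<string>>();
var incoming = Channel.CreateUnbounded<(string Id, string Payload)>();

// Caller side: register a waiter under the request Id, "send", then hand back the pending reply.
Task<string> CallAsync(string id, string payload)
{
    var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
    pending[id] = tcs;
    // Stand-in for the network: the "server" echoes the payload back in upper case.
    incoming.Writer.TryWrite((id, payload.ToUpperInvariant()));
    return tcs.Task;
}

// Background reader: drain the channel and complete whichever call is waiting on that Id.
async Task DispatchAsync()
{
    await foreach (var (id, payload) in incoming.Reader.ReadAllAsync())
    {
        if (pending.TryRemove(id, out var tcs))
        {
            tcs.TrySetResult(payload);
        }
    }
}

var dispatcher = Task.Run(() => DispatchAsync());
var first = CallAsync("1", "hello");
var second = CallAsync("2", "world");
var replies = await Task.WhenAll(first, second);
Console.WriteLine(string.Join(", ", replies)); // prints "HELLO, WORLD"

incoming.Writer.Complete();
await dispatcher;
```

Registering the waiter before sending matters here: if the reply arrived before the Id was in the dictionary, the dispatcher would find nobody to hand it to.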
Of course, when the ClientWebSocket instance receives a message, it actually writes the message to the Channel. In a sense, you can think of CallInvoker as acting both as a producer and a consumer, and they run on two different threads:
var bytes = stream.ToArray();
var response = await _messageSerializer.DeserializeAsync<FakeRpcResponse>(bytes);
_logger?.LogInformation("Send response to {0}/{1}, payload:{2}", request.ServiceName, request.MethodName, response.Result);
_messagesToReadQueue.Writer.TryWrite(new ArraySegment<byte>(bytes));
At this point, using dynamic proxy, we can easily call an RPC interface, and it runs over WebSocket as a persistent connection:
var _clientFactory = serviceProvider.GetService<FakeRpcClientFactory>();
// Call GreetService
var greetProxy = _clientFactory.Create<IGreetService>(new Uri("ws://localhost:5000"), FakeRpcTransportProtocols.WebSocket, FakeRpcMediaTypes.Default);
var reply = await greetProxy.SayHello(new HelloRequest() { Name = "Zhang San" });
reply = await greetProxy.SayWho();
// Call ICalculatorService
var calculatorProxy = _clientFactory.Create<ICalculatorService>(new Uri("ws://localhost:5000"), FakeRpcTransportProtocols.WebSocket, FakeRpcMediaTypes.Default);
var result = calculatorProxy.Random();
Notice that we not only abstracted the transport protocol but also the message protocol. You can still freely use MessagePack or Protobuf. To prove that Channel really helps performance, here is a benchmark result of FakeRPC:
FakeRPC Performance Comparison: Different Communication Protocols and Message Protocols
We can see that MessagePack consistently performs well under both HTTP and WebSocket. This makes me anticipate whether it can continue this legend over TCP/IP. Just a few days ago, I completed the binary message definition for TCP/IP. Since serialization and deserialization have been abstracted into the IMessageSerializer interface, we will have more opportunities to support more message protocols. With Ubisoft officially announcing the new title Assassin's Creed: Mirage, my understanding of the name FakeRPC has extended to the philosophy of the Assassin Brotherhood: Nothing is true, or to say, everything is permitted. In a sense, RPC hides the winding intermediate processes, giving you the illusion that you can call a remote method like a local method. Even when designing binary message protocols, I suddenly realized that I was simply reinventing HTTP. So, does all this still make sense? Of course it does! After all, living is about being happy!
// Requires: using System.Threading.Tasks.Dataflow;
var buffer = new BufferBlock<int>();
// Producer (not static: a static local function cannot capture buffer)
async Task Producer(IEnumerable<int> values) {
    foreach (var value in values) {
        await buffer.SendAsync(value);
    }
    // Signal that no more values will be sent
    buffer.Complete();
}
// Consumer
async Task Consumer(Action<int> process) {
    // OutputAvailableAsync returns false once the block is completed and drained
    while (await buffer.OutputAvailableAsync()) {
        process?.Invoke(await buffer.ReceiveAsync());
    }
}
var range = Enumerable.Range(0, 100);
await Task.WhenAll(Producer(range), Consumer(n => Console.WriteLine(n)));
BufferBlock is an important component of Microsoft's TPL Dataflow library. The basic idea is that a data flow consists of blocks linked together: each block receives and buffers data from one or more sources, reacts to that input, and passes its output on to the next block. Besides BufferBlock, there are also ActionBlock, TransformBlock, BroadcastBlock, etc. The main reason we mention BufferBlock here is that it adopts the producer-consumer model, and BlockingCollection, BufferBlock, and Channel represent different stages of .NET. Recalling who you were at different stages is a story that evokes emotion! Applying this idea, we find a new way to play with Channel:
// GetFiles
Task<Channel<string>> GetFiles(string root) {
    var filePathChannel = Channel.CreateUnbounded<string>();
    var directoryInfo = new DirectoryInfo(root);
    foreach (var file in directoryInfo.EnumerateFileSystemInfos()) {
        filePathChannel.Writer.TryWrite(file.FullName);
    }
    filePathChannel.Writer.Complete();
    return Task.FromResult(filePathChannel);
}
// Analyse
async Task<Channel<string>[]> Analyse(Channel<string> rootChannel) {
    var counterChannel = Channel.CreateUnbounded<string>();
    var errorsChannel = Channel.CreateUnbounded<string>();
    // ReadAllAsync waits for data itself and ends when rootChannel is completed
    await foreach (var filePath in rootChannel.Reader.ReadAllAsync()) {
        var fileInfo = new FileInfo(filePath);
        if (fileInfo.Extension == ".md") {
            var totalWords = File.ReadAllText(filePath).Length;
            counterChannel.Writer.TryWrite($"Article [{fileInfo.Name}] has {totalWords} characters.");
        } else {
            errorsChannel.Writer.TryWrite($"Path [{filePath}] is a folder or has an incorrect format.");
        }
    }
    counterChannel.Writer.Complete();
    errorsChannel.Writer.Complete();
    return new Channel<string>[] { counterChannel, errorsChannel };
}
// Merge
async Task<Channel<string>> Merge(params Channel<string>[] channels) {
    var mergeTasks = new List<Task>();
    var outputChannel = Channel.CreateUnbounded<string>();
    foreach (var channel in channels) {
        var thisChannel = channel;
        // Drain each input channel on its own task
        var mergeTask = Task.Run(async () => {
            await foreach (var item in thisChannel.Reader.ReadAllAsync()) {
                outputChannel.Writer.TryWrite(item);
            }
        });
        mergeTasks.Add(mergeTask);
    }
    await Task.WhenAll(mergeTasks);
    outputChannel.Writer.Complete();
    return outputChannel;
}
// Run
var filePathChannel = await GetFiles(@"/hugo-blog/content/posts/");
var analysedChannels = await Analyse(filePathChannel);
var mergedChannel = await Merge(analysedChannels);
await foreach (var item in mergedChannel.Reader.ReadAllAsync()) {
    Console.WriteLine(item);
}
OK, what do these three methods do? Personally, I think this is exactly the data flow we mentioned earlier. First, we use GetFiles() to get the file information in a specified directory. These paths are then sent to Analyse() for processing: it counts the characters in Markdown files and filters out non-Markdown files and subdirectories. Finally, Merge() aggregates the results so they can be output. If represented by a diagram, it would be the following flow:
Data Flow Pattern Using Channel
In a sense, this is a "divide and conquer" strategy: breaking a large task into smaller tasks and merging their results. Many years ago, I saw a similar code snippet in a book on parallel programming. At that time, I had already heard of Google's MapReduce, and later I came across Parallel. I suddenly realized that if the Map() and Reduce() functions run on a remote server, this process can be considered RPC, and the functions running on the remote server are performing some computation in parallel, which is parallel computing. When these parallel computations use globally scalable computing resources, it becomes cloud computing. So, the process of writing is quite interesting, isn't it?
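One thing worth noting about the three methods above is that each stage is awaited to completion before the next one starts, so the stages run one after another. As a variation, here is a minimal sketch in which every stage starts a background task and immediately returns only its ChannelReader, so downstream stages can begin working as soon as the first item arrives. The stage names Source and Classify and the in-memory file names are assumptions made for illustration; no file system access is involved.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Stage 1: emit the inputs in the background and hand back only the read half.
static ChannelReader<string> Source(IEnumerable<string> names)
{
    var output = Channel.CreateUnbounded<string>();
    _ = Task.Run(async () =>
    {
        foreach (var name in names)
        {
            await output.Writer.WriteAsync(name);
        }
        output.Writer.Complete();
    });
    return output.Reader;
}

// Stage 2: classify each name as soon as it arrives from the previous stage.
static ChannelReader<string> Classify(ChannelReader<string> input)
{
    var output = Channel.CreateUnbounded<string>();
    _ = Task.Run(async () =>
    {
        await foreach (var name in input.ReadAllAsync())
        {
            var verdict = name.EndsWith(".md", StringComparison.OrdinalIgnoreCase) ? "markdown" : "skipped";
            await output.Writer.WriteAsync($"{name}: {verdict}");
        }
        output.Writer.Complete();
    });
    return output.Reader;
}

// The final consumer drains the last stage while the earlier stages are still running.
var lines = new List<string>();
await foreach (var line in Classify(Source(new[] { "a.md", "b.txt", "c.md" })).ReadAllAsync())
{
    lines.Add(line);
    Console.WriteLine(line);
}
```

The sketch leaves out error handling: if a stage threw, its output Channel would never be completed and the consumer would wait forever, so real code would likely pass the exception to Writer.Complete in a catch block.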
Summary
Many years ago, my colleague Wesley talked about how to write logs to a database. At that time, none of us had heard of the term Elasticsearch. The solution we could think of was to use BlockingCollection to implement a blocking queue. In other words, after receiving logs from NLog or Log4Net, put them all into BlockingCollection and then consider writing them to a database or some output. Later, I gradually encountered NLog Target and Serilog Sink, and roughly understood how all this works. Even these logging components can support outputting logs to different destinations. In a sense, this cognitive change might declare the obsolescence of the immature thinking of the past. However, it is precisely these immature thoughts that constantly push you to refresh your understanding of a thing. In other words, our lives are made up of countless pasts. No matter how awkward, down, or frustrated you were, they all tirelessly tell you that I really existed in this world. Just as we have a more convenient Channel today, it doesn't mean our past thinking was meaningless. At least when we mention the producer-consumer model, I will think of Wesley, of BlockingCollection. Perhaps this is time's response to someone with a good memory: You can miss the past, but don't expect everything to remain the same. In this world, you can only do what you can do.