Hi 👋, so in today’s post I will be narrating how I wrote a custom Redis streams provider for Orleans.
Before I continue, I want to express my gratitude to the Orleans community for their support in addressing my queries. Special thanks to Ledjon Behluli and Reuben Bond for their invaluable contributions to the community.
Streams in Orleans enable your grains to react to a sequence of events. It allows the decoupling of the data generator from the data consumer.
Orleans’ extensibility allows developers to create custom stream providers to enable interoperability with the Orleans runtime.
Redis offers two popular messaging and real-time event-driven communication. They are:
- Redis Pub/Sub
- Redis Streams
I chose to write a Redis stream provider for Orleans due to the benefits Redis Stream has over Redis Pub/Sub. Redis stream supports message persistence, it also supports multiple consumers reading events at their own pace and other complex message processing.
For this article and project, I will be using Redis stream as a queue and a message broker.
To learn about streams in Orleans check these resources:
To learn about Redis Streams check here.
Building the Redis Streams Provider
Building a custom stream provider for Orleans requires implementing some interfaces.
The IQueueAdapterFactory interface is used for bootstrapping the components needed for a stream provider to be compatible with the Orleans runtime.
The IQueueAdapter interface enables the producer to send an event to the right queue and it also enables the Orleans runtime to know which queues to retrieve the events from before sending it to the consumer(Grain).
In Orleans each queue is assigned a pulling agent, the pulling agent retrieves a message from an assigned queue and sends it to a consumer which is a target Grain in Orleans.
You can decide to set the number of queues to one or more from your queue mapping settings when building the Orleans client and server host.
Orleans streams give you the freedom to use more than one queue to support various scalability, reliability and performance needs in a distributed system.
The IStreamFailureHandler interface enables you to define how you want to handle delivery errors to stream subscriptions.
The IQueueAdapterCache interface enables you to define how you want to handle the caching of the retrieved events from the queue.
The IStreamQueueMapper interface enables the stream/event producer to know which queue to push an event to by mapping the stream ID to a queue and it enables the runtime to know how many queues and cache to generate for the pulling agent.
Below is my implementation for the IQueueAdapterFactory interface:
public class RedisStreamFactory : IQueueAdapterFactory
{
private readonly IDatabase _database;
private readonly ILoggerFactory _loggerFactory;
private readonly string _providerName;
private readonly IStreamFailureHandler _streamFailureHandler;
private readonly SimpleQueueCacheOptions _simpleQueueCacheOptions;
private readonly HashRingBasedStreamQueueMapper _hashRingBasedStreamQueueMapper;
public RedisStreamFactory(IDatabase database,
ILoggerFactory loggerFactory,
string providerName,
IStreamFailureHandler streamFailureHandler,
SimpleQueueCacheOptions simpleQueueCacheOptions,
HashRingStreamQueueMapperOptions hashRingStreamQueueMapperOptions
)
{
_database = database;
_loggerFactory = loggerFactory;
_providerName = providerName;
_streamFailureHandler = streamFailureHandler;
_simpleQueueCacheOptions = simpleQueueCacheOptions;
_hashRingBasedStreamQueueMapper = new HashRingBasedStreamQueueMapper(hashRingStreamQueueMapperOptions, providerName);
}
public static IQueueAdapterFactory Create(IServiceProvider provider, string providerName)
{
var database = provider.GetRequiredService<IDatabase>();
var loggerFactory = provider.GetRequiredService<ILoggerFactory>();
var simpleQueueCacheOptions = provider.GetOptionsByName<SimpleQueueCacheOptions>(providerName);
var hashRingStreamQueueMapperOptions = provider.GetOptionsByName<HashRingStreamQueueMapperOptions>(providerName);
var streamFailureHandler = new RedisStreamFailureHandler(loggerFactory.CreateLogger<RedisStreamFailureHandler>());
return new RedisStreamFactory(database, loggerFactory, providerName, streamFailureHandler, simpleQueueCacheOptions, hashRingStreamQueueMapperOptions);
}
public Task<IQueueAdapter> CreateAdapter()
{
return Task.FromResult<IQueueAdapter>(new RedisStreamAdapter(_database, _providerName, _hashRingBasedStreamQueueMapper, _loggerFactory));
}
public Task<IStreamFailureHandler> GetDeliveryFailureHandler(QueueId queueId)
{
return Task.FromResult(_streamFailureHandler);
}
public IQueueAdapterCache GetQueueAdapterCache()
{
return new SimpleQueueAdapterCache(_simpleQueueCacheOptions, _providerName, _loggerFactory);
}
public IStreamQueueMapper GetStreamQueueMapper()
{
return _hashRingBasedStreamQueueMapper;
}
}
Next is my implementation of the IQueueAdapter interface, this interface enables the creation of a queue receiver. A receiver is generated for each queue. This interface is also used to send or push events to the queue. To push an event to the assigned queue, I had to use the inbuilt HashRingBasedStreamQueueMapper to obtain the queue a stream is assigned to. But you can write a custom queue mapper that suits your needs.
I also made use of the inbuilt SimpleQueueAdapterCache implementation to enable the caching of pulled events. But you can also use a custom one based on your needs.
In my implementation I made a batch of events to be a single event in Redis streams. I made the single event to comprise a stream’s namespace, a stream key, an event type and the actual data (event data).
This information will enable the queues pulling agent to know the consumer(Grain) for each event. Also do ensure that the Name property is properly set to the name of the stream provider.
The code below is my implementation of the interface.
public class RedisStreamAdapter : IQueueAdapter
{
private readonly IDatabase _database;
private readonly string _providerName;
private readonly HashRingBasedStreamQueueMapper _hashRingBasedStreamQueueMapper;
private readonly ILoggerFactory _loggerFactory;
private readonly ILogger<RedisStreamAdapter> _logger;
public RedisStreamAdapter(IDatabase database, string providerName, HashRingBasedStreamQueueMapper hashRingBasedStreamQueueMapper, ILoggerFactory loggerFactory)
{
_database = database;
_providerName = providerName;
_hashRingBasedStreamQueueMapper = hashRingBasedStreamQueueMapper;
_loggerFactory = loggerFactory;
_logger = loggerFactory.CreateLogger<RedisStreamAdapter>();
}
public string Name => _providerName;
public bool IsRewindable => false;
public StreamProviderDirection Direction => StreamProviderDirection.ReadWrite;
public IQueueAdapterReceiver CreateReceiver(QueueId queueId)
{
return new RedisStreamReceiver(queueId, _database, _loggerFactory.CreateLogger<RedisStreamReceiver>());
}
public async Task QueueMessageBatchAsync<T>(StreamId streamId, IEnumerable<T> events, StreamSequenceToken token, Dictionary<string, object> requestContext)
{
try
{
foreach (var @event in events)
{
NameValueEntry streamNamespaceEntry = new("streamNamespace", streamId.Namespace);
NameValueEntry streamKeyEntry = new("streamKey", streamId.Key);
NameValueEntry eventTypeEntry = new("eventType", @event!.GetType().Name);
NameValueEntry dataEntry = new("data", JsonSerializer.Serialize(@event));
var queueId = _hashRingBasedStreamQueueMapper.GetQueueForStream(streamId);
await _database.StreamAddAsync(queueId.ToString(), [streamNamespaceEntry, streamKeyEntry, eventTypeEntry, dataEntry]);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error adding event to stream {StreamId}", streamId);
}
}
}
Next, I implemented the IQueueAdapterReceiver interface, this implementation is used by pulling agents to retrieve messages or events from queues( Redis Stream) and sends the messages to the consumers (Grains). It also enables you to perform some cleanups or acknowledgement of processed messages using the MessagesDeliveredAsync method.
My implementation can be seen below.
public class RedisStreamReceiver : IQueueAdapterReceiver
{
private readonly QueueId _queueId;
private readonly IDatabase _database;
private readonly ILogger<RedisStreamReceiver> _logger;
private string _lastId = "0";
private Task? pendingTasks;
public RedisStreamReceiver(QueueId queueId, IDatabase database, ILogger<RedisStreamReceiver> logger)
{
_queueId = queueId;
_database = database;
_logger = logger;
}
public async Task<IList<IBatchContainer>?> GetQueueMessagesAsync(int maxCount)
{
try
{
var events = _database.StreamReadGroupAsync(_queueId.ToString(), "consumer", _queueId.ToString(), _lastId, maxCount);
pendingTasks = events;
_lastId = ">";
var batches = (await events).Select(e => new RedisStreamBatchContainer(e)).ToList<IBatchContainer>();
return batches;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error reading from stream {QueueId}", _queueId);
return default;
}
finally
{
pendingTasks = null;
}
}
public async Task Initialize(TimeSpan timeout)
{
try
{
using (var cts = new CancellationTokenSource(timeout))
{
var task = _database.StreamCreateConsumerGroupAsync(_queueId.ToString(), "consumer", "$", true);
await task.WaitAsync(timeout, cts.Token);
}
}
catch (Exception ex) when (ex.Message.Contains("name already exists")) { }
catch (Exception ex)
{
_logger.LogError(ex, "Error initializing stream {QueueId}", _queueId);
}
}
public async Task MessagesDeliveredAsync(IList<IBatchContainer> messages)
{
try
{
foreach (var message in messages)
{
var container = message as RedisStreamBatchContainer;
if (container != null)
{
var ack = _database.StreamAcknowledgeAsync(_queueId.ToString(), "consumer", container.StreamEntry.Id);
pendingTasks = ack;
await ack;
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error acknowledging messages in stream {QueueId}", _queueId);
}
finally
{
pendingTasks = null;
}
}
public async Task Shutdown(TimeSpan timeout)
{
using (var cts = new CancellationTokenSource(timeout))
{
if (pendingTasks is not null)
{
await pendingTasks.WaitAsync(timeout, cts.Token);
}
}
_logger.LogInformation("Shutting down stream {QueueId}", _queueId);
}
}
Next, I created a new type called RedisStreamSequenceToken which I inherited from the StreamSequenceToken abstract class.
I needed a way to convert Redis streams event ID, into a form that is compatible with Orleans streams. so this inheritance did the magic. I converted the Redis stream ID timestamp section into the SequenceNumber property and the Redis stream sequence number section into the EventIndex property.
The inheritance requires using the GenerateSerializer attribute to mark the new type. This will enable the Orleans runtime to copy the object within its internals. You also need to ensure that the Microsoft.Orleans.Sdk is used to enable the proper generation of the serialized code.
The code below is my implementation:
public class RedisStreamSequenceToken : StreamSequenceToken
{
[Id(0)]
public override long SequenceNumber { get; protected set; }
[Id(1)]
public override int EventIndex { get; protected set; }
public RedisStreamSequenceToken(RedisValue id)
{
var split = id.ToString().Split("-");
SequenceNumber = long.Parse(split[0]);
EventIndex = int.Parse(split[1]);
}
public RedisStreamSequenceToken(long sequenceNumber, int eventIndex)
{
SequenceNumber = sequenceNumber;
EventIndex = eventIndex;
}
public override int CompareTo(StreamSequenceToken other)
{
if (other is null) throw new ArgumentNullException(nameof(other));
if (other is RedisStreamSequenceToken token)
{
if (SequenceNumber == token.SequenceNumber)
{
return EventIndex.CompareTo(token.EventIndex);
}
return SequenceNumber.CompareTo(token.SequenceNumber);
}
throw new ArgumentException("Invalid token type", nameof(other));
}
public override bool Equals(StreamSequenceToken other)
{
var token = other as RedisStreamSequenceToken;
return token != null && SequenceNumber == token.SequenceNumber && EventIndex == token.EventIndex;
}
}
Finally, I implemented the IBatchContainer interface. This interface enables you to define a container for an event or a list of events. However, in my implementation, I made it to only encapsulate a single event or message from Redis streams.
Orleans streams support having heterogeneous types. This means that the queueing system can hold different types of data structures and Orleans will be able to deserialize it with your custom deserializer into the proper type required by the the event consumers.
It also has a generic method called GetEvents<T>(). This method enables the pulling agent to check if the batch container has the type of data required by the subscribed grain.
Below is my implementation:
public class RedisStreamBatchContainer : IBatchContainer
{
public StreamId StreamId { get; }
public StreamSequenceToken SequenceToken { get; }
public StreamEntry StreamEntry { get; }
public RedisStreamBatchContainer(StreamEntry streamEntry)
{
StreamEntry = streamEntry;
var streamNamespace = StreamEntry.Values[0].Value;
var steamKey = StreamEntry.Values[1].Value;
StreamId = StreamId.Create(streamNamespace!, steamKey!);
SequenceToken = new RedisStreamSequenceToken(StreamEntry.Id);
}
public IEnumerable<Tuple<T, StreamSequenceToken>> GetEvents<T>()
{
List<Tuple<T, StreamSequenceToken>> events = new();
var eventType = typeof(T).Name;
if (eventType == StreamEntry.Values[2].Value)
{
var data = StreamEntry.Values[3].Value;
var @event = JsonSerializer.Deserialize<T>(data!);
events.Add(new(@event!, SequenceToken));
}
return events;
}
public bool ImportRequestContext()
{
return false;
}
}
One last thing you should note is that when setting up the Factory settings ensure the same configuration of queues is set on both the Orleans server and the client.
For instance, if I want 4 queues to be created then I have to set the number of queues in both the client and the server to be the same. The code below shows that:
// client
using IHost host = new HostBuilder()
.UseOrleansClient(clientBuilder =>
{
clientBuilder.Services.AddSingleton<IDatabase>(sp =>
{
IDatabase db = ConnectionMultiplexer.Connect("localhost").GetDatabase();
return db;
});
clientBuilder.UseLocalhostClustering();
clientBuilder.AddPersistentStreams("RedisStream", RedisStreamFactory.Create, null);
clientBuilder.ConfigureServices(services =>
{
services.AddOptions<HashRingStreamQueueMapperOptions>("RedisStream")
.Configure(options =>
{
options.TotalQueueCount = 4;
});
});
})
.ConfigureLogging(logging => logging.AddConsole())
.Build();
// server
var builder = new HostBuilder()
.UseOrleans(silo =>
{
silo.UseLocalhostClustering();
silo.Services.AddSingleton<IDatabase>(sp =>
{
return ConnectionMultiplexer.Connect("localhost").GetDatabase();
});
silo.ConfigureLogging(logging => logging.AddConsole());
silo.AddMemoryGrainStorage("PubSubStore");
silo.AddPersistentStreams("RedisStream", Provider.RedisStreamFactory.Create, null);
silo.AddMemoryGrainStorageAsDefault();
}).UseConsoleLifetime();
builder.ConfigureServices(services =>
{
services.AddOptions<HashRingStreamQueueMapperOptions>("RedisStream")
.Configure(options =>
{
options.TotalQueueCount = 4;
});
services.AddOptions<SimpleQueueCacheOptions>("RedisStream");
});
using IHost host = builder.Build();
Thanks for reading through, a working project that stitches everything together can be seen here.