Introducing Kafka transport support in CoreWCF
Introduction
The CoreWCF 1.4 release introduced Apache Kafka transport support through the publication of two new NuGet packages, CoreWCF.Kafka and CoreWCF.Kafka.Client. The Kafka protocol implementation is provided by taking a dependency on the Confluent.Kafka NuGet package and the underlying librdkafka C/C++ library.
Extensibility
Both the server and client packages expose a KafkaBinding that should be sufficient to configure security and transport to the broker.
However, in certain scenarios it can be useful to fine-tune properties of Confluent.Kafka / librdkafka. This can be achieved by creating a CustomBinding from the KafkaBinding and retrieving its KafkaTransportBindingElement. This element exposes all the properties exposed by ConsumerConfig and ProducerConfig from Confluent.Kafka.
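// Start from a regular KafkaBinding.
var binding = new KafkaBinding(KafkaDeliverySemantics.AtMostOnce)
{
    AutoOffsetReset = AutoOffsetReset.Earliest,
    GroupId = "my-group"
};
// Wrap it in a CustomBinding to reach the transport binding element.
var customBinding = new CustomBinding(binding);
KafkaTransportBindingElement transport = customBinding.Elements.Find<KafkaTransportBindingElement>();
// Set a Confluent.Kafka / librdkafka property that KafkaBinding does not surface.
transport.Debug = "all";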
Security
The table below summarizes the mapping between Confluent.Kafka and KafkaBinding security modes.
| SecurityProtocol | SaslMechanism | ClientCertificate | CoreWCF KafkaBinding configuration |
|---|---|---|---|
| Plaintext | N/A | N/A | KafkaSecurityMode.None + KafkaCredentialType.None |
| Ssl | N/A | No | KafkaSecurityMode.Transport + KafkaCredentialType.None + requires configuring CaPem |
| Ssl | N/A | Yes | KafkaSecurityMode.Transport + KafkaCredentialType.SslKeyPairCertificate + requires configuring CaPem + providing a SslKeyPairCredential instance |
| SaslPlaintext | Gssapi | N/A | supported through custom binding |
| SaslPlaintext | Plain | N/A | KafkaSecurityMode.TransportCredentialOnly + KafkaCredentialType.SaslPlain + providing a SaslUsernamePasswordCredential instance |
| SaslPlaintext | ScramSha256 | N/A | supported through custom binding |
| SaslPlaintext | ScramSha512 | N/A | supported through custom binding |
| SaslPlaintext | OAuthBearer | N/A | supported through custom binding |
| SaslSsl | Gssapi | N/A | supported through custom binding |
| SaslSsl | Plain | N/A | KafkaSecurityMode.Transport + KafkaCredentialType.SaslPlain + requires configuring CaPem + providing a SaslUsernamePasswordCredential instance |
| SaslSsl | ScramSha256 | N/A | supported through custom binding |
| SaslSsl | ScramSha512 | N/A | supported through custom binding |
| SaslSsl | OAuthBearer | N/A | supported through custom binding |
Getting started
First, configure CoreWCF to consume the topic my-topic with the consumer group id my-consumer-group. To specify the offset from which the consumer should start consuming messages when the consumer group has no committed offset yet, set the AutoOffsetReset property.
var builder = WebApplication.CreateBuilder();
// Register the CoreWCF services and the queue transport used by the Kafka binding.
builder.Services.AddServiceModelServices()
    .AddQueueTransport();
var app = builder.Build();
app.UseServiceModel(services =>
{
    services.AddService<Service>();
    services.AddServiceEndpoint<Service, IService>(new CoreWCF.Kafka.KafkaBinding
    {
        AutoOffsetReset = AutoOffsetReset.Earliest,
        DeliverySemantics = KafkaDeliverySemantics.AtMostOnce,
        GroupId = "my-consumer-group"
    }, "net.kafka://localhost:9092/my-topic");
});
app.Run();
Then, configure a client to produce messages to the topic my-topic.
CoreWCF.ServiceModel.Channels.KafkaBinding kafkaBinding = new();
var factory = new System.ServiceModel.ChannelFactory<IService>(kafkaBinding,
    new System.ServiceModel.EndpointAddress(new Uri("net.kafka://localhost:9092/my-topic")));
IService channel = factory.CreateChannel();
// Placeholder payload; any string argument accepted by the contract works here.
string name = "my-name";
await channel.CallServiceAsync(name);
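The snippets above reference IService and Service without showing them. A minimal server-side sketch of what they might look like follows. The single string parameter, the console output, and the choice of a one-way operation are assumptions made for illustration (queue-based transports deliver messages without sending a reply, which is why the operation is marked one-way here). A client would declare a matching contract using the System.ServiceModel attributes.

```csharp
using System;
using System.Threading.Tasks;
using CoreWCF;

// Hypothetical contract matching the names used in the Getting started snippets.
[ServiceContract]
public interface IService
{
    // Assumption: messages are consumed from the topic without a reply,
    // so the operation is declared one-way.
    [OperationContract(IsOneWay = true)]
    Task CallServiceAsync(string name);
}

// Hypothetical implementation invoked once per message consumed from the topic.
public class Service : IService
{
    public Task CallServiceAsync(string name)
    {
        Console.WriteLine($"Received: {name}");
        return Task.CompletedTask;
    }
}
```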
DeliverySemantics
The delivery semantics can be configured at the binding level to AtLeastOnce or AtMostOnce.
var binding = new CoreWCF.Kafka.KafkaBinding
{
    DeliverySemantics = KafkaDeliverySemantics.AtLeastOnce
};
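To make the difference between the two semantics concrete, here is a small in-memory simulation of the general Kafka consumer pattern: committing an offset before processing a message gives at-most-once delivery, while committing after processing gives at-least-once delivery. This is only a sketch of the concept and does not use or reflect CoreWCF internals; DeliverySemanticsDemo, Run, and the simulated crash are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class DeliverySemanticsDemo
{
    // Simulates a consumer that crashes once while handling the message at
    // crashAtOffset, then restarts from the last committed offset.
    // Returns the messages the handler saw, in order.
    public static List<string> Run(string[] log, bool commitBeforeProcessing, int crashAtOffset)
    {
        var processed = new List<string>();
        int committed = 0;   // next offset to read after a restart
        bool crashed = false;

        while (committed < log.Length)
        {
            int offset = committed;
            try
            {
                while (offset < log.Length)
                {
                    // At-most-once: the offset is committed before the handler runs.
                    if (commitBeforeProcessing) committed = offset + 1;

                    if (!crashed && offset == crashAtOffset)
                    {
                        crashed = true;
                        // At-least-once: the handler ran, but the crash happens before the commit.
                        if (!commitBeforeProcessing) processed.Add(log[offset]);
                        throw new InvalidOperationException("simulated crash");
                    }

                    processed.Add(log[offset]);

                    // At-least-once: the offset is committed only after the handler succeeds.
                    if (!commitBeforeProcessing) committed = offset + 1;
                    offset++;
                }
            }
            catch (InvalidOperationException)
            {
                // Restart: the outer loop resumes from the last committed offset.
            }
        }
        return processed;
    }
}
```

With the log a, b, c and a crash at offset 1, Run with commitBeforeProcessing set to true yields a, c (message b is lost), while setting it to false yields a, b, b, c (message b is handled twice). With AtLeastOnce, service operations should therefore tolerate duplicate messages.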
ErrorHandlingStrategy and DLQ support
The error handling strategy can be configured at the binding level to Ignore or DeadLetterQueue.
When specifying DeadLetterQueue, DeadLetterQueueTopic should also be provided.
var binding = new CoreWCF.Kafka.KafkaBinding
{
    ErrorHandlingStrategy = KafkaErrorHandlingStrategy.DeadLetterQueue,
    DeadLetterQueueTopic = "my-topic-DLQ"
};