Here is some example code I am using:
Code:
using Confluent.Kafka;
using System;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace FirstKafkaProducer
{
    public class Program
    {
        public static async Task Main()
        {
            CancellationTokenSource cts = new CancellationTokenSource();
            var task1 = Task.Run(() => Consume(cts.Token));
            var task2 = Task.Run(() => Console.ReadKey());
            await Task.WhenAny(task1, task2);
            cts.Cancel();
            await Task.WhenAll(task1, task2);
            Console.WriteLine("Exiting...");
        }

        private static void Consume(CancellationToken token)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "CountryCounter",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false,
            };

            // ConsumerBuilder is generic; the key/value types here are an assumption.
            using (var consumer = new ConsumerBuilder<Ignore, string>(config)
                .SetPartitionsRevokedHandler((c, p) =>
                {
                    Console.WriteLine("Partitions revoked...");
                    Commit(c); // commit via the consumer before the partitions are taken away
                })
                .SetPartitionsAssignedHandler((c, p) =>
                {
                    Console.WriteLine("Partitions assigned...");
                    foreach (var assignment in p)
                        Console.WriteLine($"{assignment.Topic}:{assignment.Partition}");

                    if (File.Exists("offset.txt"))
                    {
                        var offsets = JsonSerializer.Deserialize<OffsetDto[]>(File.ReadAllText("offset.txt"));
                        if (offsets != null)
                        {
                            foreach (var tp in p)
                            {
                                var offset = offsets.SingleOrDefault(o => o.Topic == tp.Topic && o.Partition == tp.Partition.Value);
                                if (offset != null)
                                    c.Seek(new TopicPartitionOffset(tp, new Offset(offset.Offset)));
                            }
                        }
                    }
                })
                .Build())
            {
                // Topic name is a placeholder; subscribe to whatever topic the producer writes to.
                consumer.Subscribe("my-topic");
                try
                {
                    while (!token.IsCancellationRequested)
                    {
                        var result = consumer.Consume(token);
                        Console.WriteLine($"Received: {result.Message.Value}");
                        Commit(consumer); // commit and persist the offsets after each message
                    }
                }
                catch (OperationCanceledException) { }
                finally
                {
                    consumer.Close();
                }
            }
        }

        private static void Commit(IConsumer<Ignore, string> consumer)
        {
            try
            {
                // Commit() returns the committed offsets; persist them so the
                // assigned handler can seek back to them on the next start.
                var committed = consumer.Commit();
                File.WriteAllText("offset.txt", JsonSerializer.Serialize(
                    committed.Select(o => new OffsetDto { Topic = o.Topic, Partition = o.Partition.Value, Offset = o.Offset.Value }).ToArray()));
                Console.WriteLine("Committed...");
            }
            catch (KafkaException) { }
        }

        public class OffsetDto
        {
            public string Topic { get; set; } = string.Empty;
            public int Partition { get; set; }
            public long Offset { get; set; }
        }
    }
}
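For reference, this is roughly what the persisted offset.txt ends up containing after a commit. It is only a sketch with made-up values; the topic name ("my-topic") and the numbers depend on your setup:

Code:
[
  { "Topic": "my-topic", "Partition": 0, "Offset": 42 },
  { "Topic": "my-topic", "Partition": 1, "Offset": 17 }
]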
What is going on here?