Wenn ich rohe SQL-Befehle verwende, funktioniert es:
Code: Select all
using Confluent.Kafka;
using Microsoft.EntityFrameworkCore;
using System.Text.Json;
using System.Text.Json.Nodes;
// Kafka -> SQL Server bridge (raw SQL variant).
// Consumes JSON messages from a Kafka topic and, for every message, writes one row to OuterTable
// and one row to InnerTable (update when a matching row is found, insert otherwise) inside a
// single database transaction.
string connectionString = "Server=localhost; Database=pubs; User ID=***; Password=***; TrustServerCertificate=True;";
string bootstrapServers = "localhost:9092";
string topicName = "cbsource";

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = bootstrapServers,
    GroupId = "CB_SQL_consumer_EF_Core",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

// Ctrl+C cancels the token instead of terminating the process, so the consume loop can exit
// through the OperationCanceledException handler at the bottom.
var cancellationTokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;
    cancellationTokenSource.Cancel();
};

try
{
    // ConsumerBuilder is generic over key and value type. The value must be string because it is
    // handed to JsonSerializer.Deserialize below; the key is never read.
    // NOTE(review): Ignore is assumed for the key type - confirm against the producer.
    using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();

    // One context is shared by all messages; it is disposed when the loop ends.
    using var dbContext = new PubDbContext(connectionString);

    consumer.Subscribe(topicName);
    try
    {
        while (!cancellationTokenSource.Token.IsCancellationRequested)
        {
            try
            {
                var consumeResult = consumer.Consume(cancellationTokenSource.Token);
                if (consumeResult.Message.Value != null)
                {
                    Console.WriteLine($"Consumed message: {consumeResult.Message.Value}");
                    JsonObject? jsonObject = JsonSerializer.Deserialize<JsonObject>(consumeResult.Message.Value);
                    if (jsonObject != null)
                    {
                        // Values stay null when the property is missing or not an integer.
                        int? outerTableColumn1Value = null;
                        string? outerTableColumn2Value = null;
                        int? innerTableColumn1Value = null;
                        string? innerTableColumn2Value = null;

                        outerTableColumn1Value = int.TryParse(jsonObject["outer1"]?.ToString(), out int parsedNumber)
                            ? parsedNumber : null;
                        outerTableColumn2Value = jsonObject["outer2"]?.ToString();

                        var inner1Value = jsonObject["inner"]?["inner1"]?.ToString();
                        var inner2Value = jsonObject["inner"]?["inner2"]?.ToString();
                        if (!string.IsNullOrEmpty(inner1Value) || !string.IsNullOrEmpty(inner2Value))
                        {
                            innerTableColumn1Value = int.TryParse(inner1Value, out parsedNumber)
                                ? parsedNumber : null;
                            innerTableColumn2Value = inner2Value;
                            Console.WriteLine("Inner table exists.");
                        }
                        else
                        {
                            Console.WriteLine("Inner table does not exist.");
                        }

                        // The transaction is disposed at the end of this block, whether it was
                        // committed or rolled back.
                        using var transaction = dbContext.Database.BeginTransaction();
                        try
                        {
                            // NOTE(review): when the lookup value is null, "= NULL" presumably
                            // matches no row on SQL Server, so the insert branch runs - confirm
                            // this is the intended behavior.
                            string sqlCommand = @"SELECT TOP 1 * FROM OuterTable WHERE Outer1 = {0}";
                            string updateCommand = "";
                            string insertCommand = "";

                            var outerEntity = await dbContext.OuterTable
                                .FromSqlRaw(sqlCommand, outerTableColumn1Value)
                                .FirstOrDefaultAsync();
                            if (outerEntity != null)
                            {
                                updateCommand = @"UPDATE OuterTable SET Outer2 = {1} WHERE Outer1 = {0}";
#pragma warning disable CS8604
                                await dbContext.Database.ExecuteSqlRawAsync(updateCommand,
                                    outerTableColumn1Value, outerTableColumn2Value);
#pragma warning restore CS8604
                                Console.WriteLine("Outer table row updated.");
                            }
                            else
                            {
                                insertCommand = @"INSERT INTO OuterTable (Outer1, Outer2) VALUES ({0}, {1})";
#pragma warning disable CS8604
                                await dbContext.Database.ExecuteSqlRawAsync(insertCommand,
                                    outerTableColumn1Value, outerTableColumn2Value);
#pragma warning restore CS8604
                                Console.WriteLine("Outer table row inserted.");
                            }

                            sqlCommand = @"SELECT TOP 1 * FROM InnerTable WHERE Inner1 = {0}";
                            var innerEntity = await dbContext.InnerTable
                                .FromSqlRaw(sqlCommand, innerTableColumn1Value)
                                .FirstOrDefaultAsync();
                            if (innerEntity != null)
                            {
                                updateCommand = @"UPDATE InnerTable SET Inner2 = {1} WHERE Inner1 = {0}";
#pragma warning disable CS8604
                                await dbContext.Database.ExecuteSqlRawAsync(updateCommand,
                                    innerTableColumn1Value, innerTableColumn2Value);
#pragma warning restore CS8604
                                Console.WriteLine("Inner table row updated.");
                            }
                            else
                            {
                                insertCommand = @"INSERT INTO InnerTable (Inner1, Inner2) VALUES ({0}, {1})";
#pragma warning disable CS8604
                                await dbContext.Database.ExecuteSqlRawAsync(insertCommand,
                                    innerTableColumn1Value, innerTableColumn2Value);
#pragma warning restore CS8604
                                Console.WriteLine("Inner table row inserted.");
                            }

                            await transaction.CommitAsync();
                        }
                        catch (Exception ex)
                        {
                            // Any failure undoes both table writes for this message; the loop
                            // then continues with the next message.
                            await transaction.RollbackAsync();
                            Console.WriteLine($"Error: {ex.Message}. Transaction rolled back.");
                        }
                    }
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error during consume: {e.Error.Reason}");
            }
        }
    }
    finally
    {
        // Leave the consumer group explicitly before the consumer is disposed.
        consumer.Close();
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Consumer has been cancelled.");
}
/// <summary>
/// EF Core context used by the raw-SQL consumer. It connects to SQL Server with the connection
/// string passed to the constructor and exposes the two tables as keyless entity sets.
/// </summary>
public class PubDbContext : DbContext
{
    private readonly string _connectionString;

    /// <summary>Creates a context that will connect using <paramref name="connectionString"/>.</summary>
    /// <param name="connectionString">SQL Server connection string.</param>
    public PubDbContext(string connectionString)
    {
        _connectionString = connectionString;
    }

    // DbSet is generic; the entity type argument is required for the code to compile.
    public DbSet<OuterTable> OuterTable { get; set; }
    public DbSet<InnerTable> InnerTable { get; set; }

    /// <summary>Configures the SQL Server provider with the stored connection string.</summary>
    protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
    {
        optionsBuilder.UseSqlServer(_connectionString);
    }

    /// <summary>Maps both entity types as keyless.</summary>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.Entity<OuterTable>().HasNoKey();
        modelBuilder.Entity<InnerTable>().HasNoKey();
    }
}
/// <summary>
/// Entity class with two nullable properties, Outer1 and Outer2. The property names match the
/// column names used by the raw SQL statements against OuterTable in the consumer code.
/// </summary>
public class OuterTable
{
/// <summary>Nullable integer value; the consumer's SQL filters on it (WHERE Outer1 = {0}).</summary>
public int? Outer1 { get; set; }
/// <summary>Nullable text value written by the consumer's UPDATE and INSERT statements.</summary>
public string? Outer2 { get; set; }
}
/// <summary>
/// Entity class with two nullable properties, Inner1 and Inner2. The property names match the
/// column names used by the raw SQL statements against InnerTable in the consumer code.
/// </summary>
public class InnerTable
{
/// <summary>Nullable integer value; the consumer's SQL filters on it (WHERE Inner1 = {0}).</summary>
public int? Inner1 { get; set; }
/// <summary>Nullable text value written by the consumer's UPDATE and INSERT statements.</summary>
public string? Inner2 { get; set; }
}
Die Quelldaten stammen jedenfalls aus Couchbase-JSON-Dokumenten (wobei Kafka zum Empfangen der Nachrichten verwendet wird) und sehen wie folgt aus:

Ich habe es also mit verschachtelten JSON-Dokumenten zu tun. Nur einige davon enthalten die innere Struktur.
Mir wurde allerdings von anderen gesagt, dass der obige Code dem Grundgedanken eines ORM widerspricht, da man es vermeiden sollte, rohe SQL-Befehle im Code auszuführen. Deshalb bin ich zu folgender Variante gekommen.
Code: Select all
using Confluent.Kafka;
using Microsoft.EntityFrameworkCore;
using System.Text.Json;
using System.Text.Json.Nodes;
// Kafka -> SQL Server bridge (ORM variant).
// Consumes JSON messages from a Kafka topic and, for every message, adds or updates one
// OuterTable entity and one InnerTable entity through EF Core change tracking, inside a single
// database transaction.
string connectionString = "Server=localhost; Database=pubs; User ID=***; Password=***; TrustServerCertificate=True;";
string bootstrapServers = "localhost:9092";
string topicName = "cbsource";

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = bootstrapServers,
    GroupId = "CB_SQL_consumer_ORM",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

// Ctrl+C cancels the token instead of terminating the process, so the consume loop can exit
// through the OperationCanceledException handler at the bottom.
var cancellationTokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;
    cancellationTokenSource.Cancel();
};

// The typed builder yields DbContextOptions<ApplicationDbContext>, which is accepted by a
// constructor taking either the generic or the non-generic DbContextOptions.
var optionsBuilder = new DbContextOptionsBuilder<ApplicationDbContext>();
optionsBuilder.UseSqlServer(connectionString);

try
{
    // ConsumerBuilder is generic over key and value type. The value must be string because it is
    // handed to JsonSerializer.Deserialize below; the key is never read.
    // NOTE(review): Ignore is assumed for the key type - confirm against the producer.
    using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
    consumer.Subscribe(topicName);

    // Counters that supply negative substitute keys (-1, -2, ...) when a message carries no
    // parsable outer1 / inner1 value.
    // NOTE(review): both restart at 0 with every process start, so substitute keys repeat across
    // runs and may hit the update branch for rows stored by an earlier run - confirm intended.
    int i = 0;
    int j = 0;

    try
    {
        while (!cancellationTokenSource.Token.IsCancellationRequested)
        {
            try
            {
                var consumeResult = consumer.Consume(cancellationTokenSource.Token);
                if (consumeResult.Message.Value != null)
                {
                    Console.WriteLine($"Consumed message: {consumeResult.Message.Value}");
                    JsonObject? jsonObject = JsonSerializer.Deserialize<JsonObject>(consumeResult.Message.Value);
                    if (jsonObject != null)
                    {
                        int outerTableColumn1Value = int.TryParse(jsonObject["outer1"]?.ToString(), out int outer1Parsed) ? outer1Parsed : --i;
                        string? outerTableColumn2Value = jsonObject["outer2"]?.ToString();
                        string? inner1Value = jsonObject["inner"]?["inner1"]?.ToString();
                        string? inner2Value = jsonObject["inner"]?["inner2"]?.ToString();
                        int innerTableColumn1Value = int.TryParse(inner1Value, out int inner1Parsed) ? inner1Parsed : --j;
                        string? innerTableColumn2Value = inner2Value;

                        // A fresh context per message; both it and the transaction are disposed
                        // at the end of this block.
                        using var context = new ApplicationDbContext(optionsBuilder.Options);
                        using var transaction = context.Database.BeginTransaction();
                        try
                        {
                            var outerRow = await context.OuterTable
                                .FirstOrDefaultAsync(o => o.Outer1 == outerTableColumn1Value);
                            if (outerRow != null)
                            {
                                outerRow.Outer2 = outerTableColumn2Value;
                                context.OuterTable.Update(outerRow);
                                Console.WriteLine("Outer table row updated.");
                            }
                            else
                            {
                                var outerTable = new OuterTable
                                {
                                    Outer1 = outerTableColumn1Value,
                                    Outer2 = outerTableColumn2Value
                                };
                                // Add (not AddAsync): nothing here needs an asynchronous value
                                // generator; the row is written by SaveChangesAsync below.
                                context.OuterTable.Add(outerTable);
                                Console.WriteLine("Outer table row inserted.");
                            }

                            var innerRow = await context.InnerTable
                                .FirstOrDefaultAsync(o => o.Inner1 == innerTableColumn1Value);
                            if (innerRow != null)
                            {
                                innerRow.Inner2 = innerTableColumn2Value;
                                context.InnerTable.Update(innerRow);
                                Console.WriteLine("Inner table row updated.");
                            }
                            else
                            {
                                var innerTable = new InnerTable
                                {
                                    Inner1 = innerTableColumn1Value,
                                    Inner2 = innerTableColumn2Value
                                };
                                context.InnerTable.Add(innerTable);
                                Console.WriteLine("Inner table row inserted.");
                            }

                            await context.SaveChangesAsync();
                            await transaction.CommitAsync();
                        }
                        catch (Exception ex)
                        {
                            // Any failure undoes both writes for this message; the loop then
                            // continues with the next message.
                            await transaction.RollbackAsync();
                            Console.WriteLine($"Error: {ex.Message}. Transaction rolled back.");
                        }
                    }
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error during consume: {e.Error.Reason}");
            }
        }
    }
    finally
    {
        // Leave the consumer group explicitly before the consumer is disposed.
        consumer.Close();
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Consumer has been cancelled.");
}
/// <summary>
/// EF Core context used by the ORM consumer. It exposes the two tables as entity sets and
/// declares Outer1 and Inner1 as their respective keys.
/// </summary>
public class ApplicationDbContext : DbContext
{
    // DbSet is generic; the entity type argument is required for the code to compile.
    public DbSet<OuterTable> OuterTable { get; set; }
    public DbSet<InnerTable> InnerTable { get; set; }

    /// <summary>Creates the context from externally built options.</summary>
    /// <param name="options">Provider and connection configuration, passed to the base class.</param>
    public ApplicationDbContext(DbContextOptions options) : base(options) { }

    /// <summary>Declares the key property of each entity type.</summary>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // NOTE(review): Outer1 and Inner1 are declared as int? on the entity classes, while key
        // properties are presumably required by EF Core - confirm whether they should be int.
        modelBuilder.Entity<OuterTable>().HasKey(o => o.Outer1);
        modelBuilder.Entity<InnerTable>().HasKey(i => i.Inner1);
    }
}
/// <summary>
/// Entity class with two nullable properties, Outer1 and Outer2, queried and saved through
/// ApplicationDbContext.
/// </summary>
public class OuterTable
{
/// <summary>
/// Lookup value for existing rows and the key selector in OnModelCreating (HasKey(o => o.Outer1)).
/// NOTE(review): declared nullable although it is used as a key - confirm whether it should be int.
/// </summary>
public int? Outer1 { get; set; }
/// <summary>Nullable text value taken from the message's "outer2" property.</summary>
public string? Outer2 { get; set; }
}
/// <summary>
/// Entity class with two nullable properties, Inner1 and Inner2, queried and saved through
/// ApplicationDbContext.
/// </summary>
public class InnerTable
{
/// <summary>
/// Lookup value for existing rows and the key selector in OnModelCreating (HasKey(i => i.Inner1)).
/// NOTE(review): declared nullable although it is used as a key - confirm whether it should be int.
/// </summary>
public int? Inner1 { get; set; }
/// <summary>Nullable text value taken from the message's nested "inner2" property.</summary>
public string? Inner2 { get; set; }
}
eingegeben
Code: Select all
// Key configuration excerpt; ModelBuilder.Entity<TEntity>() needs the entity type argument,
// which follows from the property used in each key selector.
modelBuilder.Entity<OuterTable>().HasKey(o => o.Outer1);
modelBuilder.Entity<InnerTable>().HasKey(i => i.Inner1);
Ich habe einen Beitrag gesehen, der die Verwendung zusammengesetzter Schlüssel vorschlägt. Das Problem ist jedoch, dass einige JSON-Dokumente überhaupt keine äußeren Schlüssel bzw. keine innere Struktur haben, ich diesen aber trotzdem NULL-Werte zuweisen möchte. Daher scheinen zusammengesetzte Schlüssel in meinem Fall nicht zu funktionieren.
Außerdem hänge ich hier die Tabellen im SQL Server an, die mein erster Code erzeugt hat; sie zeigen das Ergebnis, das ich eigentlich erreichen möchte.

