IgniteDataStreamer führt dazu, dass die Überprüfung des Partitionsstatus fehlgeschlagen ist/PartitionsaktualisierungszähJava

Java-Forum
Anonymous
 IgniteDataStreamer führt dazu, dass die Überprüfung des Partitionsstatus fehlgeschlagen ist/Partitionsaktualisierungszäh

Post by Anonymous »

Welche Bedeutung hat diese WARN-Protokollierung in Apache Ignite?

Code: Select all

WARN  [sys-#78%IgniteInstance1%] (Log4J2Logger.java:523) Partition states validation has failed for group: Cache1, msg: Partitions update counters are inconsistent for Part 32...
Ich versuche, mehrere IgniteDataStreamer-Instanzen zu verwenden, um einen Cache nach dem Start meines Clusters vorab zu laden, wie hier beschrieben, und ich habe dieses minimierte Beispiel zusammengestellt, um zu reproduzieren, was ich sehe. Jeder IgniteRunnable/IgniteDataStreamer streamt einen eindeutigen Satz von Schlüsseln in den Cache.
Im Wesentlichen habe ich diese beiden Klassen:
IgniteServerMain.java

Code: Select all

public class IgniteServerMain {
private static final Logger log = LogManager.getLogger();

public static final String CACHE_NAME = "Cache1";
public static final String IGNITE_INSTANCE_NAME = "IgniteInstance1";
private static final String NODE_ID = System.getProperty("NODE_ID");
private static final int SERVER_NODES = 2;

public static void main(String[] args) {
try {
Ignition.start(getIgniteConfiguration());

if ("1".equals(NODE_ID)) {
waitForServerNodesToBeAvailable();
loadCache();
idleVerifyLoop();
}

while (true) {
sleep(10_000);
}
} catch (Exception e) {
log.error("{}", e, e);
System.exit(1);
}
}

private static IgniteConfiguration getIgniteConfiguration() {
CacheConfiguration cacheConfig = new CacheConfiguration()
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setBackups(1)
.setCacheMode(CacheMode.PARTITIONED)
.setName(CACHE_NAME)
.setReadThrough(false)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC);

DataRegionConfiguration defaultDataRegionConfiguration = new DataRegionConfiguration()
.setName("Default_Region")
.setMaxSize(1L * 1024 * 1024 * 1024)
.setInitialSize(1L * 1024 * 1024 * 1024);

DataStorageConfiguration dataStorageConfiguration = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(defaultDataRegionConfiguration);

TcpCommunicationSpi tcpCommunicationSpi = new TcpCommunicationSpi();

TcpDiscoveryVmIpFinder tcpDiscoveryVmIpFinder = new TcpDiscoveryVmIpFinder()
.setAddresses(List.of("127.0.0.1:47500..47509"));

TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi()
.setIpFinder(tcpDiscoveryVmIpFinder);

return new IgniteConfiguration()
.setCacheConfiguration(cacheConfig)
.setCommunicationSpi(tcpCommunicationSpi)
.setDataStorageConfiguration(dataStorageConfiguration)
.setDiscoverySpi(tcpDiscoverySpi)
.setIgniteInstanceName(IGNITE_INSTANCE_NAME)
.setIncludeEventTypes(EventType.EVT_NODE_FAILED);
}

private static void waitForServerNodesToBeAvailable() {
log.info("waiting for server nodes");
Ignite ignite = Ignition.ignite(IGNITE_INSTANCE_NAME);
while (ignite.cluster().forServers().nodes().size() < SERVER_NODES) {
sleep(1);
}

log.info("server nodes are available!");
sleep(100);
}

private static void loadCache() {
Instant start = Instant.now();
log.info("starting {}", CACHE_NAME);

Ignite ignite = Ignition.ignite(IGNITE_INSTANCE_NAME);
IgniteCompute compute = ignite.compute();
List jobs = IntStream.range(0, 100)
.mapToObj(PreloadRunnable::new)
.map(compute::runAsync)
.collect(Collectors.toList());

jobs.forEach(IgniteFuture::get);

Duration duration = Duration.between(start, Instant.now());
log.info("finished: duration {}", duration);

int size = ignite.cache(CACHE_NAME).size();
log.info("cache {}, size {}", CACHE_NAME, size);
}

private static boolean idleVerify() {
Instant start = Instant.now();

MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName = mbs.queryMBeans(null, null).stream()
.filter(objectInstance -> objectInstance.toString().contains("name=IdleVerify"))
.map(objectInstance -> objectInstance.getObjectName())
.findFirst()
.orElseThrow(() ->  new IllegalStateException("IdleVerify: MBean not found"));

try {
String result = (String) mbs.invoke(objectName, "invoke",
new Object[] { "", "", "", "", "" },
new String[] {});
log.info("IdleVerify: finished in {}", Duration.between(start, Instant.now()));

String[] resultSplit = result.split("\\R");
Stream.of(resultSplit).forEach(s -> log.info("IdleVerify: {}", s));
return "The check procedure has finished, no conflicts have been found."
.equals(resultSplit[resultSplit.length - 1]);
} catch (Exception e) {
log.error(e.toString(), e);
return false;
}
}

private static void idleVerifyLoop() {
Instant start = Instant.now();

while (!idleVerify()) {
if (Duration.between(start, Instant.now()).getSeconds() > 120) {
log.error("IdleVerifyLoop: exiting, there are still conflicts after 2 minutes of waiting");
return;
}
sleep(10_000);
}

Duration duration = Duration.between(start, Instant.now());
log.info("IdleVerifyLoop: finished in {}", duration);
}

private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
log.error(e.toString(), e);
Thread.interrupted();
throw new RuntimeException(e);
}
}
}
PreloadRunnable.java

Code: Select all

public class PreloadRunnable implements IgniteRunnable {
private static final long serialVersionUID = 1L;

private final int jobId;
private final Random random = new Random();

public PreloadRunnable(int jobId) {
this.jobId = jobId;
}

@Override
public void run() {
try (IgniteDataStreamer streamer = Ignition
.ignite(IgniteServerMain.IGNITE_INSTANCE_NAME)
.dataStreamer(IgniteServerMain.CACHE_NAME)) {

for (int v = 0; v < 10_000; v++) {
char randomLetter = (char) ('A' + random.nextInt(26));
String k = randomLetter + "-" + String.format("%06d", jobId) + "-" + String.format("%06d", v);
streamer.addData(k, v);
}
}
}
}
Einige Szenarien:
  • Ich beobachte die WARN-Protokollierung, wann immer ich Knoten 1 starte vor Knoten 2. In diesem Szenario beginnt Knoten 1 mit dem Streamen von Daten in den Cache, kurz nachdem er erkennt, dass Knoten 2 dem Cluster beigetreten ist, und das Datenstreaming verzahnt sich mit einem Partition Map Exchange.
  • Wenn ich die Pause am Ende von „waitForServerNodesToBeAvailable von 100 Millisekunden auf 10 Sekunden ändere, wird die WARN-Protokollierung nicht angezeigt. Dadurch erhält die anfängliche PME Zeit, vor dem LoadCache fertig zu werden.
  • Wenn ich die Pause auf 100 ms zurücksetze und Knoten 1 nach Knoten 2 starte, dann kehrt die Ignition.start-Methode von Knoten 1 erst zurück, nachdem die anfängliche PME abgeschlossen ist, und daher sehe ich auch nicht WARN-Protokollierung in diesem Szenario.
In meinem Beispielcode habe ich nach LoadCache eine Schleife hinzugefügt, die alle 10 Sekunden programmgesteuert die IdleVerify-JMX-Methode von Ignite aufruft. In Szenario 1 bestätigt der erste IdleVerify-Aufruf, dass die Aktualisierungszähler inkonsistent sind. Etwas später sehe ich normalerweise eine Protokollierung, die darauf hinweist, dass die PME abgeschlossen ist. Und wenn dann der zweite IdleVerify-Aufruf ausgeführt wird, wird gemeldet, dass keine Konflikte gefunden wurden. Es scheint also, dass diese Partitionsaktualisierungszähler irgendwann konsistent sind. Ich habe außerdem festgestellt, dass, wenn ich die Datenmenge erhöhe, die LoadCache in den Cache streamt, es länger dauert, bis die erste PME abgeschlossen ist. Daher kann es viele Iterationen meiner IdleVerify-Schleife dauern, bis gemeldet wird, dass keine Konflikte vorliegen.
Fragen:
  • Was ist hier los? Die IgniteDataStreamer-API gibt an, dass „der Datenstreamer die Datenkonsistenz nicht garantiert, bis er erfolgreich abgeschlossen wurde“, aber dies scheint im Widerspruch zu den inkonsistenten Update-Zählern zu stehen, die ich sehe.
  • Welche Bedeutung haben diese inkonsistenten Update-Zähler? Besteht in diesem Zustand die Möglichkeit eines Datenverlusts? Und ist es richtig, dass diese Aktualisierungszähler immer letztendlich konsistent sein werden?
  • Gibt es eine bessere Möglichkeit für Knoten 1, auf den Beitritt der anderen Knoten zu warten und sicherzustellen, dass die anfängliche PME abgeschlossen ist, bevor sie mit dem Streamen von Daten in den Cache beginnt? Unter der Annahme, dass die Topologie stabil ist (dies ist möglicherweise eine schlechte Annahme), scheint dies das gesamte Problem mit inkonsistenten Update-Zählern zu vermeiden.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post