Anonymous
Der Fall wird zweimal erstellt, indem die API in Spring/Java zweimal aufgerufen wird [geschlossen]
Post
by Anonymous » 05 Jan 2026, 14:11
Wir haben Code wie diesen, der die API im Rahmen des Abfragemechanismus dreimal wiederholt. Falls es nicht abgeschlossen wird, wird ein Fall erstellt, indem eine andere API aufgerufen wird. Was passiert, ist, dass ich in den Protokollen zwei Threads sehe – den Hauptthread und den Thread-Pool, der neue Fälle erstellt. Daher wird die Tracking-ID aus dem Thread-Pool thehead auf null gesetzt und somit doppelte Threads erstellt.
Hier ist ein relevanter Teil des Codes. LLM schlägt mir vor, eine Zeitüberschreitung unter dem Abbruch-Aufruf hinzuzufügen, aber eine andere API, bei der es keine Zeitüberschreitung gibt, ist damit konfrontiert.
Hauptthread
Code: Select all
if (entity != null && entity.getId() != null) {
Future future = null;
try {
log.info("Waiting for polling result for id : {} trackingId: {}", entity.getId(), trackingId);
OrderRequest finalEntity = entity;
String finalProductOrderId = productOrderId;
Callable task = () -> checkGetStatusAPIForCeaseLine(customerId, finalProductOrderId, routeEnv, sc, trackingId, finalEntity, req);
future = executor.submit(task);
CeaseLineResponse result = future.get(futureTimeoutSecondsForCeaseLine, TimeUnit.SECONDS);
return result;
} catch (TimeoutException te) {
log.warn("Polling did not complete in time for id: {} trackingId: {}", entity.getId(), trackingId);
future.cancel(true);
return handleAmaCaseForCeaseLineTimeout(customerId, entity, productOrderId, req, httpHeaders, trackingId);
} catch (Exception e) {
log.error("Error while waiting for poll result", e);
return CeaseLineResponse.builder()
.httpCode(500)
.message("FAILED")
.orderNo(entity.getBillerOrderNo())
.orderActionId(entity.getBillerOrderActionNo())
.build();
}
}
Executor-Aufruf
Code: Select all
@Transactional
public CeaseLineResponse checkGetStatusAPIForCeaseLine(String customerId, String productOrderId, String routeEnv, String sc, String trackingId, OrderRequest entity, CreateAndSubmitSuspendResumeCeaseOrderRequest req) {
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.add(HEADER_TRACKING_ID, trackingId);
httpHeaders.add(HEADER_ROUTE_ENV, routeEnv);
httpHeaders.add(HEADER_SOURCE_SYSTEM_ID, "MATRIX-CEASE-POLL");
httpHeaders.add(HEADER_SOURCE_SERVER_ID, "MATRIX-CEASE-POLL");
httpHeaders.add(HEADER_SERVICE_NAME, "OrderStatus");
httpHeaders.add(HEADER_CACHE_CONTROL, HEADER_CACHE_CONTROL_NO_CACHE);
httpHeaders.add(HEADER_SERVICE_OPERATION_NAME, "MATRIX-CEASE-POLL");
httpHeaders.add(HEADER_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
for (int i = 0; i < maxAttemptForCeaseLine; i++) {
// wait before polling
try {
Thread.sleep(ceaseLineSubmittedPlan[i] * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.info("Polling interrupted due to timeout, trackingId: {}", trackingId);
return null;
}
try {
long start = System.nanoTime();
OrderStatusResponse orderStatus = amdocs.getOrderStatus(customerId, productOrderId, sc, trackingId, httpHeaders);
long end = System.nanoTime();
double durationSeconds = (end - start) / 1_000_000_000.0;
log.info("Polling attempt {} for getCeaseLineOrderStatus. Execution time: {} seconds, trackingId: {}", i + 1, durationSeconds, trackingId);
String status = orderStatus != null && orderStatus.getMessage() != null ? orderStatus.getMessage().getStatus() : null;
log.info("Polled status: {} for cease line, trackingId: {}", status, trackingId);
if ("CLOSED".equalsIgnoreCase(status)) {
entity.setStatus(OrderStatus.SUCCESS);
entity.setStatusUpdateDate(Instant.now());
entity.setStatusUpdateSource("API");
entity.setAttemptCount(i + 1);
final OrderRequest save = ceaseOrderRepo.save(entity);
log.info("Obtained CLOSED status for cease line. Updated row: {} trackingId: {}", save, entity.getTrackingId());
return CeaseLineResponse.builder()
.httpCode(200)
.message("SUCCESS")
.orderNo(entity.getBillerOrderNo())
.orderActionId(entity.getBillerOrderActionNo())
.build();
}
} catch (Exception ex) {
entity.setStatus(OrderStatus.FAILED);
entity.setRejectReason("Poll error: " + checkError(ex));
entity.setAttemptCount(i + 1);
final OrderRequest save = ceaseOrderRepo.save(entity);
log.info("Obtained FAILED status for cease line. Updated row: {} trackingId: {}", save, entity.getTrackingId());
return CeaseLineResponse.builder()
.httpCode(500)
.message("FAILED")
.orderNo(entity.getBillerOrderNo())
.orderActionId(entity.getBillerOrderActionNo())
.messageDescription(entity.getRejectReason())
.build();
}
}
entity.setAttemptCount(maxAttemptForReplaceDevice);
return handleAmaCaseForCeaseLineTimeout(customerId, entity, productOrderId, req, httpHeaders, trackingId);
}
private CeaseLineResponse handleAmaCaseForCeaseLineTimeout(
String customerId,
OrderRequest entity,
String productOfferId,
CreateAndSubmitSuspendResumeCeaseOrderRequest req,
HttpHeaders httpHeaders,
String trackingId
) {
log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(),
trackingId);
String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
: null;
String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
entity.setRejectReason("Order created but still processing - case Number: " + caseNumber);
entity.setStatus(OrderStatus.SUBMITTED);
final OrderRequest save = ceaseOrderRepo.save(entity);
log.info("SUBMITTED (polling finished) for cease line. Updated row: {} trackingId: {}", save, entity.getTrackingId());
return buildAcceptedOrderResponse202(entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), "Order created but still processing", caseNumber);
}
Dies geschieht in der Anfrage und beim Erstellen eines neuen Falls mithilfe der API
Code: Select all
u_correlation_id\":\"null:9001962483:9001962484
Dies ist, was LLM mir vorgeschlagen hat, nach dem Abbruch-Aufruf hinzuzufügen:
Code: Select all
// Add a small delay to ensure the polling thread has stopped
try {
Thread.sleep(100);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
und eine unterbrochene Prüfung hier"
Code: Select all
for (int i = 0; i < maxAttemptForCeaseLine; i++) {
// Check if thread has been interrupted/cancelled
if (Thread.currentThread().isInterrupted()) {
log.info("Polling cancelled for trackingId: {}", trackingId);
return null;
}
Was läuft hier schief? Irgendwelche Gedanken?
Bearbeiten: Dieses Verhalten wird nur bei tryCount=3 beobachtet und nicht für 0,1 und 2.
Bearbeiten 2 : Minimales Beispiel hinzufügen. Dies ist jedoch nicht in der Lage, die Rennbedingung zu reproduzieren
Code: Select all
package com.order.service.concurrent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.BeforeEach;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
@RunWith(MockitoJUnitRunner.class)
public class DoubleCaseCreationTest {
private static final Logger log = LoggerFactory.getLogger(DoubleCaseCreationTest.class);
@Mock
private Service Service;
@Mock
private CeaseOrderRepository ceaseOrderRepo;
private ExecutorService executor;
private AtomicInteger amaCaseCallCount;
private CeaseOrderService ceaseOrderService;
// Test configuration
private static final int TIMEOUT_SECONDS = 1;
private static final int POLLING_ATTEMPTS = 3;
private static final int[] POLLING_INTERVALS = {1,1,1}; // 1 second each = 5 seconds total
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
executor = Executors.newFixedThreadPool(5);
amaCaseCallCount = new AtomicInteger(0);
// Mock repository save
when(ceaseOrderRepo.save(any(com..matrix.order.service.entity.OrderRequest.class)))
.thenAnswer(invocation -> invocation.getArgument(0));
// Mock to return IN_PROGRESS (never completes)
OrderStatusResponse inProgressResponse = new OrderStatusResponse();
Message message = new Message();
message.setStatus("IN_PROGRESS");
inProgressResponse.setMessage(message);
when(Service.getOrderStatus(anyString(), anyString(), anyString(), anyString(), any()))
.thenReturn(inProgressResponse);
ceaseOrderService = new CeaseOrderService(
Service,
ceaseOrderRepo,
executor,
TIMEOUT_SECONDS,
POLLING_ATTEMPTS,
POLLING_INTERVALS
);
}
@After
public void tearDown() {
executor.shutdownNow();
try {
executor.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* TEST 1: Demonstrates the BUG - Two AMA cases are created
*
* Timeline:
* T=0s: Main thread submits polling task
* T=0-2s: Executor thread polls (attempts 1-2)
* T=2s: Main thread times out
* T=2s: Main thread creates AMA case #1
* T=2-5s: Executor thread continues polling (attempts 3-5)
* T=5s: Executor thread exhausts retries, creates AMA case #2
*
* Result: 2 cases created
*/
@Test
public void testBuggyVersion_CreatesTwoCases() throws Exception {
log.info("\n========================================");
log.info("TEST: BUGGY VERSION - Expects 2 Cases");
log.info("========================================\n");
// Reset counter
amaCaseCallCount.set(0);
// Track AMA case creation calls
AtomicInteger mainThreadCalls = new AtomicInteger(0);
AtomicInteger executorThreadCalls = new AtomicInteger(0);
ceaseOrderService.setAmaCaseCreationCallback((threadName) -> {
int count = amaCaseCallCount.incrementAndGet();
log.warn("AMA CASE #{} CREATED by thread: {}", count, threadName);
if (threadName.contains("main") || threadName.contains("Test worker")) {
mainThreadCalls.incrementAndGet();
} else if (threadName.contains("pool")) {
executorThreadCalls.incrementAndGet();
}
});
// Create test data
com..matrix.order.service.entity.OrderRequest entity = createTestOrder();
CreateAndSubmitSuspendResumeCeaseOrderRequest request = new CreateAndSubmitSuspendResumeCeaseOrderRequest();
// Execute BUGGY version
long startTime = System.currentTimeMillis();
CeaseLineResponse response = ceaseOrderService.createCeaseOrderBuggy(
"customer123",
entity,
"order123",
request,
"demo",
"sc123",
"track123"
);
long endTime = System.currentTimeMillis();
log.info("\n--- BUGGY VERSION Response ---");
log.info("Execution time: {}ms", (endTime - startTime));
log.info("Response code: {}", response.getHttpCode());
log.info("Response message: {}", response.getMessage());
// Wait for executor thread to finish
log.info("\nWaiting for executor thread to complete...");
//Thread.sleep(4000); // Wait for remaining polling attempts
// Results
int totalCases = amaCaseCallCount.get();
log.info("\n========================================");
log.info("BUGGY VERSION RESULTS:");
log.info("Total AMA cases created: {}", totalCases);
log.info("Cases by main thread: {}", mainThreadCalls.get());
log.info("Cases by executor thread: {}", executorThreadCalls.get());
log.info("========================================\n");
// Assertions - BUGGY version creates 2 cases
assertEquals("BUGGY: Should create 2 cases (demonstrating the bug)", 2, totalCases);
assertEquals("Main thread should create 1 case", 1, mainThreadCalls.get());
assertEquals("Executor thread should create 1 case", 1, executorThreadCalls.get());
assertEquals("Should return 202 ACCEPTED", 202, response.getHttpCode());
}
private com..matrix.order.service.entity.OrderRequest createTestOrder() {
com..matrix.order.service.entity.OrderRequest entity = new com..matrix.order.service.entity.OrderRequest();
entity.setId(1L);
entity.setBillerOrderNo("9001962483");
entity.setBillerOrderActionNo("9001962484");
entity.setTrackingId("testTrack123");
entity.setStatus(com..matrix.order.service.domain.OrderStatus.REJECTED);
return entity;
}
}
/**
* Service class that implements both BUGGY and FIXED versions
*/
class CeaseOrderService {
private static final Logger log = LoggerFactory.getLogger(CeaseOrderService.class);
private final Service Service;
private final CeaseOrderRepository ceaseOrderRepo;
private final ExecutorService executor;
private final int timeoutSeconds;
private final int maxPollingAttempts;
private final int[] pollingIntervals;
// Callback for testing
private java.util.function.Consumer amaCaseCreationCallback;
public CeaseOrderService(Service Service,
CeaseOrderRepository ceaseOrderRepo,
ExecutorService executor,
int timeoutSeconds,
int maxPollingAttempts,
int[] pollingIntervals) {
this.Service = Service;
this.ceaseOrderRepo = ceaseOrderRepo;
this.executor = executor;
this.timeoutSeconds = timeoutSeconds;
this.maxPollingAttempts = maxPollingAttempts;
this.pollingIntervals = pollingIntervals;
}
public void setAmaCaseCreationCallback(java.util.function.Consumer callback) {
this.amaCaseCreationCallback = callback;
}
/**
* BUGGY VERSION - Does not cancel future properly
*/
public CeaseLineResponse createCeaseOrderBuggy(
String customerId,
com..matrix.order.service.entity.OrderRequest entity,
String productOrderId,
CreateAndSubmitSuspendResumeCeaseOrderRequest req,
String routeEnv,
String sc,
String trackingId) {
if (entity != null && entity.getId() != null) {
Future future = null;
try {
log.info("[BUGGY] Submitting polling task - trackingId: {}", trackingId);
Callable task = new Callable() {
@Override
public CeaseLineResponse call() throws Exception {
return checkGetStatusAPIBuggy(customerId, productOrderId, routeEnv, sc, trackingId, entity, req);
}
};
future = executor.submit(task);
CeaseLineResponse result = future.get(timeoutSeconds, TimeUnit.SECONDS);
return result;
} catch (TimeoutException te) {
log.warn("[BUGGY] Timeout occurred! Main thread creating case - trackingId: {}", trackingId);
future.cancel(true);
// BUG: No future cancellation!
// Executor thread continues running and will create its own case
return handleAmaCaseForCeaseLineTimeout(customerId, entity, productOrderId, trackingId);
} catch (Exception e) {
log.error("[BUGGY] Error: {}", e.getMessage());
return buildFailedResponse(entity);
}
}
return null;
}
/**
* FIXED VERSION - Properly cancels future
*/
public CeaseLineResponse createCeaseOrderFixed(
String customerId,
com..matrix.order.service.entity.OrderRequest entity,
String productOrderId,
CreateAndSubmitSuspendResumeCeaseOrderRequest req,
String routeEnv,
String sc,
String trackingId) {
if (entity != null && entity.getId() != null) {
Future future = null;
try {
log.info("[FIXED] Submitting polling task - trackingId: {}", trackingId);
Callable task = new Callable() {
@Override
public CeaseLineResponse call() throws Exception {
return checkGetStatusAPIFixed(customerId, productOrderId, routeEnv, sc, trackingId, entity, req);
}
};
future = executor.submit(task);
CeaseLineResponse result = future.get(timeoutSeconds, TimeUnit.SECONDS);
return result;
} catch (TimeoutException te) {
log.warn("[FIXED] Timeout occurred! Cancelling future - trackingId: {}", trackingId);
// FIX: Cancel the future
if (future != null) {
boolean cancelled = future.cancel(true);
log.info("[FIXED] Future cancelled: {}", cancelled);
}
// FIX: Wait for task to stop
if (future != null) {
try {
future.get(200, TimeUnit.MILLISECONDS);
} catch (CancellationException ce) {
log.info("[FIXED] Task was successfully cancelled");
} catch (TimeoutException | ExecutionException | InterruptedException e) {
log.warn("[FIXED] Task did not stop in time: {}", e.getMessage());
}
}
return handleAmaCaseForCeaseLineTimeout(customerId, entity, productOrderId, trackingId);
} catch (Exception e) {
log.error("[FIXED] Error: {}", e.getMessage());
return buildFailedResponse(entity);
}
}
return null;
}
/**
* BUGGY polling - doesn't respect interruption
*/
private CeaseLineResponse checkGetStatusAPIBuggy(
String customerId,
String productOrderId,
String routeEnv,
String sc,
String trackingId,
com..matrix.order.service.entity.OrderRequest entity,
CreateAndSubmitSuspendResumeCeaseOrderRequest req) {
log.info("[BUGGY POLL] 🔄 Starting polling in executor thread - trackingId: {}", trackingId);
for (int i = 0; i < maxPollingAttempts; i++) {
try {
log.info("[BUGGY POLL] Attempt {}/{} - sleeping {}s", i+1, maxPollingAttempts, pollingIntervals[i]);
Thread.sleep(pollingIntervals[i] * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// BUG: Doesn't return, continues execution!
log.warn("[BUGGY POLL] Interrupted but continuing...");
return null;
}
try {
Thread.sleep(7000L);
OrderStatusResponse orderStatus = Service.getOrderStatus(
customerId, productOrderId, sc, trackingId, null);
String status = orderStatus != null && orderStatus.getMessage() != null
? orderStatus.getMessage().getStatus() : null;
if ("CLOSED".equalsIgnoreCase(status)) {
entity.setStatus(com..matrix.order.service.domain.OrderStatus.SUCCESS);
ceaseOrderRepo.save(entity);
log.info("[BUGGY POLL] Order completed successfully");
return buildSuccessResponse(entity);
}
} catch (Exception ex) {
log.error("[BUGGY POLL] Error polling: {}", ex.getMessage());
}
}
log.warn("[BUGGY POLL] All attempts exhausted, executor thread creating case!");
return handleAmaCaseForCeaseLineTimeout(customerId, entity, productOrderId, trackingId);
}
/**
* FIXED polling - respects interruption
*/
private CeaseLineResponse checkGetStatusAPIFixed(
String customerId,
String productOrderId,
String routeEnv,
String sc,
String trackingId,
com..matrix.order.service.entity.OrderRequest entity,
CreateAndSubmitSuspendResumeCeaseOrderRequest req) {
log.info("[FIXED POLL] Starting polling in executor thread - trackingId: {}", trackingId);
for (int i = 0; i < maxPollingAttempts; i++) {
// FIX: Check for interruption at start of loop
if (Thread.currentThread().isInterrupted()) {
log.info("[FIXED POLL] Detected interruption at loop start, exiting cleanly");
return null;
}
try {
log.info("[FIXED POLL] Attempt {}/{} - sleeping {}s", i+1, maxPollingAttempts, pollingIntervals[i]);
Thread.sleep(pollingIntervals[i] * 1000L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// FIX: Return immediately on interruption
log.info("[FIXED POLL] Interrupted during sleep, exiting immediately");
return null;
}
try {
OrderStatusResponse orderStatus = Service.getOrderStatus(
customerId, productOrderId, sc, trackingId, null);
String status = orderStatus != null && orderStatus.getMessage() != null
? orderStatus.getMessage().getStatus() : null;
if ("CLOSED".equalsIgnoreCase(status)) {
entity.setStatus(com..matrix.order.service.domain.OrderStatus.SUCCESS);
ceaseOrderRepo.save(entity);
log.info("[FIXED POLL] Order completed successfully");
return buildSuccessResponse(entity);
}
} catch (Exception ex) {
log.error("[FIXED POLL] Error polling: {}", ex.getMessage());
}
}
log.warn("[FIXED POLL] All attempts exhausted, creating case");
return handleAmaCaseForCeaseLineTimeout(customerId, entity, productOrderId, trackingId);
}
private CeaseLineResponse handleAmaCaseForCeaseLineTimeout(
String customerId,
com..matrix.order.service.entity.OrderRequest entity,
String productOrderId,
String trackingId) {
String threadName = Thread.currentThread().getName();
log.info("📋 Creating AMA case from thread: {}", threadName);
// Notify callback for testing
if (amaCaseCreationCallback != null) {
amaCaseCreationCallback.accept(threadName);
}
String caseNumber = "CASE-" + System.currentTimeMillis();
entity.setRejectReason("Order created but still processing - case: " + caseNumber);
entity.setStatus(com..matrix.order.service.domain.OrderStatus.SUBMITTED);
ceaseOrderRepo.save(entity);
return CeaseLineResponse.builder()
.httpCode(202)
.message("ACCEPTED")
.messageDescription("Case created: " + caseNumber)
.orderNo(entity.getBillerOrderNo())
.orderActionId(entity.getBillerOrderActionNo())
.build();
}
private CeaseLineResponse buildSuccessResponse(com..matrix.order.service.entity.OrderRequest entity) {
return CeaseLineResponse.builder()
.httpCode(200)
.message("SUCCESS")
.orderNo(entity.getBillerOrderNo())
.orderActionId(entity.getBillerOrderActionNo())
.build();
}
private CeaseLineResponse buildFailedResponse(com..matrix.order.service.entity.OrderRequest entity) {
return CeaseLineResponse.builder()
.httpCode(500)
.message("FAILED")
.orderNo(entity.getBillerOrderNo())
.orderActionId(entity.getBillerOrderActionNo())
.build();
}
}
// Supporting classes and mocks
interface Service {
OrderStatusResponse getOrderStatus(String customerId, String productOrderId,
String sc, String trackingId, Object headers);
}
interface CeaseOrderRepository {
com..matrix.order.service.entity.OrderRequest save(com..matrix.order.service.entity.OrderRequest entity);
}
class OrderRequest {
private Long id;
private String billerOrderNo;
private String billerOrderActionNo;
private String trackingId;
private OrderStatus status;
private String rejectReason;
// Getters and setters
public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public String getBillerOrderNo() { return billerOrderNo; }
public void setBillerOrderNo(String no) { this.billerOrderNo = no; }
public String getBillerOrderActionNo() { return billerOrderActionNo; }
public void setBillerOrderActionNo(String no) { this.billerOrderActionNo = no; }
public String getTrackingId() { return trackingId; }
public void setTrackingId(String id) { this.trackingId = id; }
public OrderStatus getStatus() { return status; }
public void setStatus(OrderStatus status) { this.status = status; }
public String getRejectReason() { return rejectReason; }
public void setRejectReason(String reason) { this.rejectReason = reason; }
}
enum OrderStatus { NEW, SUBMITTED, COMPLETED, REJECTED, FAILED, SUCCESS }
class OrderStatusResponse {
private Message message;
public Message getMessage() { return message; }
public void setMessage(Message message) { this.message = message; }
}
class Message {
private String status;
public String getStatus() { return status; }
public void setStatus(String status) { this.status = status; }
}
class CeaseLineResponse {
private int httpCode;
private String message;
private String messageDescription;
private String orderNo;
private String orderActionId;
public static Builder builder() { return new Builder(); }
public int getHttpCode() { return httpCode; }
public String getMessage() { return message; }
public String getMessageDescription() { return messageDescription; }
public String getOrderNo() { return orderNo; }
public String getOrderActionId() { return orderActionId; }
static class Builder {
private CeaseLineResponse response = new CeaseLineResponse();
public Builder httpCode(int code) { response.httpCode = code; return this; }
public Builder message(String msg) { response.message = msg; return this; }
public Builder messageDescription(String desc) { response.messageDescription = desc; return this; }
public Builder orderNo(String no) { response.orderNo = no; return this; }
public Builder orderActionId(String id) { response.orderActionId = id; return this; }
public CeaseLineResponse build() { return response; }
}
}
class CreateAndSubmitSuspendResumeCeaseOrderRequest {}
1767618701
Anonymous
Wir haben Code wie diesen, der die API im Rahmen des Abfragemechanismus dreimal wiederholt. Falls es nicht abgeschlossen wird, wird ein Fall erstellt, indem eine andere API aufgerufen wird. Was passiert, ist, dass ich in den Protokollen zwei Threads sehe – den Hauptthread und den Thread-Pool, der neue Fälle erstellt. Daher wird die Tracking-ID aus dem Thread-Pool thehead auf null gesetzt und somit doppelte Threads erstellt. Hier ist ein relevanter Teil des Codes. LLM schlägt mir vor, eine Zeitüberschreitung unter dem Abbruch-Aufruf hinzuzufügen, aber eine andere API, bei der es keine Zeitüberschreitung gibt, ist damit konfrontiert. [b]Hauptthread[/b] [code]if (entity != null && entity.getId() != null) { Future future = null; try { log.info("Waiting for polling result for id : {} trackingId: {}", entity.getId(), trackingId); OrderRequest finalEntity = entity; String finalProductOrderId = productOrderId; Callable task = () -> checkGetStatusAPIForCeaseLine(customerId, finalProductOrderId, routeEnv, sc, trackingId, finalEntity, req); future = executor.submit(task); CeaseLineResponse result = future.get(futureTimeoutSecondsForCeaseLine, TimeUnit.SECONDS); return result; } catch (TimeoutException te) { log.warn("Polling did not complete in time for id: {} trackingId: {}", entity.getId(), trackingId); future.cancel(true); return handleAmaCaseForCeaseLineTimeout(customerId, entity, productOrderId, req, httpHeaders, trackingId); } catch (Exception e) { log.error("Error while waiting for poll result", e); return CeaseLineResponse.builder() .httpCode(500) .message("FAILED") .orderNo(entity.getBillerOrderNo()) .orderActionId(entity.getBillerOrderActionNo()) .build(); } } [/code] Executor-Aufruf [code]@Transactional public CeaseLineResponse checkGetStatusAPIForCeaseLine(String customerId, String productOrderId, String routeEnv, String sc, String trackingId, OrderRequest entity, CreateAndSubmitSuspendResumeCeaseOrderRequest req) { HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.add(HEADER_TRACKING_ID, trackingId); httpHeaders.add(HEADER_ROUTE_ENV, routeEnv); httpHeaders.add(HEADER_SOURCE_SYSTEM_ID, "MATRIX-CEASE-POLL"); httpHeaders.add(HEADER_SOURCE_SERVER_ID, "MATRIX-CEASE-POLL"); httpHeaders.add(HEADER_SERVICE_NAME, "OrderStatus"); httpHeaders.add(HEADER_CACHE_CONTROL, HEADER_CACHE_CONTROL_NO_CACHE); httpHeaders.add(HEADER_SERVICE_OPERATION_NAME, "MATRIX-CEASE-POLL"); httpHeaders.add(HEADER_TIMESTAMP, String.valueOf(System.currentTimeMillis())); for (int i = 0; i < maxAttemptForCeaseLine; i++) { // wait before polling try { Thread.sleep(ceaseLineSubmittedPlan[i] * 1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.info("Polling interrupted due to timeout, trackingId: {}", trackingId); return null; } try { long start = System.nanoTime(); OrderStatusResponse orderStatus = amdocs.getOrderStatus(customerId, productOrderId, sc, trackingId, httpHeaders); long end = System.nanoTime(); double durationSeconds = (end - start) / 1_000_000_000.0; log.info("Polling attempt {} for getCeaseLineOrderStatus. Execution time: {} seconds, trackingId: {}", i + 1, durationSeconds, trackingId); String status = orderStatus != null && orderStatus.getMessage() != null ? orderStatus.getMessage().getStatus() : null; log.info("Polled status: {} for cease line, trackingId: {}", status, trackingId); if ("CLOSED".equalsIgnoreCase(status)) { entity.setStatus(OrderStatus.SUCCESS); entity.setStatusUpdateDate(Instant.now()); entity.setStatusUpdateSource("API"); entity.setAttemptCount(i + 1); final OrderRequest save = ceaseOrderRepo.save(entity); log.info("Obtained CLOSED status for cease line. Updated row: {} trackingId: {}", save, entity.getTrackingId()); return CeaseLineResponse.builder() .httpCode(200) .message("SUCCESS") .orderNo(entity.getBillerOrderNo()) .orderActionId(entity.getBillerOrderActionNo()) .build(); } } catch (Exception ex) { entity.setStatus(OrderStatus.FAILED); entity.setRejectReason("Poll error: " + checkError(ex)); entity.setAttemptCount(i + 1); final OrderRequest save = ceaseOrderRepo.save(entity); log.info("Obtained FAILED status for cease line. Updated row: {} trackingId: {}", save, entity.getTrackingId()); return CeaseLineResponse.builder() .httpCode(500) .message("FAILED") .orderNo(entity.getBillerOrderNo()) .orderActionId(entity.getBillerOrderActionNo()) .messageDescription(entity.getRejectReason()) .build(); } } entity.setAttemptCount(maxAttemptForReplaceDevice); return handleAmaCaseForCeaseLineTimeout(customerId, entity, productOrderId, req, httpHeaders, trackingId); } private CeaseLineResponse handleAmaCaseForCeaseLineTimeout( String customerId, OrderRequest entity, String productOfferId, CreateAndSubmitSuspendResumeCeaseOrderRequest req, HttpHeaders httpHeaders, String trackingId ) { log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId); String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9() : null; String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders); entity.setRejectReason("Order created but still processing - case Number: " + caseNumber); entity.setStatus(OrderStatus.SUBMITTED); final OrderRequest save = ceaseOrderRepo.save(entity); log.info("SUBMITTED (polling finished) for cease line. Updated row: {} trackingId: {}", save, entity.getTrackingId()); return buildAcceptedOrderResponse202(entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), "Order created but still processing", caseNumber); } [/code] Dies geschieht in der Anfrage und beim Erstellen eines neuen Falls mithilfe der API [code]u_correlation_id\":\"null:9001962483:9001962484[/code] Dies ist, was LLM mir vorgeschlagen hat, nach dem Abbruch-Aufruf hinzuzufügen: [code]// Add a small delay to ensure the polling thread has stopped try { Thread.sleep(100); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } [/code] und eine unterbrochene Prüfung hier" [code]for (int i = 0; i < maxAttemptForCeaseLine; i++) { // Check if thread has been interrupted/cancelled if (Thread.currentThread().isInterrupted()) { log.info("Polling cancelled for trackingId: {}", trackingId); return null; } [/code] Was läuft hier schief? Irgendwelche Gedanken? Bearbeiten: Dieses Verhalten wird nur bei tryCount=3 beobachtet und nicht für 0,1 und 2. [b]Bearbeiten 2[/b]: Minimales Beispiel hinzufügen. Dies ist jedoch nicht in der Lage, die Rennbedingung zu reproduzieren [code]package com.order.service.concurrent; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.jupiter.api.BeforeEach; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.mockito.junit.MockitoJUnitRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; @RunWith(MockitoJUnitRunner.class) public class DoubleCaseCreationTest { private static final Logger log = LoggerFactory.getLogger(DoubleCaseCreationTest.class); @Mock private Service Service; @Mock private CeaseOrderRepository ceaseOrderRepo; private ExecutorService executor; private AtomicInteger amaCaseCallCount; private CeaseOrderService ceaseOrderService; // Test configuration private static final int TIMEOUT_SECONDS = 1; private static final int POLLING_ATTEMPTS = 3; private static final int[] POLLING_INTERVALS = {1,1,1}; // 1 second each = 5 seconds total @Before public void setUp() { MockitoAnnotations.initMocks(this); executor = Executors.newFixedThreadPool(5); amaCaseCallCount = new AtomicInteger(0); // Mock repository save when(ceaseOrderRepo.save(any(com..matrix.order.service.entity.OrderRequest.class))) .thenAnswer(invocation -> invocation.getArgument(0)); // Mock to return IN_PROGRESS (never completes) OrderStatusResponse inProgressResponse = new OrderStatusResponse(); Message message = new Message(); message.setStatus("IN_PROGRESS"); inProgressResponse.setMessage(message); when(Service.getOrderStatus(anyString(), anyString(), anyString(), anyString(), any())) .thenReturn(inProgressResponse); ceaseOrderService = new CeaseOrderService( Service, ceaseOrderRepo, executor, TIMEOUT_SECONDS, POLLING_ATTEMPTS, POLLING_INTERVALS ); } @After public void tearDown() { executor.shutdownNow(); try { executor.awaitTermination(2, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } /** * TEST 1: Demonstrates the BUG - Two AMA cases are created * * Timeline: * T=0s: Main thread submits polling task * T=0-2s: Executor thread polls (attempts 1-2) * T=2s: Main thread times out * T=2s: Main thread creates AMA case #1 * T=2-5s: Executor thread continues polling (attempts 3-5) * T=5s: Executor thread exhausts retries, creates AMA case #2 * * Result: 2 cases created */ @Test public void testBuggyVersion_CreatesTwoCases() throws Exception { log.info("\n========================================"); log.info("TEST: BUGGY VERSION - Expects 2 Cases"); log.info("========================================\n"); // Reset counter amaCaseCallCount.set(0); // Track AMA case creation calls AtomicInteger mainThreadCalls = new AtomicInteger(0); AtomicInteger executorThreadCalls = new AtomicInteger(0); ceaseOrderService.setAmaCaseCreationCallback((threadName) -> { int count = amaCaseCallCount.incrementAndGet(); log.warn("AMA CASE #{} CREATED by thread: {}", count, threadName); if (threadName.contains("main") || threadName.contains("Test worker")) { mainThreadCalls.incrementAndGet(); } else if (threadName.contains("pool")) { executorThreadCalls.incrementAndGet(); } }); // Create test data com..matrix.order.service.entity.OrderRequest entity = createTestOrder(); CreateAndSubmitSuspendResumeCeaseOrderRequest request = new CreateAndSubmitSuspendResumeCeaseOrderRequest(); // Execute BUGGY version long startTime = System.currentTimeMillis(); CeaseLineResponse response = ceaseOrderService.createCeaseOrderBuggy( "customer123", entity, "order123", request, "demo", "sc123", "track123" ); long endTime = System.currentTimeMillis(); log.info("\n--- BUGGY VERSION Response ---"); log.info("Execution time: {}ms", (endTime - startTime)); log.info("Response code: {}", response.getHttpCode()); log.info("Response message: {}", response.getMessage()); // Wait for executor thread to finish log.info("\nWaiting for executor thread to complete..."); //Thread.sleep(4000); // Wait for remaining polling attempts // Results int totalCases = amaCaseCallCount.get(); log.info("\n========================================"); log.info("BUGGY VERSION RESULTS:"); log.info("Total AMA cases created: {}", totalCases); log.info("Cases by main thread: {}", mainThreadCalls.get()); log.info("Cases by executor thread: {}", executorThreadCalls.get()); log.info("========================================\n"); // Assertions - BUGGY version creates 2 cases assertEquals("BUGGY: Should create 2 cases (demonstrating the bug)", 2, totalCases); assertEquals("Main thread should create 1 case", 1, mainThreadCalls.get()); assertEquals("Executor thread should create 1 case", 1, executorThreadCalls.get()); assertEquals("Should return 202 ACCEPTED", 202, response.getHttpCode()); } private com..matrix.order.service.entity.OrderRequest createTestOrder() { com..matrix.order.service.entity.OrderRequest entity = new com..matrix.order.service.entity.OrderRequest(); entity.setId(1L); entity.setBillerOrderNo("9001962483"); entity.setBillerOrderActionNo("9001962484"); entity.setTrackingId("testTrack123"); entity.setStatus(com..matrix.order.service.domain.OrderStatus.REJECTED); return entity; } } /** * Service class that implements both BUGGY and FIXED versions */ class CeaseOrderService { private static final Logger log = LoggerFactory.getLogger(CeaseOrderService.class); private final Service Service; private final CeaseOrderRepository ceaseOrderRepo; private final ExecutorService executor; private final int timeoutSeconds; private final int maxPollingAttempts; private final int[] pollingIntervals; // Callback for testing private java.util.function.Consumer amaCaseCreationCallback; public CeaseOrderService(Service Service, CeaseOrderRepository ceaseOrderRepo, ExecutorService executor, int timeoutSeconds, int maxPollingAttempts, int[] pollingIntervals) { this.Service = Service; this.ceaseOrderRepo = ceaseOrderRepo; this.executor = executor; this.timeoutSeconds = timeoutSeconds; this.maxPollingAttempts = maxPollingAttempts; this.pollingIntervals = pollingIntervals; } public void setAmaCaseCreationCallback(java.util.function.Consumer callback) { this.amaCaseCreationCallback = callback; } /** * BUGGY VERSION - Does not cancel future properly */ public CeaseLineResponse createCeaseOrderBuggy( String customerId, com..matrix.order.service.entity.OrderRequest entity, String productOrderId, CreateAndSubmitSuspendResumeCeaseOrderRequest req, String routeEnv, String sc, String trackingId) { if (entity != null && entity.getId() != null) { Future future = null; try { log.info("[BUGGY] Submitting polling task - trackingId: {}", trackingId); Callable task = new Callable() { @Override public CeaseLineResponse call() throws Exception { return checkGetStatusAPIBuggy(customerId, productOrderId, routeEnv, sc, trackingId, entity, req); } }; future = executor.submit(task); CeaseLineResponse result = future.get(timeoutSeconds, TimeUnit.SECONDS); return result; } catch (TimeoutException te) { log.warn("[BUGGY] Timeout occurred! Main thread creating case - trackingId: {}", trackingId); future.cancel(true); // BUG: No future cancellation! // Executor thread continues running and will create its own case return handleAmaCaseForCeaseLineTimeout(customerId, entity, productOrderId, trackingId); } catch (Exception e) { log.error("[BUGGY] Error: {}", e.getMessage()); return buildFailedResponse(entity); } } return null; } /** * FIXED VERSION - Properly cancels future */ public CeaseLineResponse createCeaseOrderFixed( String customerId, com..matrix.order.service.entity.OrderRequest entity, String productOrderId, CreateAndSubmitSuspendResumeCeaseOrderRequest req, String routeEnv, String sc, String trackingId) { if (entity != null && entity.getId() != null) { Future future = null; try { log.info("[FIXED] Submitting polling task - trackingId: {}", trackingId); Callable task = new Callable() { @Override public CeaseLineResponse call() throws Exception { return checkGetStatusAPIFixed(customerId, productOrderId, routeEnv, sc, trackingId, entity, req); } }; future = executor.submit(task); CeaseLineResponse result = future.get(timeoutSeconds, TimeUnit.SECONDS); return result; } catch (TimeoutException te) { log.warn("[FIXED] Timeout occurred! Cancelling future - trackingId: {}", trackingId); // FIX: Cancel the future if (future != null) { boolean cancelled = future.cancel(true); log.info("[FIXED] Future cancelled: {}", cancelled); } // FIX: Wait for task to stop if (future != null) { try { future.get(200, TimeUnit.MILLISECONDS); } catch (CancellationException ce) { log.info("[FIXED] Task was successfully cancelled"); } catch (TimeoutException | ExecutionException | InterruptedException e) { log.warn("[FIXED] Task did not stop in time: {}", e.getMessage()); } } return handleAmaCaseForCeaseLineTimeout(customerId, entity, productOrderId, trackingId); } catch (Exception e) { log.error("[FIXED] Error: {}", e.getMessage()); return buildFailedResponse(entity); } } return null; } /** * BUGGY polling - doesn't respect interruption */ private CeaseLineResponse checkGetStatusAPIBuggy( String customerId, String productOrderId, String routeEnv, String sc, String trackingId, com..matrix.order.service.entity.OrderRequest entity, CreateAndSubmitSuspendResumeCeaseOrderRequest req) { log.info("[BUGGY POLL] 🔄 Starting polling in executor thread - trackingId: {}", trackingId); for (int i = 0; i < maxPollingAttempts; i++) { try { log.info("[BUGGY POLL] Attempt {}/{} - sleeping {}s", i+1, maxPollingAttempts, pollingIntervals[i]); Thread.sleep(pollingIntervals[i] * 1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // BUG: Doesn't return, continues execution! log.warn("[BUGGY POLL] Interrupted but continuing..."); return null; } try { Thread.sleep(7000L); OrderStatusResponse orderStatus = Service.getOrderStatus( customerId, productOrderId, sc, trackingId, null); String status = orderStatus != null && orderStatus.getMessage() != null ? orderStatus.getMessage().getStatus() : null; if ("CLOSED".equalsIgnoreCase(status)) { entity.setStatus(com..matrix.order.service.domain.OrderStatus.SUCCESS); ceaseOrderRepo.save(entity); log.info("[BUGGY POLL] Order completed successfully"); return buildSuccessResponse(entity); } } catch (Exception ex) { log.error("[BUGGY POLL] Error polling: {}", ex.getMessage()); } } log.warn("[BUGGY POLL] All attempts exhausted, executor thread creating case!"); return handleAmaCaseForCeaseLineTimeout(customerId, entity, productOrderId, trackingId); } /** * FIXED polling - respects interruption */ private CeaseLineResponse checkGetStatusAPIFixed( String customerId, String productOrderId, String routeEnv, String sc, String trackingId, com..matrix.order.service.entity.OrderRequest entity, CreateAndSubmitSuspendResumeCeaseOrderRequest req) { log.info("[FIXED POLL] Starting polling in executor thread - trackingId: {}", trackingId); for (int i = 0; i < maxPollingAttempts; i++) { // FIX: Check for interruption at start of loop if (Thread.currentThread().isInterrupted()) { log.info("[FIXED POLL] Detected interruption at loop start, exiting cleanly"); return null; } try { log.info("[FIXED POLL] Attempt {}/{} - sleeping {}s", i+1, maxPollingAttempts, pollingIntervals[i]); Thread.sleep(pollingIntervals[i] * 1000L); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // FIX: Return immediately on interruption log.info("[FIXED POLL] Interrupted during sleep, exiting immediately"); return null; } try { OrderStatusResponse orderStatus = Service.getOrderStatus( customerId, productOrderId, sc, trackingId, null); String status = orderStatus != null && orderStatus.getMessage() != null ? orderStatus.getMessage().getStatus() : null; if ("CLOSED".equalsIgnoreCase(status)) { entity.setStatus(com..matrix.order.service.domain.OrderStatus.SUCCESS); ceaseOrderRepo.save(entity); log.info("[FIXED POLL] Order completed successfully"); return buildSuccessResponse(entity); } } catch (Exception ex) { log.error("[FIXED POLL] Error polling: {}", ex.getMessage()); } } log.warn("[FIXED POLL] All attempts exhausted, creating case"); return handleAmaCaseForCeaseLineTimeout(customerId, entity, productOrderId, trackingId); } private CeaseLineResponse handleAmaCaseForCeaseLineTimeout( String customerId, com..matrix.order.service.entity.OrderRequest entity, String productOrderId, String trackingId) { String threadName = Thread.currentThread().getName(); log.info("📋 Creating AMA case from thread: {}", threadName); // Notify callback for testing if (amaCaseCreationCallback != null) { amaCaseCreationCallback.accept(threadName); } String caseNumber = "CASE-" + System.currentTimeMillis(); entity.setRejectReason("Order created but still processing - case: " + caseNumber); entity.setStatus(com..matrix.order.service.domain.OrderStatus.SUBMITTED); ceaseOrderRepo.save(entity); return CeaseLineResponse.builder() .httpCode(202) .message("ACCEPTED") .messageDescription("Case created: " + caseNumber) .orderNo(entity.getBillerOrderNo()) .orderActionId(entity.getBillerOrderActionNo()) .build(); } private CeaseLineResponse buildSuccessResponse(com..matrix.order.service.entity.OrderRequest entity) { return CeaseLineResponse.builder() .httpCode(200) .message("SUCCESS") .orderNo(entity.getBillerOrderNo()) .orderActionId(entity.getBillerOrderActionNo()) .build(); } private CeaseLineResponse buildFailedResponse(com..matrix.order.service.entity.OrderRequest entity) { return CeaseLineResponse.builder() .httpCode(500) .message("FAILED") .orderNo(entity.getBillerOrderNo()) .orderActionId(entity.getBillerOrderActionNo()) .build(); } } // Supporting classes and mocks interface Service { OrderStatusResponse getOrderStatus(String customerId, String productOrderId, String sc, String trackingId, Object headers); } interface CeaseOrderRepository { com..matrix.order.service.entity.OrderRequest save(com..matrix.order.service.entity.OrderRequest entity); } class OrderRequest { private Long id; private String billerOrderNo; private String billerOrderActionNo; private String trackingId; private OrderStatus status; private String rejectReason; // Getters and setters public Long getId() { return id; } public void setId(Long id) { this.id = id; } public String getBillerOrderNo() { return billerOrderNo; } public void setBillerOrderNo(String no) { this.billerOrderNo = no; } public String getBillerOrderActionNo() { return billerOrderActionNo; } public void setBillerOrderActionNo(String no) { this.billerOrderActionNo = no; } public String getTrackingId() { return trackingId; } public void setTrackingId(String id) { this.trackingId = id; } public OrderStatus getStatus() { return status; } public void setStatus(OrderStatus status) { this.status = status; } public String getRejectReason() { return rejectReason; } public void setRejectReason(String reason) { this.rejectReason = reason; } } enum OrderStatus { NEW, SUBMITTED, COMPLETED, REJECTED, FAILED, SUCCESS } class OrderStatusResponse { private Message message; public Message getMessage() { return message; } public void setMessage(Message message) { this.message = message; } } class Message { private String status; public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } } class CeaseLineResponse { private int httpCode; private String message; private String messageDescription; private String orderNo; private String orderActionId; public static Builder builder() { return new Builder(); } public int getHttpCode() { return httpCode; } public String getMessage() { return message; } public String getMessageDescription() { return messageDescription; } public String getOrderNo() { return orderNo; } public String getOrderActionId() { return orderActionId; } static class Builder { private CeaseLineResponse response = new CeaseLineResponse(); public Builder httpCode(int code) { response.httpCode = code; return this; } public Builder message(String msg) { response.message = msg; return this; } public Builder messageDescription(String desc) { response.messageDescription = desc; return this; } public Builder orderNo(String no) { response.orderNo = no; return this; } public Builder orderActionId(String id) { response.orderActionId = id; return this; } public CeaseLineResponse build() { return response; } } } class CreateAndSubmitSuspendResumeCeaseOrderRequest {} [/code]