diff --git a/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java b/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java
new file mode 100644
index 000000000..9cb1f96a9
--- /dev/null
+++ b/topic/src/test/java/tech/ydb/topic/impl/TopicReaderEventOrderingTest.java
@@ -0,0 +1,872 @@
+package tech.ydb.topic.impl;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import tech.ydb.core.Status;
+import tech.ydb.test.junit4.GrpcTransportRule;
+import tech.ydb.topic.TopicClient;
+import tech.ydb.topic.description.Consumer;
+import tech.ydb.topic.read.AsyncReader;
+import tech.ydb.topic.read.PartitionSession;
+import tech.ydb.topic.read.events.ReadEventHandler;
+import tech.ydb.topic.settings.CreateTopicSettings;
+import tech.ydb.topic.settings.PartitioningSettings;
+import tech.ydb.topic.settings.ReadEventHandlersSettings;
+import tech.ydb.topic.settings.ReaderSettings;
+import tech.ydb.topic.settings.TopicReadSettings;
+import tech.ydb.topic.settings.WriterSettings;
+import tech.ydb.topic.write.Message;
+import tech.ydb.topic.write.SyncWriter;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test to verify event ordering guarantees and session close race conditions in Topic API.
+ *
+ * This test checks for two related problems:
+ * 1. Event Ordering: StartPartitionSessionEvent and StopPartitionSessionEvent must be delivered in order,
+ * ensuring stop events are not processed before their corresponding start events.
+ *
+ * 2. Session Close Race Condition: Server reader sessions should not be closed before onPartitionSessionClosed
+ * and onReaderClosed callbacks complete execution. This prevents partitions from being reassigned to other
+ * readers before the original reader has finished cleaning up its resources.
+ *
+ * @author Evgeny Kuvardin
+ */
+public class TopicReaderEventOrderingTest {
+ private static final Logger logger = LoggerFactory.getLogger(TopicReaderEventOrderingTest.class);
+
+ @ClassRule
+ public static final GrpcTransportRule ydbTransport = new GrpcTransportRule();
+
+ private static final String TEST_CONSUMER = "test-consumer";
+
+ // Be careful to increment partition count!
+ // All single threads are stuck for 5 seconds
+ // Also should increase value to wait reader2GotPartition
+ private static final int partitionCount = 2;
+
+ private TopicClient client;
+ private String testTopic;
+
+ @Before
+ public void setup() {
+ testTopic = "test-topic-" + UUID.randomUUID();
+ logger.info("Creating test topic: {}", testTopic);
+
+ client = TopicClient.newClient(ydbTransport).build();
+ client.createTopic(testTopic, CreateTopicSettings.newBuilder()
+ .addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER).build())
+ .setPartitioningSettings(PartitioningSettings.
+ newBuilder()
+ .setMinActivePartitions(partitionCount)
+ .build())
+ .build()
+ ).join().expectSuccess("Failed to create test topic");
+ }
+
+ @After
+ public void tearDown() {
+ if (testTopic != null && client != null) {
+ logger.info("Dropping test topic: {}", testTopic);
+ Status dropStatus = client.dropTopic(testTopic).join();
+ dropStatus.expectSuccess("Failed to drop test topic");
+ }
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ private void sendMessage(String data) {
+ WriterSettings settings = WriterSettings.newBuilder()
+ .setTopicPath(testTopic)
+ .setProducerId("test-producer")
+ .build();
+
+ SyncWriter writer = client.createSyncWriter(settings);
+ writer.initAndWait();
+ writer.send(Message.of(data.getBytes()));
+ writer.flush();
+ }
+
+ /**
+ * Scenario:
+ * Verify that StartPartitionSessionEvent is always delivered before
+ * PartitionSessionClosedEvent for the same partition session.
+ *
+ * The test ensures that event ordering is preserved and that the client
+ * never receives a "close session" event before the corresponding "start session".
+ *
+ * Test steps:
+ *
+ * 1. Create structures to track events per partition.
+ * 2. Create AsyncReader with event handlers that log start/close events.
+ * 3. Initialize reader.
+ * 4. Send a message to trigger partition assignment.
+ * 5. Wait until start events are received
+ * 6. Shutdown reader to trigger close events
+ * 7. Wait until close events are received.
+ * 8. Verify that no ordering violation occurred.
+ * 9. Verify Start event occurs before Close event
+ */
+ @Test
+ public void testEventOrderingGuarantees() throws Exception {
+ logger.info("Starting testEventOrderingGuarantees");
+
+ // Step 1: Create structures to track events per partition.
+ StructureTest1 structureTest = getStructureForOrderGarantees();
+
+ // Step 2: Create AsyncReader with event handlers that log start/close events.
+ AsyncReader reader = getAsyncReaderForOrderGaran(structureTest.readerSettings, structureTest.executor, structureTest.eventLog, structureTest.activeSessions, structureTest.orderingViolation, structureTest.startReceived, structureTest.closeReceived);
+
+ // Step 3: Initialize reader
+ reader.init().join();
+
+ // Step 4: Send message to trigger partition assignment
+ sendMessage("test-message");
+
+ // Step 5: Wait until start events are received
+ assertTrue("Start event not received", structureTest.startReceived.await(10, TimeUnit.SECONDS));
+
+ // Step 6: Shutdown reader to trigger close events
+ logger.info("Shutting down reader");
+ reader.shutdown().get(10, TimeUnit.SECONDS);
+
+ // Step 7: Wait for close events
+ assertTrue("Close event not received", structureTest.closeReceived.await(10, TimeUnit.SECONDS));
+
+ structureTest.executor.shutdownNow();
+
+ logger.info("Event log: {}", structureTest.eventLog);
+ // Step 8: Verify no ordering violations occurred
+ assertFalse("Event ordering violation detected", structureTest.orderingViolation.get());
+
+ // Step 9: Verify Start event occurs before Close event
+ for (long partitionId = 0; partitionId < partitionCount; partitionId++) {
+ // Verify event sequence
+ int startIndex = -1;
+ int stopIndex = -1;
+ for (int i = 0; i < structureTest.eventLog.get(partitionId).size(); i++) {
+ if (startIndex == -1 && structureTest.eventLog.get(partitionId).get(i).startsWith("onStartPartitionSession")) {
+ startIndex = i;
+ }
+ if (stopIndex == -1 && structureTest.eventLog.get(partitionId).get(i).startsWith("onPartitionSessionClosed")) {
+ stopIndex = i;
+ }
+ }
+
+ assertTrue("Start event should be present", startIndex >= 0);
+ assertTrue("Close event should be present", stopIndex >= 0);
+ assertTrue("Start event must come before Stop event", startIndex < stopIndex);
+ }
+ }
+
+ /**
+ * Scenario:
+ * Verify that partition reassignment does not happen while the previous reader
+ * is still executing cleanup logic inside onPartitionSessionClosed.
+ *
+ * This test simulates a slow cleanup in Reader-1 and starts Reader-2 while
+ * Reader-1 is still closing the session.
+ *
+ * Steps:
+ *
+ * 1. Start Reader-1 and wait until it receives partitions.
+ * 2. Send a message to trigger partition assignment.
+ * 3. Shutdown Reader-1 to trigger session close.
+ * 4. Block Reader-1 cleanup to simulate slow resource release.
+ * 5. Start Reader-2 while Reader-1 cleanup is still in progress.
+ * 6. Allow Reader-1 cleanup to finish.
+ * 7. Wait for partition reassignment to Reader-2.
+ * 8. Verify that reassignment only happened after Reader-1 cleanup finished.
+ */
+ @Ignore
+ @Test
+ public void testSessionCloseRaceCondition() throws Exception {
+ logger.info("Starting testSessionCloseRaceCondition");
+
+ StructureTest2 structureTest = getStructureForRaceCondition();
+
+ // Create Reader-1
+ AsyncReader reader1 = getAsyncReader1ForRaceCondition(structureTest.readerSettings, structureTest.reader1Executor, structureTest.reader1PartitionSession, structureTest.reader1Started, structureTest.reader1CleanupInProgress, structureTest.reader1CleanupStarted, structureTest.allowReader1ToFinish, structureTest.reader1CleanupCompleted);
+
+ // Step 1. Start Reader-1 and wait until it receives partitions.
+ reader1.init().join();
+
+ // Step 2. Send a message to trigger partition assignment.
+ sendMessage("test-message-1");
+
+ // Wait for Reader-1 to receive partition
+ assertTrue("Reader-1 did not receive partition", structureTest.reader1Started.await(10, TimeUnit.SECONDS));
+ for (Map.Entry> v : structureTest.reader1PartitionSession.entrySet()) {
+ assertNotNull("Reader-1 partition session is null", v.getValue().get());
+ logger.info("Reader-1 received partition: {}", v.getKey());
+ }
+
+ // Step 3.Reader-1 to trigger session close.
+ logger.info("Before reader-1 shutdown");
+ CompletableFuture reader1ShutdownFuture = reader1.shutdown();
+
+ // Wait for Reader-1 cleanup to start
+ assertTrue("Reader-1 cleanup did not start", structureTest.reader1CleanupStarted.await(15, TimeUnit.SECONDS));
+ logger.info("Reader-1 cleanup started");
+
+ // Create Reader-2 while Reader-1 is still cleaning up
+ AsyncReader reader2 = getAsyncReader2ForRaceCondition(structureTest.readerSettings, structureTest.reader2Executor, structureTest.reader1CleanupInProgress, structureTest.raceConditionDetected, structureTest.reader1CleanupCompleted, structureTest.reader2Started);
+
+ // Step 5. Start Reader-2 while Reader-1 cleanup is still in progress.
+ reader2.init().join();
+
+ // Give some time for Reader-2 to potentially receive the partition during Reader-1's cleanup
+ Thread.sleep(500);
+
+ // Step 6. Allow Reader-1 cleanup to finish.
+ structureTest.allowReader1ToFinish.countDown();
+
+ // Step 7. Wait for partition reassignment to Reader-2.
+ reader1ShutdownFuture.get(10, TimeUnit.SECONDS);
+ logger.info("After reader-1 shutdown");
+
+ // Wait a bit more for partition reassignment to Reader-2
+ boolean reader2GotPartition = structureTest.reader2Started.await(15, TimeUnit.SECONDS);
+
+ // Cleanup
+ reader2.shutdown().get(10, TimeUnit.SECONDS);
+ structureTest.reader1Executor.shutdownNow();
+ structureTest.reader2Executor.shutdownNow();
+
+ // Step 8. Verify that reassignment only happened after Reader-1 cleanup finished.
+ assertFalse("Race condition detected: Reader-2 received partition while Reader-1 was still cleaning up",
+ structureTest.raceConditionDetected.get());
+
+ if (reader2GotPartition) {
+ for (Map.Entry v : structureTest.reader1CleanupCompleted.entrySet()) {
+ assertTrue("Reader-1 cleanup should be completed before Reader-2 receives the partition : " + v.getKey(),
+ v.getValue().get());
+ logger.info("Test passed: Reader-2 received partition only after Reader-1 completed cleanup, partition {}", v.getKey());
+ }
+ } else {
+ logger.warn("Reader-2 did not receive partition within timeout - test inconclusive");
+ }
+ }
+
+ /**
+ * Scenario:
+ * Verify that when the YDB server removes a partition from a reader and immediately reassigns
+ * it back to the same reader, StartPartitionSessionEvent is never delivered before
+ * PartitionSessionClosedEvent for the prior session on the same partition.
+ *
+ * Test steps:
+ *
+ * 1. Start Reader-1 and wait until it receives all partitions.
+ * 2. Connect Reader-2 with the same consumer to trigger server-side rebalancing.
+ * Server sends StopPartitionSession to Reader-1 for some partitions.
+ * 3. Once Reader-2 receives at least one partition, immediately shut it down.
+ * Server should reassign the partition back to Reader-1.
+ * 4. Wait for Reader-1 to receive StartPartitionSession a second time (the re-assignment).
+ * 5. Verify no ordering violation: no StartPartitionSession arrived while the previous
+ * session for the same partition was still open (i.e., before PartitionSessionClosed).
+ */
+ @Ignore
+ @Test
+ public void testPartitionRemoveAndImmediateReturn() throws Exception {
+ logger.info("Starting testPartitionRemoveAndImmediateReturn");
+
+ // Track currently active session per partition: partitionId -> sessionId.
+ // If a new StartPartitionSession arrives while a session is still active, that is a violation.
+ Map activeSessions = new ConcurrentHashMap<>();
+ AtomicBoolean orderingViolation = new AtomicBoolean(false);
+
+ // Count how many times each partition has been started on reader1.
+ Map startCounts = new ConcurrentHashMap<>();
+
+ // Step 1 signal: all partitions assigned to reader1 for the first time.
+ CountDownLatch reader1FirstAssignment = new CountDownLatch(partitionCount);
+
+ // Signal: reader1 received a partition for the second time (server returned it back).
+ CountDownLatch reader1GotPartitionBack = new CountDownLatch(1);
+
+ // Signal: reader2 received at least one partition (rebalancing took place).
+ CountDownLatch reader2GotPartition = new CountDownLatch(1);
+
+ ExecutorService reader1Executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "remove-return-reader1-executor"));
+ ExecutorService reader2Executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "remove-return-reader2-executor"));
+
+ ReaderSettings readerSettings = ReaderSettings.newBuilder()
+ .addTopic(TopicReadSettings.newBuilder()
+ .setPath(testTopic)
+ .build())
+ .setConsumerName(TEST_CONSUMER)
+ .build();
+
+ AsyncReader reader1 = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
+ .setExecutor(reader1Executor)
+ .setEventHandler(new ReadEventHandler() {
+ @Override
+ public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) {
+ }
+
+ @Override
+ public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) {
+ long partitionId = event.getPartitionSession().getPartitionId();
+ long sessionId = event.getPartitionSession().getId();
+
+ if (activeSessions.containsKey(partitionId)) {
+ logger.error("VIOLATION: Reader-1 got StartPartitionSession for partition {} session {}" +
+ " while session {} is still active (PartitionSessionClosed not yet delivered)",
+ partitionId, sessionId, activeSessions.get(partitionId));
+ orderingViolation.set(true);
+ }
+
+ activeSessions.put(partitionId, sessionId);
+ event.confirm();
+
+ int count = startCounts
+ .computeIfAbsent(partitionId, k -> new AtomicInteger(0))
+ .incrementAndGet();
+ logger.info("Reader-1: onStartPartitionSession partition={} session={} count={}", partitionId, sessionId, count);
+
+ if (count == 1) {
+ reader1FirstAssignment.countDown();
+ } else {
+ reader1GotPartitionBack.countDown();
+ }
+ }
+
+ @Override
+ public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSessionClosedEvent event) {
+ long partitionId = event.getPartitionSession().getPartitionId();
+ long sessionId = event.getPartitionSession().getId();
+ logger.info("Reader-1: onPartitionSessionClosed partition={} session={}", partitionId, sessionId);
+ activeSessions.remove(partitionId);
+ }
+ })
+ .build()
+ );
+
+ // Step 1: Initialize reader1 and wait for all partitions.
+ reader1.init().join();
+ sendMessage("trigger-assignment");
+ assertTrue("Reader-1 did not receive all partitions on first assignment",
+ reader1FirstAssignment.await(15, TimeUnit.SECONDS));
+ logger.info("Reader-1 has all {} partitions", partitionCount);
+
+ // Step 2: Start reader2 with the same consumer to trigger server-side rebalancing.
+ AsyncReader reader2 = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
+ .setExecutor(reader2Executor)
+ .setEventHandler(new ReadEventHandler() {
+ @Override
+ public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) {
+ }
+
+ @Override
+ public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) {
+ long partitionId = event.getPartitionSession().getPartitionId();
+ logger.info("Reader-2: onStartPartitionSession partition={}", partitionId);
+ event.confirm();
+ reader2GotPartition.countDown();
+ }
+ })
+ .build()
+ );
+
+ reader2.init().join();
+ logger.info("Reader-2 initialized, waiting for partition reassignment from server");
+
+ // Step 3: Wait for reader2 to receive at least one partition, then immediately shut it down.
+ boolean reader2ReceivedPartition = reader2GotPartition.await(15, TimeUnit.SECONDS);
+ if (!reader2ReceivedPartition) {
+ logger.warn("Reader-2 did not receive any partition — test inconclusive, skipping ordering check");
+ reader2.shutdown().get(10, TimeUnit.SECONDS);
+ reader1.shutdown().get(10, TimeUnit.SECONDS);
+ reader1Executor.shutdownNow();
+ reader2Executor.shutdownNow();
+ Assert.fail("Reader-2 did not receive any partition — test inconclusive, skipping ordering check");
+ }
+
+ logger.info("Reader-2 received a partition, shutting it down immediately to trigger return to reader1");
+ reader2.shutdown().get(10, TimeUnit.SECONDS);
+
+ // Step 4: Wait for reader1 to receive a partition back from the server.
+ boolean partitionReturnedToReader1 = reader1GotPartitionBack.await(20, TimeUnit.SECONDS);
+ logger.info("Reader-1 got partition back: {}", partitionReturnedToReader1);
+
+ reader1.shutdown().get(10, TimeUnit.SECONDS);
+ reader1Executor.shutdownNow();
+ reader2Executor.shutdownNow();
+
+ // Step 5: Verify ordering.
+ if (partitionReturnedToReader1) {
+ assertFalse(
+ "Event ordering violation: StartPartitionSession was received before PartitionSessionClosed" +
+ " for the same partition when the server reassigned it back",
+ orderingViolation.get());
+ logger.info("testPartitionRemoveAndImmediateReturn PASSED");
+ } else {
+ logger.warn("Server did not reassign any partition back to reader-1 within the timeout — test inconclusive");
+ assertFalse("Ordering violation detected even though no partition was returned", orderingViolation.get());
+ }
+ }
+
+ /**
+ * Scenario:
+ * The client holds a StopPartitionSession event and delays calling confirm() on it.
+ * While the client is still "thinking" about releasing the partition, Reader-1 is shut down
+ * externally (simulating a crash or forced close). This causes the SDK to close the gRPC
+ * session, which fires onPartitionSessionClosed for all active partitions.
+ * Only after onPartitionSessionClosed fires does the client call the stored StopPartitionSession.confirm().
+ *
+ * This verifies that a late StopPartitionSession.confirm() (called after the partition is already
+ * closed by the session shutdown) is handled gracefully — it must be a safe no-op and must not
+ * crash, send spurious responses to the server, or cause ordering violations.
+ *
+ * Steps:
+ * 1. Create Reader-1. confirm() StartPartitionSession immediately, but store
+ * StopPartitionSession events and do NOT confirm() them yet.
+ * 2. Initialize Reader-1, send a message, wait until all partitions are assigned.
+ * 3. Connect Reader-2 with the same consumer to trigger server-side rebalancing.
+ * Server sends graceful StopPartitionSession to Reader-1 for at least one partition.
+ * 4. Wait for Reader-1 to receive the StopPartitionSession event (not confirmed).
+ * 5. Shut down Reader-1 — this closes the session and fires onPartitionSessionClosed.
+ * 6. Call confirm() on ALL stored StopPartitionSession events — session is already closed.
+ * 7. Verify no exception and no ordering violation.
+ */
+ @Test
+ public void testLateStopConfirmAfterPartitionRevoked() throws Exception {
+ logger.info("Starting testLateStopConfirmAfterPartitionRevoked");
+
+ // Stored StopPartitionSession events — confirm() will be called late
+ List pendingStopEvents =
+ Collections.synchronizedList(new ArrayList<>());
+
+ Map activeSessions = new ConcurrentHashMap<>();
+ AtomicBoolean orderingViolation = new AtomicBoolean(false);
+
+ // Counted down when all partitions are assigned to Reader-1 for the first time
+ CountDownLatch reader1AllPartitionsAssigned = new CountDownLatch(partitionCount);
+ // Counted down when a stop request arrives at Reader-1 (before confirm)
+ CountDownLatch reader1StopReceived = new CountDownLatch(1);
+ // Counted down each time a partition is actually closed on Reader-1
+ CountDownLatch reader1PartitionClosed = new CountDownLatch(partitionCount);
+ // Reader-2 has received at least one partition
+ CountDownLatch reader2GotPartition = new CountDownLatch(1);
+
+ ExecutorService reader1Executor = Executors.newSingleThreadExecutor(
+ r -> new Thread(r, "late-stop-reader1-executor"));
+ ExecutorService reader2Executor = Executors.newSingleThreadExecutor(
+ r -> new Thread(r, "late-stop-reader2-executor"));
+
+ ReaderSettings readerSettings = ReaderSettings.newBuilder()
+ .addTopic(TopicReadSettings.newBuilder().setPath(testTopic).build())
+ .setConsumerName(TEST_CONSUMER)
+ .build();
+
+ // Step 1: Build Reader-1
+ AsyncReader reader1 = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
+ .setExecutor(reader1Executor)
+ .setEventHandler(new ReadEventHandler() {
+ @Override
+ public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) {
+ }
+
+ @Override
+ public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) {
+ long partitionId = event.getPartitionSession().getPartitionId();
+ long sessionId = event.getPartitionSession().getId();
+ logger.info("Reader-1: onStartPartitionSession partition={}, session={}", partitionId, sessionId);
+
+ if (activeSessions.containsKey(partitionId)) {
+ logger.error("VIOLATION: Reader-1 got StartPartitionSession for partition {} session {}" +
+ " while session {} is still active",
+ partitionId, sessionId, activeSessions.get(partitionId));
+ orderingViolation.set(true);
+ }
+ activeSessions.put(partitionId, sessionId);
+ // Confirm the start immediately so the server knows we own the partitions
+ event.confirm();
+ reader1AllPartitionsAssigned.countDown();
+ }
+
+ @Override
+ public void onStopPartitionSession(tech.ydb.topic.read.events.StopPartitionSessionEvent event) {
+ long partitionId = event.getPartitionSession().getPartitionId();
+ logger.info("Reader-1: onStopPartitionSession partition={} — storing, NOT confirming yet",
+ partitionId);
+ // Store the event, deliberately do NOT call confirm() yet
+ pendingStopEvents.add(event);
+ reader1StopReceived.countDown();
+ }
+
+ @Override
+ public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSessionClosedEvent event) {
+ long partitionId = event.getPartitionSession().getPartitionId();
+ long sessionId = event.getPartitionSession().getId();
+ logger.info("Reader-1: onPartitionSessionClosed partition={}, session={}", partitionId, sessionId);
+ activeSessions.remove(partitionId);
+ reader1PartitionClosed.countDown();
+ }
+ })
+ .build()
+ );
+
+ // Step 2: Initialize Reader-1 and wait for all partitions
+ reader1.init().join();
+ sendMessage("test-trigger");
+ assertTrue("Reader-1 did not receive all partition assignments within timeout",
+ reader1AllPartitionsAssigned.await(15, TimeUnit.SECONDS));
+ logger.info("Reader-1 owns all {} partitions", partitionCount);
+
+ // Step 3: Connect Reader-2 to trigger server-side rebalancing.
+ // Server will send graceful StopPartitionSession to Reader-1 for at least one partition.
+ AsyncReader reader2 = client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
+ .setExecutor(reader2Executor)
+ .setEventHandler(new ReadEventHandler() {
+ @Override
+ public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) {
+ }
+
+ @Override
+ public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) {
+ logger.info("Reader-2: onStartPartitionSession partition={}",
+ event.getPartitionSession().getPartitionId());
+ event.confirm();
+ reader2GotPartition.countDown();
+ }
+ })
+ .build()
+ );
+ reader2.init().join();
+ logger.info("Reader-2 initialized — waiting for StopPartitionSession to arrive at Reader-1");
+
+ // Step 4: Wait for Reader-1 to receive at least one StopPartitionSession (not confirmed)
+ assertTrue("Reader-1 did not receive StopPartitionSession within timeout",
+ reader1StopReceived.await(20, TimeUnit.SECONDS));
+ logger.info("Reader-1 received StopPartitionSession (not confirmed yet) — shutting down reader");
+
+ // Step 5: Shut down Reader-1 while StopPartitionSession.confirm() is still pending.
+ // This causes the session to close, which calls onStop() -> handleClosePartitionSession()
+ // -> onPartitionSessionClosed() for all remaining active partitions.
+ CompletableFuture reader1Shutdown = reader1.shutdown();
+
+ assertTrue("Reader-1 partitions were not closed within timeout",
+ reader1PartitionClosed.await(15, TimeUnit.SECONDS));
+ logger.info("Reader-1: all partitions closed. Now calling delayed StopPartitionSession.confirm()...");
+
+ // Step 6: Call confirm() on all stored StopPartitionSession events.
+ // The session is already closed, so this must be a safe no-op.
+ for (tech.ydb.topic.read.events.StopPartitionSessionEvent event : pendingStopEvents) {
+ logger.info("Calling late confirm() for StopPartitionSession on partition {}",
+ event.getPartitionSession().getPartitionId());
+ event.confirm();
+ }
+ logger.info("All late StopPartitionSession.confirm() calls completed — no exception thrown");
+
+ reader1Shutdown.get(10, TimeUnit.SECONDS);
+
+ // Allow some time for any unexpected side effects
+ Thread.sleep(500);
+
+ // Wait for Reader-2 to receive the partition (after Reader-1 released it) — best effort
+ boolean reader2ReceivedPartition = reader2GotPartition.await(10, TimeUnit.SECONDS);
+ logger.info("Reader-2 received partition: {}", reader2ReceivedPartition);
+
+ // Cleanup
+ reader2.shutdown().get(10, TimeUnit.SECONDS);
+ reader1Executor.shutdownNow();
+ reader2Executor.shutdownNow();
+
+ // Step 7: Verify no ordering violation
+ assertFalse("Event ordering violation: StartPartitionSession received while session was still active",
+ orderingViolation.get());
+ logger.info("testLateStopConfirmAfterPartitionRevoked PASSED");
+ }
+
+ private @NotNull TopicReaderEventOrderingTest.StructureTest2 getStructureForRaceCondition() {
+ // Map for tracking partition and attached sessions
+ ConcurrentHashMap> reader1PartitionSession = new ConcurrentHashMap<>();
+
+ // Map for tracking partition and is reader1 in cleanup. false -> reader 1 read partition is in progress
+ // true -> reader 1 read partition is detached from partition
+ ConcurrentHashMap reader1CleanupInProgress = new ConcurrentHashMap<>();
+
+ // Map for tracking partition and is reader1 in cleanup. false -> reader1 not started read partition or cleanUp wasn't completed
+ // true -> reader1 completed cleanup
+ ConcurrentHashMap reader1CleanupCompleted = new ConcurrentHashMap<>();
+ for (long i = 0; i < partitionCount; i++) {
+ reader1CleanupCompleted.put(i, new AtomicBoolean(false));
+ }
+
+ // Simple value to detect race condition
+ AtomicBoolean raceConditionDetected = new AtomicBoolean(false);
+ CountDownLatch reader1Started = new CountDownLatch(partitionCount);
+ CountDownLatch reader1CleanupStarted = new CountDownLatch(partitionCount);
+ CountDownLatch reader2Started = new CountDownLatch(partitionCount);
+
+ // Some latch in which reader1 stuck for 1 minute. Be careful to increment partition count!
+ // All single threads are stuck for 5 seconds
+ CountDownLatch allowReader1ToFinish = new CountDownLatch(1);
+
+ // Create two single-threaded executors to simulate the scenario
+ ExecutorService reader1Executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "reader-1-executor"));
+ ExecutorService reader2Executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "reader-2-executor"));
+
+ ReaderSettings readerSettings = ReaderSettings.newBuilder()
+ .addTopic(TopicReadSettings.newBuilder()
+ .setPath(testTopic)
+ .build())
+ .setConsumerName(TEST_CONSUMER)
+ .build();
+ StructureTest2 structureTest2 = new StructureTest2(reader1PartitionSession, reader1CleanupInProgress, reader1CleanupCompleted, raceConditionDetected, reader1Started, reader1CleanupStarted, reader2Started, allowReader1ToFinish, reader1Executor, reader2Executor, readerSettings);
+ return structureTest2;
+ }
+
+ private static class StructureTest2 {
+ public final ConcurrentHashMap> reader1PartitionSession;
+ public final ConcurrentHashMap reader1CleanupInProgress;
+ public final ConcurrentHashMap reader1CleanupCompleted;
+ public final AtomicBoolean raceConditionDetected;
+ public final CountDownLatch reader1Started;
+ public final CountDownLatch reader1CleanupStarted;
+ public final CountDownLatch reader2Started;
+ public final CountDownLatch allowReader1ToFinish;
+ public final ExecutorService reader1Executor;
+ public final ExecutorService reader2Executor;
+ public final ReaderSettings readerSettings;
+
+ public StructureTest2(ConcurrentHashMap> reader1PartitionSession, ConcurrentHashMap reader1CleanupInProgress, ConcurrentHashMap reader1CleanupCompleted, AtomicBoolean raceConditionDetected, CountDownLatch reader1Started, CountDownLatch reader1CleanupStarted, CountDownLatch reader2Started, CountDownLatch allowReader1ToFinish, ExecutorService reader1Executor, ExecutorService reader2Executor, ReaderSettings readerSettings) {
+ this.reader1PartitionSession = reader1PartitionSession;
+ this.reader1CleanupInProgress = reader1CleanupInProgress;
+ this.reader1CleanupCompleted = reader1CleanupCompleted;
+ this.raceConditionDetected = raceConditionDetected;
+ this.reader1Started = reader1Started;
+ this.reader1CleanupStarted = reader1CleanupStarted;
+ this.reader2Started = reader2Started;
+ this.allowReader1ToFinish = allowReader1ToFinish;
+ this.reader1Executor = reader1Executor;
+ this.reader2Executor = reader2Executor;
+ this.readerSettings = readerSettings;
+ }
+ }
+
+ private @NotNull TopicReaderEventOrderingTest.StructureTest1 getStructureForOrderGarantees() {
+ Map> eventLog = new ConcurrentHashMap<>();
+ for (long i = 0; i < partitionCount; i++) {
+ eventLog.put(i, Collections.synchronizedList(new ArrayList<>()));
+ }
+
+ Map activeSessions = new ConcurrentHashMap<>();
+
+ CountDownLatch startReceived = new CountDownLatch(partitionCount);
+ CountDownLatch closeReceived = new CountDownLatch(partitionCount);
+ AtomicBoolean orderingViolation = new AtomicBoolean(false);
+
+ ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "test-event-executor"));
+
+ ReaderSettings readerSettings = ReaderSettings.newBuilder()
+ .addTopic(TopicReadSettings.newBuilder()
+ .setPath(testTopic)
+ .build())
+ .setConsumerName(TEST_CONSUMER)
+ .build();
+ return new StructureTest1(eventLog, activeSessions, startReceived, closeReceived, orderingViolation, executor, readerSettings);
+ }
+
+ private static class StructureTest1 {
+ public final Map> eventLog;
+ public final Map activeSessions;
+ public final CountDownLatch startReceived;
+ public final CountDownLatch closeReceived;
+ public final AtomicBoolean orderingViolation;
+ public final ExecutorService executor;
+ public final ReaderSettings readerSettings;
+
+ public StructureTest1(Map> eventLog, Map activeSessions, CountDownLatch startReceived, CountDownLatch closeReceived, AtomicBoolean orderingViolation, ExecutorService executor, ReaderSettings readerSettings) {
+ this.eventLog = eventLog;
+ this.activeSessions = activeSessions;
+ this.startReceived = startReceived;
+ this.closeReceived = closeReceived;
+ this.orderingViolation = orderingViolation;
+ this.executor = executor;
+ this.readerSettings = readerSettings;
+ }
+ }
+
+ private AsyncReader getAsyncReaderForOrderGaran(ReaderSettings readerSettings, ExecutorService executor, Map> eventLog, Map activeSessions, AtomicBoolean orderingViolation, CountDownLatch startReceived, CountDownLatch closeReceived) {
+ return client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
+ .setExecutor(executor)
+ .setEventHandler(new ReadEventHandler() {
+
+ @Override
+ public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) {
+ long partitionId = event.getPartitionSession().getPartitionId();
+ eventLog.get(partitionId).add("onMessages[session=" + event.getPartitionSession().getId() + "]");
+ }
+
+ @Override
+ public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) {
+ long sessionId = event.getPartitionSession().getId();
+ long partitionId = event.getPartitionSession().getPartitionId();
+
+ // Record start event
+ eventLog.get(partitionId).add("onStartPartitionSession[partitionId = " + partitionId + ",session=" + sessionId + "]");
+ logger.info("onStartPartitionSession: session={}", sessionId);
+
+ if (activeSessions.get(partitionId) != null) {
+ logger.error("START event received while session {} is still active", activeSessions.get(partitionId));
+ orderingViolation.set(true);
+ }
+
+ activeSessions.put(partitionId, sessionId);
+ event.confirm();
+ startReceived.countDown();
+ }
+
+ @Override
+ public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSessionClosedEvent event) {
+ long sessionId = event.getPartitionSession().getId();
+ long partitionId = event.getPartitionSession().getPartitionId();
+
+ // Record close event
+ eventLog.get(partitionId).add("onPartitionSessionClosed[partitionId =" + partitionId + ",session=" + sessionId + "]");
+
+ logger.info("onPartitionSessionClosed: session={}", sessionId);
+ activeSessions.remove(partitionId);
+ closeReceived.countDown();
+ }
+ })
+ .build()
+ );
+ }
+
+ private AsyncReader getAsyncReader2ForRaceCondition(ReaderSettings readerSettings, ExecutorService reader2Executor, ConcurrentHashMap reader1CleanupInProgress, AtomicBoolean raceConditionDetected, ConcurrentHashMap reader1CleanupCompleted, CountDownLatch reader2Started) {
+ return client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
+ .setExecutor(reader2Executor)
+ .setEventHandler(new ReadEventHandler() {
+ @Override
+ public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) {
+ // No-op
+ }
+
+ @Override
+ public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) {
+ long partitionId = event.getPartitionSession().getPartitionId();
+ PartitionSession session = event.getPartitionSession();
+ logger.info("Reader-2: onStartPartitionSession - partition={}, session={}",
+ session.getPartitionId(), session.getId());
+
+ // Check if Reader-1 is still cleaning up
+ if (reader1CleanupInProgress.get(partitionId).get()) {
+ logger.error("RACE CONDITION DETECTED: Reader-2 received partition {} while Reader-1 is still cleaning up",
+ session.getPartitionId());
+ raceConditionDetected.set(true);
+ }
+
+ if (!reader1CleanupCompleted.get(partitionId).get()) {
+ logger.warn("Reader-2 received partition {} before Reader-1 completed cleanup",
+ session.getPartitionId());
+ }
+
+ event.confirm();
+ reader2Started.countDown();
+ }
+ })
+ .build()
+ );
+ }
+
+ private AsyncReader getAsyncReader1ForRaceCondition(ReaderSettings readerSettings, ExecutorService reader1Executor, ConcurrentHashMap> reader1PartitionSession, CountDownLatch reader1Started, ConcurrentHashMap reader1CleanupInProgress, CountDownLatch reader1CleanupStarted, CountDownLatch allowReader1ToFinish, ConcurrentHashMap reader1CleanupCompleted) {
+ return client.createAsyncReader(readerSettings, ReadEventHandlersSettings.newBuilder()
+ .setExecutor(reader1Executor)
+ .setEventHandler(new ReadEventHandler() {
+ @Override
+ public void onMessages(tech.ydb.topic.read.events.DataReceivedEvent event) {
+ }
+
+ @Override
+ public void onStartPartitionSession(tech.ydb.topic.read.events.StartPartitionSessionEvent event) {
+ long partitionId = event.getPartitionSession().getPartitionId();
+ PartitionSession session = event.getPartitionSession();
+ logger.info("Reader-1: onStartPartitionSession - partition={}, session={}",
+ session.getPartitionId(), session.getId());
+ reader1PartitionSession.compute(partitionId, (k, ref) -> {
+ if (ref == null) {
+ ref = new AtomicReference<>();
+ }
+ ref.set(session);
+ return ref;
+ });
+
+ event.confirm();
+ reader1Started.countDown();
+ }
+
+ @Override
+ public void onPartitionSessionClosed(tech.ydb.topic.read.events.PartitionSessionClosedEvent event) {
+ long partitionId = event.getPartitionSession().getPartitionId();
+ PartitionSession session = event.getPartitionSession();
+ logger.info("Reader-1: onPartitionSessionClosed - partition={}, session={}",
+ session.getPartitionId(), session.getId());
+ logger.info("Reader-1: before closing resources");
+
+ reader1CleanupInProgress.compute(partitionId, (k, ref) -> {
+ if (ref == null) {
+ ref = new AtomicBoolean();
+ }
+ ref.set(true);
+ return ref;
+ });
+
+ reader1CleanupStarted.countDown();
+
+ // Step 4. Block Reader-1 cleanup to simulate slow resource release. (e.g., closing database connections, flushing buffers)
+ try {
+ boolean finished = allowReader1ToFinish.await(5, TimeUnit.SECONDS);
+ if (!finished) {
+ logger.error("Reader-1: cleanup timeout");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Reader-1: cleanup interrupted", e);
+ }
+
+ logger.info("Reader-1: after closing resources");
+
+ reader1CleanupInProgress.get(partitionId).set(false);
+ reader1CleanupCompleted.get(partitionId).set(true);
+
+ }
+
+ @Override
+ public void onReaderClosed(tech.ydb.topic.read.events.ReaderClosedEvent event) {
+ logger.info("Reader-1: onReaderClosed");
+ }
+ })
+ .build()
+ );
+ }
+}