diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java index f12dc77f724..33fb6bf0cc8 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java @@ -81,6 +81,7 @@ import org.apache.phoenix.jdbc.PhoenixPreparedStatement; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.replication.reader.ReplicationLogReplayService; import org.apache.phoenix.schema.CompiledConditionalTTLExpression; import org.apache.phoenix.schema.CompiledTTLExpression; import org.apache.phoenix.schema.ConditionalTTLExpression; @@ -199,6 +200,17 @@ public CompactionScanner(RegionCoprocessorEnvironment env, Store store, this.maxLookbackWindowStart = this.maxLookbackInMillis == 0 ? compactionTime : compactionTime - (this.maxLookbackInMillis + 1); + Configuration conf = env.getConfiguration(); + boolean replayEnabled = + conf.getBoolean(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, + ReplicationLogReplayService.DEFAULT_REPLICATION_REPLAY_ENABLED); + boolean guardEnabled = + conf.getBoolean(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + ReplicationLogReplayService.DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED); + if (replayEnabled && guardEnabled) { + this.maxLookbackWindowStart = ReplicationLogReplayService.applyReplicationConsistencyGuard( + this.maxLookbackWindowStart, conf, tableName, columnFamilyName); + } ColumnFamilyDescriptor cfd = store.getColumnFamilyDescriptor(); this.major = major && !forceMinorCompaction; this.minVersion = cfd.getMinVersions(); diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java index 24d40faac77..e413ed73e9b 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogReplayService.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -50,6 +51,17 @@ public class ReplicationLogReplayService { */ public static final boolean DEFAULT_REPLICATION_REPLAY_ENABLED = false; + /** + * Configuration key for enabling/disabling the replication compaction guard + */ + public static final String REPLICATION_COMPACTION_GUARD_ENABLED = + "phoenix.replication.compaction.guard.enabled"; + + /** + * Default value for replication compaction guard enabled flag + */ + public static final boolean DEFAULT_REPLICATION_COMPACTION_GUARD_ENABLED = true; + /** * Number of threads in the executor pool for the replication replay service */ @@ -83,7 +95,7 @@ public class ReplicationLogReplayService { private ScheduledExecutorService scheduler; private volatile boolean isRunning = false; - private ReplicationLogReplayService(final Configuration conf) { + protected ReplicationLogReplayService(final Configuration conf) { this.conf = conf; } @@ -105,6 +117,16 @@ public static ReplicationLogReplayService getInstance(Configuration conf) throws return instance; } + @VisibleForTesting + public static void setInstanceForTesting(ReplicationLogReplayService testInstance) { + instance = testInstance; + } + + @VisibleForTesting + public static void resetInstanceForTesting() { + instance = null; + } + /** * Starts the replication log replay service by initializing the scheduler and scheduling periodic * replay operations for each HA Group. @@ -229,6 +251,37 @@ protected long getConsistencyPoint() throws IOException, SQLException { return consistencyPoint; } + /** + * Applies the replication replay consistency point as a floor on maxLookbackWindowStart. On + * standby clusters, this prevents compaction from dropping delete markers that have timestamps + * newer than the consistency point. + */ + public static long applyReplicationConsistencyGuard(long currentMaxLookbackWindowStart, + Configuration conf, String tableName, String columnFamilyName) { + try { + long consistencyPoint = getInstance(conf).getConsistencyPoint(); + return adjustMaxLookbackWindowStart(currentMaxLookbackWindowStart, consistencyPoint, + tableName, columnFamilyName); + } catch (Exception e) { + LOG.warn("Replication guard enabled but consistency point unavailable for table={} store={}." + + " Retaining all delete markers.", tableName, columnFamilyName, e); + return 0L; + } + } + + @VisibleForTesting + static long adjustMaxLookbackWindowStart(long currentMaxLookbackWindowStart, + long consistencyPoint, String tableName, String columnFamilyName) { + long adjusted = Math.min(currentMaxLookbackWindowStart, consistencyPoint); + if (adjusted < currentMaxLookbackWindowStart) { + LOG.info( + "Replication guard: table={} store={} maxLookbackWindowStart adjusted from {} to {}" + + " (consistencyPoint={})", + tableName, columnFamilyName, currentMaxLookbackWindowStart, adjusted, consistencyPoint); + } + return adjusted; + } + /** Returns the list of HA groups on the cluster */ protected List getReplicationGroups() throws SQLException { return HAGroupStoreManager.getInstance(conf).getHAGroupNames(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java new file mode 100644 index 00000000000..68ff6ed986f --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardDisabledIT.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; +import static org.apache.phoenix.util.TestUtil.assertRawRowCount; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; +import org.apache.hadoop.hbase.TableName; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ManualEnvironmentEdge; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +/** + * Integration test verifying that the replication compaction guard does NOT interfere with normal + * compaction when explicitly disabled via configuration. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class CompactionReplicationGuardDisabledIT extends BaseTest { + + private static final int MAX_LOOKBACK_AGE = 15; + private static final int ROWS_POPULATED = 2; + private ManualEnvironmentEdge injectEdge; + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map props = Maps.newHashMapWithExpectedSize(5); + props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); + props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); + props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, + Boolean.toString(true)); + props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + Boolean.toString(false)); + props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void beforeTest() throws Exception { + EnvironmentEdgeManager.reset(); + injectEdge = new ManualEnvironmentEdge(); + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + } + + @After + public synchronized void afterTest() throws Exception { + ReplicationLogReplayService.resetInstanceForTesting(); + EnvironmentEdgeManager.reset(); + boolean refCountLeaked = isAnyStoreRefCountLeaked(); + assertFalse("refCount leaked", refCountLeaked); + } + + /** + * When guard is disabled, delete markers are purged normally by maxLookback even though the + * consistency point would have protected them if the guard were enabled. + */ + @Test(timeout = 120000L) + public void testGuardDisabledDeleteMarkersPurgedByMaxLookback() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis(); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point BEFORE delete — guard would retain if enabled + long consistencyPoint = beforeDeleteTime - 1; + ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); + when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint); + ReplicationLogReplayService.setInstanceForTesting(mockService); + + // Advance past maxLookback + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Guard disabled — delete marker purged by maxLookback as normal + assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1); + } + } + + private void flush(TableName table) throws IOException { + getUtility().getAdmin().flush(table); + } + + private void majorCompact(TableName table) throws Exception { + TestUtil.majorCompact(getUtility(), table); + } + + private void createTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10)," + + " val2 VARCHAR(10), val3 VARCHAR(10))"); + conn.commit(); + } + } + + private void populateTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 'bcde')"); + conn.commit(); + } + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java new file mode 100644 index 00000000000..7788155eec3 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/CompactionReplicationGuardIT.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY; +import static org.apache.phoenix.util.TestUtil.assertRawRowCount; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; +import org.apache.hadoop.hbase.TableName; +import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest; +import org.apache.phoenix.query.BaseTest; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.apache.phoenix.util.ManualEnvironmentEdge; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; + +/** + * Integration tests for the replication consistency point compaction guard. Verifies that + * CompactionScanner retains delete markers with timestamps newer than the consistency point on + * clusters where replication replay is enabled. + */ +@Category(NeedsOwnMiniClusterTest.class) +public class CompactionReplicationGuardIT extends BaseTest { + + private static final int MAX_LOOKBACK_AGE = 15; + private static final int ROWS_POPULATED = 2; + private ManualEnvironmentEdge injectEdge; + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map props = Maps.newHashMapWithExpectedSize(5); + props.put(PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); + props.put(QueryServices.PHOENIX_COMPACTION_ENABLED, Boolean.toString(true)); + props.put(ReplicationLogReplayService.PHOENIX_REPLICATION_REPLAY_ENABLED, + Boolean.toString(true)); + props.put(ReplicationLogReplayService.REPLICATION_COMPACTION_GUARD_ENABLED, + Boolean.toString(true)); + props.put("hbase.procedure.remote.dispatcher.delay.msec", "0"); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + @Before + public void beforeTest() throws Exception { + EnvironmentEdgeManager.reset(); + injectEdge = new ManualEnvironmentEdge(); + injectEdge.setValue(System.currentTimeMillis()); + EnvironmentEdgeManager.injectEdge(injectEdge); + } + + @After + public synchronized void afterTest() throws Exception { + ReplicationLogReplayService.resetInstanceForTesting(); + EnvironmentEdgeManager.reset(); + boolean refCountLeaked = isAnyStoreRefCountLeaked(); + assertFalse("refCount leaked", refCountLeaked); + } + + /** + * Test 1: Guard retains delete markers that maxLookback would have purged. The consistency point + * is set BEFORE the delete timestamp, so the delete marker is newer than the consistency point + * and must be retained even after maxLookback window passes. + */ + @Test(timeout = 120000L) + public void testGuardRetainsDeleteMarkersNewerThanConsistencyPoint() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + long beforeDeleteTime = EnvironmentEdgeManager.currentTimeMillis(); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point BEFORE the delete — meaning replay hasn't caught up to the delete + long consistencyPoint = beforeDeleteTime - 1; + injectMockConsistencyPoint(consistencyPoint); + + // Advance time past maxLookback window — without guard, marker would be purged + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker should be retained because its timestamp > consistencyPoint + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + /** + * Test 2: Both maxLookback and guard allow purge. The consistency point has advanced past the + * delete marker AND maxLookback window has passed — marker should be purged. + */ + @Test(timeout = 120000L) + public void testDeleteMarkersPurgedWhenOlderThanBothConsistencyPointAndMaxLookback() + throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Advance time past maxLookback + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + // Set consistency point to current time — replay is fully caught up + long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis(); + injectMockConsistencyPoint(consistencyPoint); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker should be purged — both guard and maxLookback agree + assertRawRowCount(conn, dataTable, ROWS_POPULATED - 1); + } + } + + /** + * Test 3: MaxLookback retains even when guard wouldn't. Consistency point has advanced past the + * delete, but we're still within the maxLookback window — marker retained by maxLookback. + */ + @Test(timeout = 120000L) + public void testMaxLookbackRetainsEvenWhenGuardAllowsPurge() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Set consistency point to current time — guard would allow purge + long consistencyPoint = EnvironmentEdgeManager.currentTimeMillis(); + injectMockConsistencyPoint(consistencyPoint); + + // Do NOT advance past maxLookback — still within the window + injectEdge.incrementValue(1); + + flush(dataTable); + majorCompact(dataTable); + + // Delete marker retained because still within maxLookback window + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + /** + * Test 4: Guard fallback when consistency point unavailable — retains all delete markers. When + * the replay service throws an exception (e.g., not initialized), the guard falls back to + * retaining all markers to avoid data loss. + */ + @Test(timeout = 120000L) + public void testGuardFallbackRetainsAllWhenConsistencyPointUnavailable() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + createTable(dataTableName); + TableName dataTable = TableName.valueOf(dataTableName); + populateTable(dataTableName); + + injectEdge.incrementValue(1); + + // Delete a row + conn.createStatement().execute("DELETE FROM " + dataTableName + " WHERE id = 'a'"); + conn.commit(); + injectEdge.incrementValue(1); + + // Inject a mock that throws — simulating uninitialized replay service + ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); + when(mockService.getConsistencyPoint()) + .thenThrow(new IOException("HA groups not initialized")); + ReplicationLogReplayService.setInstanceForTesting(mockService); + + // Advance past maxLookback + injectEdge.incrementValue(MAX_LOOKBACK_AGE * 1000 + 1000); + + flush(dataTable); + majorCompact(dataTable); + + // Fallback retains all — delete marker NOT purged despite maxLookback passing + assertRawRowCount(conn, dataTable, ROWS_POPULATED); + } + } + + private void injectMockConsistencyPoint(long consistencyPoint) throws IOException, SQLException { + ReplicationLogReplayService mockService = mock(ReplicationLogReplayService.class); + when(mockService.getConsistencyPoint()).thenReturn(consistencyPoint); + ReplicationLogReplayService.setInstanceForTesting(mockService); + } + + private void flush(TableName table) throws IOException { + getUtility().getAdmin().flush(table); + } + + private void majorCompact(TableName table) throws Exception { + TestUtil.majorCompact(getUtility(), table); + } + + private void createTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute( + "CREATE TABLE " + tableName + " (id VARCHAR(10) NOT NULL PRIMARY KEY, val1 VARCHAR(10)," + + " val2 VARCHAR(10), val3 VARCHAR(10))"); + conn.commit(); + } + } + + private void populateTable(String tableName) throws SQLException { + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + conn.createStatement() + .execute("UPSERT INTO " + tableName + " VALUES ('b', 'bc', 'bcd', 'bcde')"); + conn.commit(); + } + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java new file mode 100644 index 00000000000..0cffee289b0 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationCompactionGuardTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Tests for the replication consistency point guard in ReplicationLogReplayService. Tests the pure + * adjustment logic (adjustMaxLookbackWindowStart) which is the core of the guard — ensuring + * maxLookbackWindowStart is floored to the consistency point. + */ +public class ReplicationCompactionGuardTest { + + private static final String TABLE_NAME = "TEST_TABLE"; + private static final String CF_NAME = "0"; + + @Test + public void testAdjustsWindowWhenConsistencyPointIsLower() { + long maxLookbackWindowStart = 1000000L; + long consistencyPoint = 500000L; + + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(consistencyPoint, result); + } + + @Test + public void testNoChangeWhenConsistencyPointIsHigher() { + long maxLookbackWindowStart = 500000L; + long consistencyPoint = 1000000L; + + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testNoChangeWhenConsistencyPointEqualsWindowStart() { + long maxLookbackWindowStart = 500000L; + long consistencyPoint = 500000L; + + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testConsistencyPointAtZeroRetainsAll() { + long maxLookbackWindowStart = 1000000L; + long consistencyPoint = 0L; + + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(0L, result); + } + + @Test + public void testLargeTimestampsNoAdjustmentNeeded() { + long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; + long consistencyPoint = System.currentTimeMillis() - 120000L; + + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(maxLookbackWindowStart, result); + } + + @Test + public void testConsistencyPointFarInPastPushesWindowBack() { + long maxLookbackWindowStart = System.currentTimeMillis() - 86400000L; + long consistencyPoint = System.currentTimeMillis() - 604800000L; + + long result = ReplicationLogReplayService.adjustMaxLookbackWindowStart(maxLookbackWindowStart, + consistencyPoint, TABLE_NAME, CF_NAME); + + assertEquals(consistencyPoint, result); + } +}