diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java index 5e1354d8f7..fa5d5df9ed 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java @@ -402,7 +402,14 @@ private boolean canShuffleToMemory(long requestedSize) { public synchronized void waitForShuffleToMergeMemory() throws InterruptedException { long startTime = System.currentTimeMillis(); while(usedMemory > memoryLimit) { - wait(); + wait(1000); + if (usedMemory > memoryLimit && !inMemoryMapOutputs.isEmpty()) { + // Avoid deadlock: if memory is over limit but commitMemory is below mergeThreshold, + // no merge will be triggered automatically. Force a merge to free memory. + LOG.info("Memory limit exceeded with no merge triggered (usedMemory={}, commitMemory={}, mergeThreshold={})." + + " Forcing in-memory merge to avoid deadlock.", usedMemory, commitMemory, mergeThreshold); + startMemToDiskMerge(); + } } if (LOG.isDebugEnabled()) { LOG.debug("Waited for " + (System.currentTimeMillis() - startTime) + " for memory to become" diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java index 048699de2e..caac28ca6a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java @@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.atLeastOnce; @@ -255,6 +256,80 @@ public void testIntermediateMemoryMergeAccounting() throws Exception { assertEquals(data1.length + data2.length, mergeManager.getUsedMemory()); } + @Test(timeout = 30000) + public void testWaitForShuffleToMergeMemoryNoDeadlock() throws Throwable { + Configuration conf = new TezConfiguration(defaultConf); + conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName()); + + Path localDir = new Path(workDir, "local"); + localFs.mkdirs(localDir); + conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString()); + + FileSystem fs = FileSystem.getLocal(conf); + LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS); + InputContext inputContext = createMockInputContext(UUID.randomUUID().toString()); + ExceptionReporter exceptionReporter = mock(ExceptionReporter.class); + + // initialMemoryAvailable=2000000: memoryLimit=2000000, mergeThreshold=1800000, maxSingleShuffleLimit=500000 + MergeManager mergeManager = new MergeManager(conf, fs, localDirAllocator, inputContext, + null, null, null, null, exceptionReporter, 2000000, null, false, -1); + mergeManager.configureAndStart(); + + // Generate valid IFile data (~490000 bytes, below maxSingleShuffleLimit=500000) + InputAttemptIdentifier id1 = new InputAttemptIdentifier(0, 0); + InputAttemptIdentifier id2 = new InputAttemptIdentifier(1, 0); + InputAttemptIdentifier id3 = new InputAttemptIdentifier(2, 0); + InputAttemptIdentifier id4 = new InputAttemptIdentifier(3, 0); + InputAttemptIdentifier id5 = new InputAttemptIdentifier(4, 0); + byte[] data1 = generateDataBySize(conf, 490000, id1); + byte[] data2 = generateDataBySize(conf, 490000, id2); + + // Reserve and commit 2 outputs: commitMemory ~= 980000 < mergeThreshold=1800000 (no auto-merge) + MapOutput mo1 = mergeManager.reserve(id1, data1.length, data1.length, 0); + MapOutput mo2 = mergeManager.reserve(id2, data2.length, data2.length, 0); + assertEquals(MapOutput.Type.MEMORY, mo1.getType()); + assertEquals(MapOutput.Type.MEMORY, mo2.getType()); + System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length); + System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length); + mo1.commit(); + mo2.commit(); + + // Reserve 3 more outputs without committing to push usedMemory > memoryLimit=2000000 + MapOutput mo3 = mergeManager.reserve(id3, data1.length, data1.length, 0); + MapOutput mo4 = mergeManager.reserve(id4, data1.length, data1.length, 0); + MapOutput mo5 = mergeManager.reserve(id5, data1.length, data1.length, 0); + assertEquals(MapOutput.Type.MEMORY, mo3.getType()); + assertEquals(MapOutput.Type.MEMORY, mo4.getType()); + assertEquals(MapOutput.Type.MEMORY, mo5.getType()); + + // Verify deadlock precondition: usedMemory > memoryLimit, commitMemory < mergeThreshold + assertTrue("usedMemory should exceed memoryLimit", mergeManager.getUsedMemory() > 2000000L); + assertTrue("commitMemory should be below mergeThreshold", mergeManager.getCommitMemory() < 1800000L); + + // waitForShuffleToMergeMemory() should complete without deadlock: + // the fix forces a merge when memory is exhausted but commitMemory < mergeThreshold + Thread waitThread = new Thread(() -> { + try { + mergeManager.waitForShuffleToMergeMemory(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + waitThread.start(); + waitThread.join(15000); + assertFalse("waitForShuffleToMergeMemory deadlocked", waitThread.isAlive()); + + if (waitThread.isAlive()) { + waitThread.interrupt(); + } + mo3.abort(); + mo4.abort(); + mo5.abort(); + mergeManager.close(true); + } + @Test public void testDiskMergeWithCodec() throws Throwable { Configuration conf = new TezConfiguration(defaultConf);