Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,14 @@ private boolean canShuffleToMemory(long requestedSize) {
public synchronized void waitForShuffleToMergeMemory() throws InterruptedException {
long startTime = System.currentTimeMillis();
while(usedMemory > memoryLimit) {
wait();
wait(1000);
if (usedMemory > memoryLimit && !inMemoryMapOutputs.isEmpty()) {
// Avoid deadlock: if memory is over limit but commitMemory is below mergeThreshold,
// no merge will be triggered automatically. Force a merge to free memory.
LOG.info("Memory limit exceeded with no merge triggered (usedMemory={}, commitMemory={}, mergeThreshold={})."
+ " Forcing in-memory merge to avoid deadlock.", usedMemory, commitMemory, mergeThreshold);
startMemToDiskMerge();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Waited for " + (System.currentTimeMillis() - startTime) + " for memory to become"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.atLeastOnce;
Expand Down Expand Up @@ -255,6 +256,80 @@ public void testIntermediateMemoryMergeAccounting() throws Exception {
assertEquals(data1.length + data2.length, mergeManager.getUsedMemory());
}

/**
 * Regression test: {@code waitForShuffleToMergeMemory()} must return when usedMemory exceeds
 * memoryLimit while commitMemory is still below mergeThreshold.
 *
 * <p>Two in-memory outputs are committed and three more are reserved without being committed,
 * which puts the manager in the state checked by the precondition assertions below. The wait is
 * then run on a separate thread, and the test fails if that thread is still alive after 15
 * seconds.
 *
 * <p>All cleanup runs in a {@code finally} block so that a failed assertion does not leave the
 * waiter thread, the uncommitted reservations, or the merge manager behind.
 */
@Test(timeout = 30000)
public void testWaitForShuffleToMergeMemoryNoDeadlock() throws Throwable {
  Configuration conf = new TezConfiguration(defaultConf);
  conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
  conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName());

  Path localDir = new Path(workDir, "local");
  localFs.mkdirs(localDir);
  conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDir.toString());

  FileSystem fs = FileSystem.getLocal(conf);
  LocalDirAllocator localDirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
  InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
  ExceptionReporter exceptionReporter = mock(ExceptionReporter.class);

  // initialMemoryAvailable=2000000: memoryLimit=2000000, mergeThreshold=1800000, maxSingleShuffleLimit=500000
  MergeManager mergeManager = new MergeManager(conf, fs, localDirAllocator, inputContext,
      null, null, null, null, exceptionReporter, 2000000, null, false, -1);
  mergeManager.configureAndStart();

  // Declared outside the try block so the finally block can release whatever was acquired.
  MapOutput mo3 = null;
  MapOutput mo4 = null;
  MapOutput mo5 = null;
  Thread waitThread = null;
  try {
    // Generate valid IFile data (~490000 bytes, below maxSingleShuffleLimit=500000)
    InputAttemptIdentifier id1 = new InputAttemptIdentifier(0, 0);
    InputAttemptIdentifier id2 = new InputAttemptIdentifier(1, 0);
    InputAttemptIdentifier id3 = new InputAttemptIdentifier(2, 0);
    InputAttemptIdentifier id4 = new InputAttemptIdentifier(3, 0);
    InputAttemptIdentifier id5 = new InputAttemptIdentifier(4, 0);
    byte[] data1 = generateDataBySize(conf, 490000, id1);
    byte[] data2 = generateDataBySize(conf, 490000, id2);

    // Reserve and commit 2 outputs: commitMemory ~= 980000 < mergeThreshold=1800000 (no auto-merge)
    MapOutput mo1 = mergeManager.reserve(id1, data1.length, data1.length, 0);
    MapOutput mo2 = mergeManager.reserve(id2, data2.length, data2.length, 0);
    assertEquals(MapOutput.Type.MEMORY, mo1.getType());
    assertEquals(MapOutput.Type.MEMORY, mo2.getType());
    System.arraycopy(data1, 0, mo1.getMemory(), 0, data1.length);
    System.arraycopy(data2, 0, mo2.getMemory(), 0, data2.length);
    mo1.commit();
    mo2.commit();

    // Reserve 3 more outputs without committing to push usedMemory > memoryLimit=2000000
    mo3 = mergeManager.reserve(id3, data1.length, data1.length, 0);
    mo4 = mergeManager.reserve(id4, data1.length, data1.length, 0);
    mo5 = mergeManager.reserve(id5, data1.length, data1.length, 0);
    assertEquals(MapOutput.Type.MEMORY, mo3.getType());
    assertEquals(MapOutput.Type.MEMORY, mo4.getType());
    assertEquals(MapOutput.Type.MEMORY, mo5.getType());

    // Verify deadlock precondition: usedMemory > memoryLimit, commitMemory < mergeThreshold
    assertTrue("usedMemory should exceed memoryLimit", mergeManager.getUsedMemory() > 2000000L);
    assertTrue("commitMemory should be below mergeThreshold", mergeManager.getCommitMemory() < 1800000L);

    // waitForShuffleToMergeMemory() should complete without deadlock:
    // the fix forces a merge when memory is exhausted but commitMemory < mergeThreshold
    waitThread = new Thread(() -> {
      try {
        mergeManager.waitForShuffleToMergeMemory();
      } catch (InterruptedException e) {
        // Restore the interrupt flag; the thread exits right after this.
        Thread.currentThread().interrupt();
      }
    });
    waitThread.start();
    // Bounded join: well inside the 30s test timeout, so a hang surfaces as the assertion below.
    waitThread.join(15000);
    assertFalse("waitForShuffleToMergeMemory deadlocked", waitThread.isAlive());
  } finally {
    // Reached on success and on assertion failure alike.
    if (waitThread != null && waitThread.isAlive()) {
      waitThread.interrupt();
    }
    if (mo3 != null) {
      mo3.abort();
    }
    if (mo4 != null) {
      mo4.abort();
    }
    if (mo5 != null) {
      mo5.abort();
    }
    mergeManager.close(true);
  }
}

@Test
public void testDiskMergeWithCodec() throws Throwable {
Configuration conf = new TezConfiguration(defaultConf);
Expand Down
Loading