-
Notifications
You must be signed in to change notification settings - Fork 489
Compaction deletes non-shared files #6060
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e6a82f8
f941721
a26e1c1
4657475
23a3a9a
aae1b46
45ccf4d
4e5247b
27e6fd8
0a90f36
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,29 +32,54 @@ public class DataFileValue { | |
| private final long size; | ||
| private final long numEntries; | ||
| private long time = -1; | ||
| private boolean shared; | ||
|
|
||
| public DataFileValue(long size, long numEntries, long time, boolean shared) { | ||
| this.size = size; | ||
| this.numEntries = numEntries; | ||
| this.time = time; | ||
| this.shared = shared; | ||
| } | ||
|
|
||
| public DataFileValue(long size, long numEntries, boolean shared) { | ||
| this.size = size; | ||
| this.numEntries = numEntries; | ||
| this.time = -1; | ||
| this.shared = shared; | ||
| } | ||
|
|
||
| public DataFileValue(long size, long numEntries, long time) { | ||
| this.size = size; | ||
| this.numEntries = numEntries; | ||
| this.time = time; | ||
| this.shared = false; | ||
| } | ||
|
|
||
| public DataFileValue(long size, long numEntries) { | ||
| this.size = size; | ||
| this.numEntries = numEntries; | ||
| this.time = -1; | ||
| this.shared = false; | ||
| } | ||
|
|
||
| public DataFileValue(String encodedDFV) { | ||
| String[] ba = encodedDFV.split(","); | ||
|
|
||
| size = Long.parseLong(ba[0]); | ||
| numEntries = Long.parseLong(ba[1]); | ||
| time = -1; | ||
| shared = false; | ||
|
|
||
| if (ba.length == 3) { | ||
| time = Long.parseLong(ba[2]); | ||
| } else { | ||
| time = -1; | ||
| // Could be old format with time, or new format with shared | ||
| try { | ||
| time = Long.parseLong(ba[2]); | ||
| } catch (NumberFormatException e) { | ||
| shared = Boolean.parseBoolean(ba[2]); | ||
| } | ||
| } else if (ba.length == 4) { | ||
| shared = Boolean.parseBoolean(ba[2]); | ||
| time = Long.parseLong(ba[3]); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -78,15 +103,19 @@ public long getTime() { | |
| return time; | ||
| } | ||
|
|
||
| public boolean isShared() { | ||
| return shared; | ||
| } | ||
|
|
||
| public byte[] encode() { | ||
| return encodeAsString().getBytes(UTF_8); | ||
| } | ||
|
|
||
| public String encodeAsString() { | ||
| if (time >= 0) { | ||
| return ("" + size + "," + numEntries + "," + time); | ||
| return ("" + size + "," + numEntries + "," + shared + "," + time); | ||
| } | ||
| return ("" + size + "," + numEntries); | ||
| return ("" + size + "," + numEntries + "," + shared); | ||
| } | ||
|
|
||
| public Value encodeAsValue() { | ||
|
|
@@ -97,20 +126,21 @@ public Value encodeAsValue() { | |
| public boolean equals(Object o) { | ||
| if (o instanceof DataFileValue odfv) { | ||
|
|
||
| return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time; | ||
| return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time | ||
| && shared == odfv.shared; | ||
| } | ||
|
|
||
| return false; | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Long.valueOf(size + numEntries).hashCode(); | ||
| return Long.valueOf(size + numEntries + (shared ? 1 : 0)).hashCode(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We might want to change this to |
||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return size + " " + numEntries; | ||
| return size + " " + numEntries + " " + shared; | ||
| } | ||
|
|
||
| public void setTime(long time) { | ||
|
|
@@ -120,6 +150,10 @@ public void setTime(long time) { | |
| this.time = time; | ||
| } | ||
|
|
||
| public void setShared(boolean shared) { | ||
| this.shared = shared; | ||
| } | ||
|
|
||
| /** | ||
| * @return true if {@link #wrapFileIterator} would wrap a given iterator, false otherwise. | ||
| */ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,6 +34,7 @@ | |
| import java.util.HashSet; | ||
| import java.util.Iterator; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Map.Entry; | ||
| import java.util.SortedMap; | ||
| import java.util.TreeMap; | ||
|
|
@@ -179,6 +180,48 @@ public static void deleteTable(TableId tableId, boolean insertDeletes, ServerCon | |
| return new Pair<>(result, sizes); | ||
| } | ||
|
|
||
| private static void markTabletFilesAsShared(ServerContext ctx, TabletMetadata srcTablet) { | ||
| if (srcTablet.getFiles().isEmpty()) { | ||
| return; | ||
| } | ||
|
|
||
| Map<StoredTabletFile,DataFileValue> currentFilesMap = srcTablet.getFilesMap(); | ||
|
|
||
| // Skip if all files are already shared | ||
| boolean anyNonShared = currentFilesMap.values().stream().anyMatch(dfv -> !dfv.isShared()); | ||
| if (!anyNonShared) { | ||
| return; | ||
| } | ||
|
|
||
| try (var conditionalMutator = ctx.getAmple().conditionallyMutateTablets()) { | ||
| var tabletMutator = conditionalMutator.mutateTablet(srcTablet.getExtent()) | ||
| .requireAbsentOperation().requireFiles(currentFilesMap); | ||
|
|
||
| // Write updated DataFileValues with shared=true for any non-shared files | ||
| for (var entry : currentFilesMap.entrySet()) { | ||
| StoredTabletFile file = entry.getKey(); | ||
| DataFileValue dfv = entry.getValue(); | ||
| if (!dfv.isShared()) { | ||
| DataFileValue sharedDfv = new DataFileValue(dfv.getSize(), dfv.getNumEntries(), | ||
| dfv.isTimeSet() ? dfv.getTime() : -1, true); | ||
| tabletMutator.putFile(file, sharedDfv); | ||
| log.debug("Marking file {} as shared in source tablet {} (conditional)", | ||
| file.getFileName(), srcTablet.getExtent()); | ||
| } | ||
| } | ||
|
|
||
| tabletMutator.submit(tm -> false, () -> "mark source files as shared for clone"); | ||
|
|
||
| var result = conditionalMutator.process().get(srcTablet.getExtent()); | ||
| if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) { | ||
| log.debug( | ||
| "Conditional mutation to mark files as shared was rejected for tablet {}, " | ||
| + "tablet changed concurrently, clone will retry for this tablet", | ||
| srcTablet.getExtent()); | ||
|
Comment on lines
+218
to
+220
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might be better to return a boolean here for success/fail (see comment below). Could probably move the debug log statement down to the calling location too. |
||
| } | ||
| } | ||
| } | ||
|
|
||
| private static Mutation createCloneMutation(TableId srcTableId, TableId tableId, | ||
| Iterable<Entry<Key,Value>> tablet) { | ||
|
|
||
|
|
@@ -191,7 +234,12 @@ private static Mutation createCloneMutation(TableId srcTableId, TableId tableId, | |
| if (!cf.startsWith("../") && !cf.contains(":")) { | ||
| cf = "../" + srcTableId + entry.getKey().getColumnQualifier(); | ||
| } | ||
| m.put(entry.getKey().getColumnFamily(), new Text(cf), entry.getValue()); | ||
|
|
||
| DataFileValue ogVal = new DataFileValue(entry.getValue().get()); | ||
| DataFileValue newSharedVal = new DataFileValue(ogVal.getSize(), ogVal.getNumEntries(), | ||
| ogVal.isTimeSet() ? ogVal.getTime() : -1, true); | ||
|
|
||
| m.put(entry.getKey().getColumnFamily(), new Text(cf), newSharedVal.encodeAsValue()); | ||
| } else if (entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) { | ||
| m.put(LastLocationColumnFamily.NAME, entry.getKey().getColumnQualifier(), entry.getValue()); | ||
| } else if (entry.getKey().getColumnFamily().equals(LastLocationColumnFamily.NAME)) { | ||
|
|
@@ -350,6 +398,11 @@ public static void cloneTable(ServerContext context, TableId srcTableId, TableId | |
| while (true) { | ||
|
|
||
| try { | ||
| try (TabletsMetadata tabletsMetadata = createCloneScanner(null, srcTableId, context)) { | ||
| for (TabletMetadata tablet : tabletsMetadata) { | ||
| markTabletFilesAsShared(context, tablet); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If there is a failure here, then you probably want a |
||
| } | ||
| } | ||
| initializeClone(null, srcTableId, tableId, context, bw); | ||
|
|
||
| // the following loop looks changes in the file that occurred during the copy.. if files | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,6 +27,7 @@ | |
| import static org.apache.accumulo.core.util.LazySingletons.GSON; | ||
|
|
||
| import java.time.Duration; | ||
| import java.util.ArrayList; | ||
| import java.util.HashSet; | ||
| import java.util.Optional; | ||
| import java.util.Set; | ||
|
|
@@ -70,6 +71,10 @@ public CommitCompaction(CompactionCommitData commitData, String newDatafile) { | |
| this.newDatafile = newDatafile; | ||
| } | ||
|
|
||
| private record CompactionFileResult(TabletMetadata tabletMetadata, | ||
| ArrayList<StoredTabletFile> filesToDeleteViaGc) { | ||
| } | ||
|
|
||
| @Override | ||
| public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception { | ||
| var ecid = ExternalCompactionId.of(commitData.ecid); | ||
|
|
@@ -82,25 +87,38 @@ public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception { | |
| // process died and now its running again. In this case commit should do nothing, but its | ||
| // important to still carry on with the rest of the steps after commit. This code ignores a that | ||
| // fact that a commit may not have happened in the current call and continues for this reason. | ||
| TabletMetadata tabletMetadata = commitCompaction(env.getContext(), ecid, newFile); | ||
| CompactionFileResult fileResult = commitCompaction(env.getContext(), ecid, newFile); | ||
|
|
||
| String loc = null; | ||
| if (tabletMetadata != null && tabletMetadata.getLocation() != null) { | ||
| loc = tabletMetadata.getLocation().getHostPortSession(); | ||
| if (fileResult != null && fileResult.tabletMetadata.getLocation() != null) { | ||
| loc = fileResult.tabletMetadata.getLocation().getHostPortSession(); | ||
| } | ||
|
|
||
| // This will causes the tablet to be reexamined to see if it needs any more compactions. | ||
| var extent = KeyExtent.fromThrift(commitData.textent); | ||
| env.getEventPublisher().event(extent, "Compaction completed %s", extent); | ||
|
|
||
| return new PutGcCandidates(commitData, loc); | ||
| return new PutGcCandidates(commitData, loc, fileResult.filesToDeleteViaGc()); | ||
| } | ||
|
|
||
| KeyExtent getExtent() { | ||
| return KeyExtent.fromThrift(commitData.textent); | ||
| } | ||
|
|
||
| private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId ecid, | ||
| private ArrayList<StoredTabletFile> computeNonSharedFiles(TabletMetadata tablet, | ||
| CompactionMetadata ecm) { | ||
| ArrayList<StoredTabletFile> nonSharedFiles = new ArrayList<>(); | ||
| var tabletFilesMap = tablet.getFilesMap(); | ||
| for (StoredTabletFile file : ecm.getJobFiles()) { | ||
| DataFileValue dfv = tabletFilesMap.get(file); | ||
| if (dfv == null || !dfv.isShared()) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think dfv should not be null. If it is, that seems like a larger issue. I don't think we should delete these files. |
||
| nonSharedFiles.add(file); | ||
| } | ||
| } | ||
| return nonSharedFiles; | ||
| } | ||
|
|
||
| private CompactionFileResult commitCompaction(ServerContext ctx, ExternalCompactionId ecid, | ||
| Optional<ReferencedTabletFile> newDatafile) { | ||
|
|
||
| var tablet = | ||
|
|
@@ -110,6 +128,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId | |
| .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5) | ||
| .logInterval(Duration.ofMinutes(3)).createRetry(); | ||
|
|
||
| ArrayList<StoredTabletFile> filesToDeleteViaGc = new ArrayList<>(); | ||
|
|
||
| while (canCommitCompaction(ecid, tablet)) { | ||
| CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid); | ||
|
|
||
|
|
@@ -122,7 +142,7 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId | |
| try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { | ||
| var tabletMutator = tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation() | ||
| .requireCompaction(ecid).requireSame(tablet, LOCATION) | ||
| .requireFiles(commitData.getJobFiles()); | ||
| .requireFiles(tablet.getFilesMap()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code before this change was validating that the files that are part of the compaction are part of the tablet. I think that should always be true. This change is checking that the tablets files are the same as when this method started. This might not be true as a compaction could have occurred adding a new file. |
||
|
|
||
| if (ecm.getKind() == CompactionKind.USER) { | ||
| tabletMutator.requireSame(tablet, SELECTED, COMPACTED); | ||
|
|
@@ -143,6 +163,7 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId | |
|
|
||
| var result = tabletsMutator.process().get(getExtent()); | ||
| if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { | ||
| filesToDeleteViaGc = computeNonSharedFiles(tablet, ecm); | ||
| // Compaction was successfully committed to the tablet so log it | ||
| TabletLogger.compacted(getExtent(), ecid, commitData.kind, commitData.getJobFiles(), | ||
| newDatafile); | ||
|
|
@@ -160,7 +181,7 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId | |
| } | ||
| } | ||
|
|
||
| return tablet; | ||
| return new CompactionFileResult(tablet, filesToDeleteViaGc); | ||
| } | ||
|
|
||
| private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid, | ||
|
|
@@ -216,11 +237,14 @@ private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactio | |
| ecm.getJobFiles().forEach(tabletMutator::putScan); | ||
| } | ||
| ecm.getJobFiles().forEach(tabletMutator::deleteFile); | ||
|
|
||
| tabletMutator.deleteExternalCompaction(ecid); | ||
|
|
||
| if (newDatafile.isPresent()) { | ||
| tabletMutator.putFile(newDatafile.orElseThrow(), | ||
| new DataFileValue(stats.getFileSize(), stats.getEntriesWritten())); | ||
| // Mark new compaction files as not shared. | ||
| DataFileValue newFileValue = | ||
| new DataFileValue(stats.getFileSize(), stats.getEntriesWritten(), false); | ||
| tabletMutator.putFile(newDatafile.orElseThrow(), newFileValue); | ||
| } | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should always encode the time. If there are 3 parts, then the DataFileValue was written before this code and shared is false. If there are 4 parts, then it was written with the new code.