Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,11 @@ ConditionalTabletMutator requireSame(TabletMetadata tabletMetadata, ColumnType t
*/
ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files);

/*
* Require that a tablet contain all the files in the set with the exact same DataFileValue
*/
ConditionalTabletMutator requireFiles(Map<StoredTabletFile,DataFileValue> files);

/**
* Require that a tablet have less than or equals the specified number of files.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,54 @@ public class DataFileValue {
private final long size;
private final long numEntries;
private long time = -1;
private boolean shared;

public DataFileValue(long size, long numEntries, long time, boolean shared) {
this.size = size;
this.numEntries = numEntries;
this.time = time;
this.shared = shared;
}

public DataFileValue(long size, long numEntries, boolean shared) {
this.size = size;
this.numEntries = numEntries;
this.time = -1;
this.shared = shared;
}

public DataFileValue(long size, long numEntries, long time) {
this.size = size;
this.numEntries = numEntries;
this.time = time;
this.shared = false;
}

public DataFileValue(long size, long numEntries) {
this.size = size;
this.numEntries = numEntries;
this.time = -1;
this.shared = false;
}

public DataFileValue(String encodedDFV) {
String[] ba = encodedDFV.split(",");

size = Long.parseLong(ba[0]);
numEntries = Long.parseLong(ba[1]);
time = -1;
shared = false;

if (ba.length == 3) {
time = Long.parseLong(ba[2]);
} else {
time = -1;
// Could be old format with time, or new format with shared

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should always encode the time. If there are 3 parts, then the DataFileValue was written before this code and shared is false. If there are 4 parts, then it was written with the new code.

try {
time = Long.parseLong(ba[2]);
} catch (NumberFormatException e) {
shared = Boolean.parseBoolean(ba[2]);
}
} else if (ba.length == 4) {
shared = Boolean.parseBoolean(ba[2]);
time = Long.parseLong(ba[3]);
}
}

Expand All @@ -78,15 +103,19 @@ public long getTime() {
return time;
}

public boolean isShared() {
return shared;
}

public byte[] encode() {
return encodeAsString().getBytes(UTF_8);
}

public String encodeAsString() {
if (time >= 0) {
return ("" + size + "," + numEntries + "," + time);
return ("" + size + "," + numEntries + "," + shared + "," + time);
}
return ("" + size + "," + numEntries);
return ("" + size + "," + numEntries + "," + shared);
}

public Value encodeAsValue() {
Expand All @@ -97,20 +126,21 @@ public Value encodeAsValue() {
public boolean equals(Object o) {
if (o instanceof DataFileValue odfv) {

return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time;
return size == odfv.size && numEntries == odfv.numEntries && time == odfv.time
&& shared == odfv.shared;
}

return false;
}

@Override
public int hashCode() {
return Long.valueOf(size + numEntries).hashCode();
return Long.valueOf(size + numEntries + (shared ? 1 : 0)).hashCode();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to change this to Objects.hash(size, numEntries, shared)

}

@Override
public String toString() {
return size + " " + numEntries;
return size + " " + numEntries + " " + shared;
}

public void setTime(long time) {
Expand All @@ -120,6 +150,10 @@ public void setTime(long time) {
this.time = time;
}

public void setShared(boolean shared) {
this.shared = shared;
}

/**
* @return true if {@link #wrapFileIterator} would wrap a given iterator, false otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.encodePrevEndRow;

import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
Expand All @@ -52,6 +53,7 @@
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalTabletMutator;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CompactedColumnFamily;
Expand Down Expand Up @@ -363,6 +365,17 @@ public ConditionalTabletMutator requireFiles(Set<StoredTabletFile> files) {
return this;
}

@Override
public ConditionalTabletMutator requireFiles(Map<StoredTabletFile,DataFileValue> files) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Condition c = SetEncodingIterator.createConditionWithVal(files.entrySet(),
entry -> new Pair<>(entry.getKey().getMetadata().getBytes(UTF_8),
entry.getValue().encode()),
DataFileColumnFamily.NAME);
mutation.addCondition(c);
return this;
}

@Override
public ConditionalTabletMutator requireLessOrEqualsFiles(long limit) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
Expand Down Expand Up @@ -179,6 +180,48 @@ public static void deleteTable(TableId tableId, boolean insertDeletes, ServerCon
return new Pair<>(result, sizes);
}

private static void markTabletFilesAsShared(ServerContext ctx, TabletMetadata srcTablet) {
if (srcTablet.getFiles().isEmpty()) {
return;
}

Map<StoredTabletFile,DataFileValue> currentFilesMap = srcTablet.getFilesMap();

// Skip if all files are already shared
boolean anyNonShared = currentFilesMap.values().stream().anyMatch(dfv -> !dfv.isShared());
if (!anyNonShared) {
return;
}

try (var conditionalMutator = ctx.getAmple().conditionallyMutateTablets()) {
var tabletMutator = conditionalMutator.mutateTablet(srcTablet.getExtent())
.requireAbsentOperation().requireFiles(currentFilesMap);

// Write updated DataFileValues with shared=true for any non-shared files
for (var entry : currentFilesMap.entrySet()) {
StoredTabletFile file = entry.getKey();
DataFileValue dfv = entry.getValue();
if (!dfv.isShared()) {
DataFileValue sharedDfv = new DataFileValue(dfv.getSize(), dfv.getNumEntries(),
dfv.isTimeSet() ? dfv.getTime() : -1, true);
tabletMutator.putFile(file, sharedDfv);
log.debug("Marking file {} as shared in source tablet {} (conditional)",
file.getFileName(), srcTablet.getExtent());
}
}

tabletMutator.submit(tm -> false, () -> "mark source files as shared for clone");

var result = conditionalMutator.process().get(srcTablet.getExtent());
if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) {
log.debug(
"Conditional mutation to mark files as shared was rejected for tablet {}, "
+ "tablet changed concurrently, clone will retry for this tablet",
srcTablet.getExtent());
Comment on lines +218 to +220

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to return a boolean here for success/fail (see comment below). Could probably move the debug log statement down to the calling location too.

}
}
}

private static Mutation createCloneMutation(TableId srcTableId, TableId tableId,
Iterable<Entry<Key,Value>> tablet) {

Expand All @@ -191,7 +234,12 @@ private static Mutation createCloneMutation(TableId srcTableId, TableId tableId,
if (!cf.startsWith("../") && !cf.contains(":")) {
cf = "../" + srcTableId + entry.getKey().getColumnQualifier();
}
m.put(entry.getKey().getColumnFamily(), new Text(cf), entry.getValue());

DataFileValue ogVal = new DataFileValue(entry.getValue().get());
DataFileValue newSharedVal = new DataFileValue(ogVal.getSize(), ogVal.getNumEntries(),
ogVal.isTimeSet() ? ogVal.getTime() : -1, true);

m.put(entry.getKey().getColumnFamily(), new Text(cf), newSharedVal.encodeAsValue());
} else if (entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) {
m.put(LastLocationColumnFamily.NAME, entry.getKey().getColumnQualifier(), entry.getValue());
} else if (entry.getKey().getColumnFamily().equals(LastLocationColumnFamily.NAME)) {
Expand Down Expand Up @@ -350,6 +398,11 @@ public static void cloneTable(ServerContext context, TableId srcTableId, TableId
while (true) {

try {
try (TabletsMetadata tabletsMetadata = createCloneScanner(null, srcTableId, context)) {
for (TabletMetadata tablet : tabletsMetadata) {
markTabletFilesAsShared(context, tablet);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is a failure here, then you probably want a continue to restart the loop. I don't think you want to continue on a failure.

}
}
initializeClone(null, srcTableId, tableId, context, bw);

// the following loop looks changes in the file that occurred during the copy.. if files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.accumulo.core.util.LazySingletons.GSON;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -70,6 +71,10 @@ public CommitCompaction(CompactionCommitData commitData, String newDatafile) {
this.newDatafile = newDatafile;
}

private record CompactionFileResult(TabletMetadata tabletMetadata,
ArrayList<StoredTabletFile> filesToDeleteViaGc) {
}

@Override
public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
var ecid = ExternalCompactionId.of(commitData.ecid);
Expand All @@ -82,25 +87,38 @@ public Repo<FateEnv> call(FateId fateId, FateEnv env) throws Exception {
// process died and now its running again. In this case commit should do nothing, but its
// important to still carry on with the rest of the steps after commit. This code ignores a that
// fact that a commit may not have happened in the current call and continues for this reason.
TabletMetadata tabletMetadata = commitCompaction(env.getContext(), ecid, newFile);
CompactionFileResult fileResult = commitCompaction(env.getContext(), ecid, newFile);

String loc = null;
if (tabletMetadata != null && tabletMetadata.getLocation() != null) {
loc = tabletMetadata.getLocation().getHostPortSession();
if (fileResult != null && fileResult.tabletMetadata.getLocation() != null) {
loc = fileResult.tabletMetadata.getLocation().getHostPortSession();
}

// This will causes the tablet to be reexamined to see if it needs any more compactions.
var extent = KeyExtent.fromThrift(commitData.textent);
env.getEventPublisher().event(extent, "Compaction completed %s", extent);

return new PutGcCandidates(commitData, loc);
return new PutGcCandidates(commitData, loc, fileResult.filesToDeleteViaGc());
}

KeyExtent getExtent() {
return KeyExtent.fromThrift(commitData.textent);
}

private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId ecid,
private ArrayList<StoredTabletFile> computeNonSharedFiles(TabletMetadata tablet,
CompactionMetadata ecm) {
ArrayList<StoredTabletFile> nonSharedFiles = new ArrayList<>();
var tabletFilesMap = tablet.getFilesMap();
for (StoredTabletFile file : ecm.getJobFiles()) {
DataFileValue dfv = tabletFilesMap.get(file);
if (dfv == null || !dfv.isShared()) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think dfv should not be null. If it is, that seems like a larger issue. I don't think we should delete these files.

nonSharedFiles.add(file);
}
}
return nonSharedFiles;
}

private CompactionFileResult commitCompaction(ServerContext ctx, ExternalCompactionId ecid,
Optional<ReferencedTabletFile> newDatafile) {

var tablet =
Expand All @@ -110,6 +128,8 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
.logInterval(Duration.ofMinutes(3)).createRetry();

ArrayList<StoredTabletFile> filesToDeleteViaGc = new ArrayList<>();

while (canCommitCompaction(ecid, tablet)) {
CompactionMetadata ecm = tablet.getExternalCompactions().get(ecid);

Expand All @@ -122,7 +142,7 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId
try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
var tabletMutator = tabletsMutator.mutateTablet(getExtent()).requireAbsentOperation()
.requireCompaction(ecid).requireSame(tablet, LOCATION)
.requireFiles(commitData.getJobFiles());
.requireFiles(tablet.getFilesMap());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code before this change was validating that the files that are part of the compaction are part of the tablet. I think that should always be true. This change is checking that the tablets files are the same as when this method started. This might not be true as a compaction could have occurred adding a new file.


if (ecm.getKind() == CompactionKind.USER) {
tabletMutator.requireSame(tablet, SELECTED, COMPACTED);
Expand All @@ -143,6 +163,7 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId

var result = tabletsMutator.process().get(getExtent());
if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
filesToDeleteViaGc = computeNonSharedFiles(tablet, ecm);
// Compaction was successfully committed to the tablet so log it
TabletLogger.compacted(getExtent(), ecid, commitData.kind, commitData.getJobFiles(),
newDatafile);
Expand All @@ -160,7 +181,7 @@ private TabletMetadata commitCompaction(ServerContext ctx, ExternalCompactionId
}
}

return tablet;
return new CompactionFileResult(tablet, filesToDeleteViaGc);
}

private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactionId ecid,
Expand Down Expand Up @@ -216,11 +237,14 @@ private void updateTabletForCompaction(TCompactionStats stats, ExternalCompactio
ecm.getJobFiles().forEach(tabletMutator::putScan);
}
ecm.getJobFiles().forEach(tabletMutator::deleteFile);

tabletMutator.deleteExternalCompaction(ecid);

if (newDatafile.isPresent()) {
tabletMutator.putFile(newDatafile.orElseThrow(),
new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()));
// Mark new compaction files as not shared.
DataFileValue newFileValue =
new DataFileValue(stats.getFileSize(), stats.getEntriesWritten(), false);
tabletMutator.putFile(newDatafile.orElseThrow(), newFileValue);
}
}

Expand Down
Loading
Loading