Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.RemoveSCMRequest;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ha.invoker.ScmInvoker;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.ozone.OzoneConsts;
Expand Down Expand Up @@ -223,6 +224,9 @@ public SCMStateMachine getSCMStateMachine() {
@Override
public void registerStateMachineHandler(final RequestType handlerType,
final Object handler) {
if (handler instanceof ScmInvoker) {
stateMachine.registerInvoker(handlerType, (ScmInvoker) handler);
Comment on lines +227 to +228
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this method is called with ScmInvoker, rather invoker.getImpl().

default <T extends SCMHandler> T getProxyHandler(ScmInvoker<T> invoker) {
registerStateMachineHandler(invoker.getType(), invoker.getImpl());

[main] INFO ha.SCMRatisServerImpl: registerStateMachineHandler(FINALIZE, org.apache.hadoop.hdds.scm.server.upgrade.FinalizationStateManagerImpl@273a5a8a): invoker=false
[main] INFO ha.SCMRatisServerImpl: registerStateMachineHandler(SEQUENCE_ID, org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator$StateManagerImpl@76544c0a): invoker=false
[main] INFO ha.SCMRatisServerImpl: registerStateMachineHandler(PIPELINE, org.apache.hadoop.hdds.scm.pipeline.PipelineStateManagerImpl@57a6a933): invoker=false
[main] INFO ha.SCMRatisServerImpl: registerStateMachineHandler(CONTAINER, org.apache.hadoop.hdds.scm.container.ContainerStateManagerImpl@31f295b6): invoker=false
[main] INFO ha.SCMRatisServerImpl: registerStateMachineHandler(BLOCK, org.apache.hadoop.hdds.scm.block.DeletedBlockLogStateManagerImpl@423a0e1d): invoker=false
[main] INFO ha.SCMRatisServerImpl: registerStateMachineHandler(STATEFUL_SERVICE_CONFIG, org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl@1c240cf2): invoker=false

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right -- we need to pass invoker but not getImpl().

registerStateMachineHandler(invoker.getType(), invoker);

}
stateMachine.registerHandler(handlerType, handler);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add else.

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.ha.invoker.ScmInvoker;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
import org.apache.hadoop.hdds.utils.TransactionInfo;
Expand Down Expand Up @@ -77,6 +78,7 @@ public class SCMStateMachine extends BaseStateMachine {

private StorageContainerManager scm;
private Map<RequestType, Object> handlers;
private Map<RequestType, ScmInvoker<?>> invokers;
private SCMHADBTransactionBuffer transactionBuffer;
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
Expand All @@ -93,6 +95,7 @@ public SCMStateMachine(final StorageContainerManager scm,
SCMHADBTransactionBuffer buffer) {
this.scm = scm;
this.handlers = new EnumMap<>(RequestType.class);
this.invokers = new EnumMap<>(RequestType.class);
this.transactionBuffer = buffer;
TransactionInfo latestTrxInfo = this.transactionBuffer.getLatestTrxInfo();
if (!latestTrxInfo.isDefault()) {
Expand All @@ -117,6 +120,11 @@ public void registerHandler(RequestType type, Object handler) {
handlers.put(type, handler);
}

public void registerInvoker(RequestType type,
ScmInvoker<?> invoker) {
invokers.put(type, invoker);
}

@Override
public SnapshotInfo getLatestSnapshot() {
// Transaction buffer will be null during scm initlialization phase
Expand Down Expand Up @@ -179,6 +187,10 @@ public CompletableFuture<Message> applyTransaction(
}

private Message process(final SCMRatisRequest request) throws Exception {
final ScmInvoker<?> invoker = invokers.get(request.getType());
if (invoker != null) {
return invoker.invokeLocal(request.getOperation(), request.getArguments());
}
return process(request, handlers.get(request.getType()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogStateManager;
import org.apache.hadoop.hdds.scm.ha.SCMRatisResponse;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.ratis.protocol.Message;

/** Code generated for {@link DeletedBlockLogStateManager}. Do not modify. */
public class DeletedBlockLogStateManagerInvoker extends ScmInvoker<DeletedBlockLogStateManager> {
Expand Down Expand Up @@ -106,35 +108,41 @@ public void removeTransactionsFromDB(ArrayList arg0, DeletedBlocksTransactionSum

@SuppressWarnings("unchecked")
@Override
public Object invokeLocal(String methodName, Object[] p) throws Exception {
public Message invokeLocal(String methodName, Object[] p) throws Exception {
final Class<?> returnType;
final Object returnValue;
switch (methodName) {
case "addTransactionsToDB":
final ArrayList arg0 = p.length > 0 ? (ArrayList) p[0] : null;
final DeletedBlocksTransactionSummary arg1 = p.length > 1 ? (DeletedBlocksTransactionSummary) p[1] : null;
getImpl().addTransactionsToDB(arg0, arg1);
return null;
return Message.EMPTY;

case "getReadOnlyIterator":
return getImpl().getReadOnlyIterator();
returnType = Table.KeyValueIterator.class;
returnValue = getImpl().getReadOnlyIterator();
break;

case "onFlush":
getImpl().onFlush();
return null;
return Message.EMPTY;

case "reinitialize":
final Table arg2 = p.length > 0 ? (Table) p[0] : null;
final Table arg3 = p.length > 1 ? (Table) p[1] : null;
getImpl().reinitialize(arg2, arg3);
return null;
return Message.EMPTY;

case "removeTransactionsFromDB":
final ArrayList arg4 = p.length > 0 ? (ArrayList) p[0] : null;
final DeletedBlocksTransactionSummary arg5 = p.length > 1 ? (DeletedBlocksTransactionSummary) p[1] : null;
getImpl().removeTransactionsFromDB(arg4, arg5);
return null;
return Message.EMPTY;

default:
throw new IllegalArgumentException("Method not found: " + methodName + " in DeletedBlockLogStateManager");
}

return SCMRatisResponse.encode(returnValue, returnType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.scm.ha.SCMRatisRequest;
import org.apache.hadoop.hdds.scm.ha.SCMRatisResponse;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.ratis.protocol.Message;

/**
* Invokes methods without using reflection.
Expand Down Expand Up @@ -55,8 +56,12 @@ public final T getProxy() {
return proxy;
}

public final Object invoke(String methodName, Object[] args) throws Exception {
return invokeLocal(methodName, args);
}

/** For non-@Replicate methods. */
abstract Object invokeLocal(String methodName, Object[] args) throws Exception;
public abstract Message invokeLocal(String methodName, Object[] args) throws Exception;

/** For @Replicate DIRECT methods. */
final Object invokeReplicateDirect(NameAndParameterTypes method, Object[] args) throws SCMException {
Expand Down