Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.RemoveSCMRequest;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ha.invoker.ScmInvoker;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.ozone.OzoneConsts;
Expand Down Expand Up @@ -223,6 +224,9 @@ public SCMStateMachine getSCMStateMachine() {
@Override
public void registerStateMachineHandler(final RequestType handlerType,
final Object handler) {
if (handler instanceof ScmInvoker) {
stateMachine.registerInvoker(handlerType, (ScmInvoker) handler);
Comment on lines +227 to +228
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this method is called with ScmInvoker, rather invoker.getImpl().

default <T extends SCMHandler> T getProxyHandler(ScmInvoker<T> invoker) {
registerStateMachineHandler(invoker.getType(), invoker.getImpl());

[main] INFO ha.SCMRatisServerImpl: registerStateMachineHandler(FINALIZE, org.apache.hadoop.hdds.scm.server.upgrade.FinalizationStateManagerImpl@273a5a8a): invoker=false
[main] INFO ha.SCMRatisServerImpl: registerStateMachineHandler(SEQUENCE_ID, org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator$StateManagerImpl@76544c0a): invoker=false
[main] INFO ha.SCMRatisServerImpl: registerStateMachineHandler(PIPELINE, org.apache.hadoop.hdds.scm.pipeline.PipelineStateManagerImpl@57a6a933): invoker=false
[main] INFO ha.SCMRatisServerImpl: registerStateMachineHandler(CONTAINER, org.apache.hadoop.hdds.scm.container.ContainerStateManagerImpl@31f295b6): invoker=false
[main] INFO ha.SCMRatisServerImpl: registerStateMachineHandler(BLOCK, org.apache.hadoop.hdds.scm.block.DeletedBlockLogStateManagerImpl@423a0e1d): invoker=false
[main] INFO ha.SCMRatisServerImpl: registerStateMachineHandler(STATEFUL_SERVICE_CONFIG, org.apache.hadoop.hdds.scm.ha.StatefulServiceStateManagerImpl@1c240cf2): invoker=false

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right -- we need to pass invoker but not getImpl().

registerStateMachineHandler(invoker.getType(), invoker);

}
stateMachine.registerHandler(handlerType, handler);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add else.

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.ha.invoker.ScmInvoker;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
import org.apache.hadoop.hdds.utils.TransactionInfo;
Expand Down Expand Up @@ -77,6 +78,7 @@ public class SCMStateMachine extends BaseStateMachine {

private StorageContainerManager scm;
private Map<RequestType, Object> handlers;
private Map<RequestType, ScmInvoker<?>> invokers;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private Map<RequestType, ScmInvoker<?>> invokers;
private Map<RequestType, ScmInvoker<?>> invokers;

private SCMHADBTransactionBuffer transactionBuffer;
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
Expand All @@ -93,6 +95,7 @@ public SCMStateMachine(final StorageContainerManager scm,
SCMHADBTransactionBuffer buffer) {
this.scm = scm;
this.handlers = new EnumMap<>(RequestType.class);
this.invokers = new EnumMap<>(RequestType.class);
this.transactionBuffer = buffer;
TransactionInfo latestTrxInfo = this.transactionBuffer.getLatestTrxInfo();
if (!latestTrxInfo.isDefault()) {
Expand All @@ -117,6 +120,11 @@ public void registerHandler(RequestType type, Object handler) {
handlers.put(type, handler);
}

public void registerInvoker(RequestType type,
ScmInvoker<?> invoker) {
invokers.put(type, invoker);
}

@Override
public SnapshotInfo getLatestSnapshot() {
// Transaction buffer will be null during scm initlialization phase
Expand Down Expand Up @@ -179,6 +187,10 @@ public CompletableFuture<Message> applyTransaction(
}

private Message process(final SCMRatisRequest request) throws Exception {
final ScmInvoker<?> invoker = invokers.get(request.getType());
if (invoker != null) {
return invoker.invokeLocal(request.getOperation(), request.getArguments());
}
return process(request, handlers.get(request.getType()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogStateManager;
import org.apache.hadoop.hdds.scm.ha.SCMRatisResponse;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.ratis.protocol.Message;

/** Code generated for {@link DeletedBlockLogStateManager}. Do not modify. */
public class DeletedBlockLogStateManagerInvoker extends ScmInvoker<DeletedBlockLogStateManager> {
Expand Down Expand Up @@ -106,7 +108,9 @@ public void removeTransactionsFromDB(ArrayList arg0, DeletedBlocksTransactionSum

@SuppressWarnings("unchecked")
@Override
public Object invokeLocal(String methodName, Object[] p) throws Exception {
public Message invokeLocal(String methodName, Object[] p) throws Exception {
final Class<?> returnType;
final Object returnValue;
switch (methodName) {
case "addTransactionsToDB":
final ArrayList arg0 = p.length > 0 ? (ArrayList) p[0] : null;
Expand All @@ -115,7 +119,9 @@ public Object invokeLocal(String methodName, Object[] p) throws Exception {
return null;
Copy link
Copy Markdown
Contributor

@szetszwo szetszwo Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Russole , For void.class, we probably should return Message.EMPTY. Sorry that I missed it last time.

Further improvements to ScmInvokerCodeGenerator will be handled in a separate JIRA.

I agree to change ScmInvokerCodeGenerator separately after we make sure that the code change is good.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, thanks!
Updated to return Message.EMPTY.


case "getReadOnlyIterator":
return getImpl().getReadOnlyIterator();
returnType = Table.KeyValueIterator.class;
returnValue = getImpl().getReadOnlyIterator();
break;

case "onFlush":
getImpl().onFlush();
Expand All @@ -136,5 +142,7 @@ public Object invokeLocal(String methodName, Object[] p) throws Exception {
default:
throw new IllegalArgumentException("Method not found: " + methodName + " in DeletedBlockLogStateManager");
}

return SCMRatisResponse.encode(returnValue, returnType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.hdds.scm.ha.SCMRatisRequest;
import org.apache.hadoop.hdds.scm.ha.SCMRatisResponse;
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.ratis.protocol.Message;

/**
* Invokes methods without using reflection.
Expand Down Expand Up @@ -55,8 +56,12 @@ public final T getProxy() {
return proxy;
}

public final Object invoke(String methodName, Object[] args) throws Exception {
return invokeLocal(methodName, args);
}

/** For non-@Replicate methods. */
abstract Object invokeLocal(String methodName, Object[] args) throws Exception;
public abstract Message invokeLocal(String methodName, Object[] args) throws Exception;

/** For @Replicate DIRECT methods. */
final Object invokeReplicateDirect(NameAndParameterTypes method, Object[] args) throws SCMException {
Expand Down
Loading