-
Notifications
You must be signed in to change notification settings - Fork 600
HDDS-14974. Change SCMStateMachine to use ScmInvoker #10079
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
5299231
498ad64
7991aae
620cac6
755c900
ee9f481
95c163f
9de155f
a65a5c1
36da113
d00a2cf
90c5052
874dad7
8d91412
77602d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,6 +41,7 @@ | |
| import org.apache.hadoop.hdds.scm.AddSCMRequest; | ||
| import org.apache.hadoop.hdds.scm.RemoveSCMRequest; | ||
| import org.apache.hadoop.hdds.scm.ScmConfigKeys; | ||
| import org.apache.hadoop.hdds.scm.ha.invoker.ScmInvoker; | ||
| import org.apache.hadoop.hdds.scm.server.StorageContainerManager; | ||
| import org.apache.hadoop.hdds.security.SecurityConfig; | ||
| import org.apache.hadoop.ozone.OzoneConsts; | ||
|
|
@@ -223,6 +224,9 @@ public SCMStateMachine getSCMStateMachine() { | |
| @Override | ||
| public void registerStateMachineHandler(final RequestType handlerType, | ||
| final Object handler) { | ||
| if (handler instanceof ScmInvoker) { | ||
| stateMachine.registerInvoker(handlerType, (ScmInvoker) handler); | ||
| } | ||
| stateMachine.registerHandler(handlerType, handler); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should add |
||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -41,6 +41,7 @@ | |||||
| import org.apache.hadoop.hdds.scm.block.DeletedBlockLogImpl; | ||||||
| import org.apache.hadoop.hdds.scm.exceptions.SCMException; | ||||||
| import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes; | ||||||
| import org.apache.hadoop.hdds.scm.ha.invoker.ScmInvoker; | ||||||
| import org.apache.hadoop.hdds.scm.server.StorageContainerManager; | ||||||
| import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey; | ||||||
| import org.apache.hadoop.hdds.utils.TransactionInfo; | ||||||
|
|
@@ -77,6 +78,7 @@ public class SCMStateMachine extends BaseStateMachine { | |||||
|
|
||||||
| private StorageContainerManager scm; | ||||||
| private Map<RequestType, Object> handlers; | ||||||
| private Map<RequestType, ScmInvoker<?>> invokers; | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| private SCMHADBTransactionBuffer transactionBuffer; | ||||||
| private final SimpleStateMachineStorage storage = | ||||||
| new SimpleStateMachineStorage(); | ||||||
|
|
@@ -93,6 +95,7 @@ public SCMStateMachine(final StorageContainerManager scm, | |||||
| SCMHADBTransactionBuffer buffer) { | ||||||
| this.scm = scm; | ||||||
| this.handlers = new EnumMap<>(RequestType.class); | ||||||
| this.invokers = new EnumMap<>(RequestType.class); | ||||||
| this.transactionBuffer = buffer; | ||||||
| TransactionInfo latestTrxInfo = this.transactionBuffer.getLatestTrxInfo(); | ||||||
| if (!latestTrxInfo.isDefault()) { | ||||||
|
|
@@ -117,6 +120,11 @@ public void registerHandler(RequestType type, Object handler) { | |||||
| handlers.put(type, handler); | ||||||
| } | ||||||
|
|
||||||
| public void registerInvoker(RequestType type, | ||||||
| ScmInvoker<?> invoker) { | ||||||
| invokers.put(type, invoker); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public SnapshotInfo getLatestSnapshot() { | ||||||
| // Transaction buffer will be null during scm initlialization phase | ||||||
|
|
@@ -179,6 +187,10 @@ public CompletableFuture<Message> applyTransaction( | |||||
| } | ||||||
|
|
||||||
| private Message process(final SCMRatisRequest request) throws Exception { | ||||||
| final ScmInvoker<?> invoker = invokers.get(request.getType()); | ||||||
| if (invoker != null) { | ||||||
| return invoker.invokeLocal(request.getOperation(), request.getArguments()); | ||||||
| } | ||||||
| return process(request, handlers.get(request.getType())); | ||||||
| } | ||||||
|
|
||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,8 +22,10 @@ | |
| import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary; | ||
| import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; | ||
| import org.apache.hadoop.hdds.scm.block.DeletedBlockLogStateManager; | ||
| import org.apache.hadoop.hdds.scm.ha.SCMRatisResponse; | ||
| import org.apache.hadoop.hdds.scm.ha.SCMRatisServer; | ||
| import org.apache.hadoop.hdds.utils.db.Table; | ||
| import org.apache.ratis.protocol.Message; | ||
|
|
||
| /** Code generated for {@link DeletedBlockLogStateManager}. Do not modify. */ | ||
| public class DeletedBlockLogStateManagerInvoker extends ScmInvoker<DeletedBlockLogStateManager> { | ||
|
|
@@ -106,7 +108,9 @@ public void removeTransactionsFromDB(ArrayList arg0, DeletedBlocksTransactionSum | |
|
|
||
| @SuppressWarnings("unchecked") | ||
| @Override | ||
| public Object invokeLocal(String methodName, Object[] p) throws Exception { | ||
| public Message invokeLocal(String methodName, Object[] p) throws Exception { | ||
| final Class<?> returnType; | ||
| final Object returnValue; | ||
| switch (methodName) { | ||
| case "addTransactionsToDB": | ||
| final ArrayList arg0 = p.length > 0 ? (ArrayList) p[0] : null; | ||
|
|
@@ -115,7 +119,9 @@ public Object invokeLocal(String methodName, Object[] p) throws Exception { | |
| return null; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Russole , For void.class, we probably should return
I agree to change ScmInvokerCodeGenerator separately after we make sure that the code change is good.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, thanks! |
||
|
|
||
| case "getReadOnlyIterator": | ||
| return getImpl().getReadOnlyIterator(); | ||
| returnType = Table.KeyValueIterator.class; | ||
| returnValue = getImpl().getReadOnlyIterator(); | ||
| break; | ||
|
|
||
| case "onFlush": | ||
| getImpl().onFlush(); | ||
|
|
@@ -136,5 +142,7 @@ public Object invokeLocal(String methodName, Object[] p) throws Exception { | |
| default: | ||
| throw new IllegalArgumentException("Method not found: " + methodName + " in DeletedBlockLogStateManager"); | ||
| } | ||
|
|
||
| return SCMRatisResponse.encode(returnValue, returnType); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this method is called with
ScmInvoker, ratherinvoker.getImpl().ozone/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServer.java
Lines 75 to 76 in 874dad7
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right -- we need to pass invoker but not getImpl().