Skip to content

Commit a65a5c1

Browse files
committed
Fix implementation based on comment
1 parent 9de155f commit a65a5c1

4 files changed

Lines changed: 18 additions & 23 deletions

File tree

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.hadoop.hdds.scm.AddSCMRequest;
4242
import org.apache.hadoop.hdds.scm.RemoveSCMRequest;
4343
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
44+
import org.apache.hadoop.hdds.scm.ha.invoker.ScmInvoker;
4445
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
4546
import org.apache.hadoop.hdds.security.SecurityConfig;
4647
import org.apache.hadoop.ozone.OzoneConsts;
@@ -223,6 +224,9 @@ public SCMStateMachine getSCMStateMachine() {
223224
@Override
224225
public void registerStateMachineHandler(final RequestType handlerType,
225226
final Object handler) {
227+
if (handler instanceof ScmInvoker) {
228+
stateMachine.registerInvoker(handlerType, (ScmInvoker) handler);
229+
}
226230
stateMachine.registerHandler(handlerType, handler);
227231
}
228232

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void registerHandler(RequestType type, Object handler) {
120120
handlers.put(type, handler);
121121
}
122122

123-
public void registerStateMachineInvoker(RequestType type,
123+
public void registerInvoker(RequestType type,
124124
ScmInvoker<?> invoker) {
125125
invokers.put(type, invoker);
126126
}
@@ -189,7 +189,7 @@ public CompletableFuture<Message> applyTransaction(
189189
private Message process(final SCMRatisRequest request) throws Exception {
190190
final ScmInvoker<?> invoker = invokers.get(request.getType());
191191
if (invoker != null) {
192-
return process(request, invoker);
192+
return invoker.invokeLocal(request.getOperation(), request.getArguments());
193193
}
194194
return process(request, handlers.get(request.getType()));
195195
}
@@ -212,24 +212,6 @@ public static Message process(final SCMRatisRequest request, Object handler) thr
212212
}
213213
}
214214

215-
public static Message process(final SCMRatisRequest request,
216-
final ScmInvoker<?> invoker) throws Exception {
217-
218-
if (invoker == null) {
219-
throw new IOException("No invoker found for request type "
220-
+ request.getType());
221-
}
222-
223-
final Object result = invoker.invoke(
224-
request.getOperation(),
225-
request.getArguments());
226-
227-
final Class<?> returnType =
228-
invoker.getReturnType(request.getOperation(), request.getParameterTypes());
229-
230-
return SCMRatisResponse.encode(result, returnType);
231-
}
232-
233215
@Override
234216
public void notifyLogFailed(Throwable ex,
235217
RaftProtos.LogEntryProto failedEntry) {

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/invoker/DeletedBlockLogStateManagerInvoker.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DeletedBlocksTransactionSummary;
2424
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
2525
import org.apache.hadoop.hdds.scm.block.DeletedBlockLogStateManager;
26+
import org.apache.hadoop.hdds.scm.ha.SCMRatisResponse;
2627
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
2728
import org.apache.hadoop.hdds.utils.db.Table;
29+
import org.apache.ratis.protocol.Message;
2830

2931
/** Code generated for {@link DeletedBlockLogStateManager}. Do not modify. */
3032
public class DeletedBlockLogStateManagerInvoker extends ScmInvoker<DeletedBlockLogStateManager> {
@@ -107,7 +109,9 @@ public void removeTransactionsFromDB(ArrayList arg0, DeletedBlocksTransactionSum
107109

108110
@SuppressWarnings("unchecked")
109111
@Override
110-
public Object invokeLocal(String methodName, Object[] p) throws Exception {
112+
public Message invokeLocal(String methodName, Object[] p) throws Exception {
113+
final Class<?> returnType;
114+
final Object returnValue;
111115
switch (methodName) {
112116
case "addTransactionsToDB":
113117
final ArrayList arg0 = p.length > 0 ? (ArrayList) p[0] : null;
@@ -116,7 +120,9 @@ public Object invokeLocal(String methodName, Object[] p) throws Exception {
116120
return null;
117121

118122
case "getReadOnlyIterator":
119-
return getImpl().getReadOnlyIterator();
123+
returnType = Table.KeyValueIterator.class;
124+
returnValue = getImpl().getReadOnlyIterator();
125+
break;
120126

121127
case "onFlush":
122128
getImpl().onFlush();
@@ -137,6 +143,8 @@ public Object invokeLocal(String methodName, Object[] p) throws Exception {
137143
default:
138144
throw new IllegalArgumentException("Method not found: " + methodName + " in DeletedBlockLogStateManager");
139145
}
146+
147+
return SCMRatisResponse.encode(returnValue, returnType);
140148
}
141149

142150
@Override

hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/invoker/ScmInvoker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.hadoop.hdds.scm.ha.SCMRatisRequest;
2727
import org.apache.hadoop.hdds.scm.ha.SCMRatisResponse;
2828
import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
29+
import org.apache.ratis.protocol.Message;
2930

3031
/**
3132
* Invokes methods without using reflection.
@@ -62,7 +63,7 @@ public final Object invoke(String methodName, Object[] args) throws Exception {
6263
public abstract Class<?> getReturnType(String methodName, Class<?>[] parameterTypes);
6364

6465
/** For non-@Replicate methods. */
65-
abstract Object invokeLocal(String methodName, Object[] args) throws Exception;
66+
public abstract Message invokeLocal(String methodName, Object[] args) throws Exception;
6667

6768
/** For @Replicate DIRECT methods. */
6869
final Object invokeReplicateDirect(NameAndParameterTypes method, Object[] args) throws SCMException {

0 commit comments

Comments
 (0)