Skip to content

Commit 112c528

Browse files
author
gituser
committed
Merge branch '1.10_release_4.1.x' into 1.10_release_4.2.x
2 parents 4914d94 + 7c6e8f7 commit 112c528

2 files changed

Lines changed: 25 additions & 16 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public class ExecuteProcessHelper {
104104
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
105105

106106
private static final String TIME_ZONE = "timezone";
107-
private static final String PLUGIN_PATH_STR = "pluginPath";
107+
private static final String PLUGIN_LOCAL_STR = "pluginPath";
108108
private static final String PLUGIN_LOAD_STR = "pluginLoadMode";
109109

110110
public static FlinkPlanner flinkPlanner = new FlinkPlanner();
@@ -137,8 +137,8 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
137137
dirtyProperties.put(PLUGIN_LOAD_STR, pluginLoadMode);
138138
}
139139

140-
if (!pluginLoadMode.equalsIgnoreCase(EPluginLoadMode.LOCALTEST.name()) && Objects.isNull(dirtyProperties.get(PLUGIN_PATH_STR))) {
141-
dirtyProperties.put(PLUGIN_PATH_STR, localSqlPluginPath);
140+
if (!pluginLoadMode.equalsIgnoreCase(EPluginLoadMode.LOCALTEST.name()) && Objects.isNull(dirtyProperties.get(PLUGIN_LOCAL_STR))) {
141+
dirtyProperties.put(PLUGIN_LOCAL_STR, localSqlPluginPath);
142142
}
143143

144144
List<URL> jarUrlList = getExternalJarUrls(options.getAddjar());
@@ -393,9 +393,10 @@ public static Set<URL> registerTable(
393393
return Sets.newHashSet();
394394
}
395395
pluginClassPathSets.add(PluginUtil.buildDirtyPluginUrl(
396-
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_TYPE_STR)),
397-
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_PATH_STR)),
398-
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_LOAD_MODE_STR))
396+
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_TYPE_STR)),
397+
localSqlPluginPath,
398+
remoteSqlPluginPath,
399+
pluginLoadMode
399400
));
400401
return pluginClassPathSets;
401402
}

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.io.File;
3333
import java.io.IOException;
3434
import java.net.MalformedURLException;
35+
import java.net.URI;
3536
import java.net.URL;
3637
import java.nio.charset.StandardCharsets;
3738
import java.util.ArrayList;
@@ -49,12 +50,14 @@
4950

5051
public class PluginUtil {
5152

52-
private static String SP = File.separator;
53+
private static final String SP = File.separator;
5354

5455
private static final String JAR_SUFFIX = ".jar";
5556

5657
private static final String CLASS_PRE_STR = "com.dtstack.flink.sql";
5758

59+
private static final String DIRTY_DATA_PRE = "dirtyData";
60+
5861
private static final Logger LOG = LoggerFactory.getLogger(PluginUtil.class);
5962

6063
private static final ObjectMapper objectMapper = new ObjectMapper();
@@ -161,27 +164,32 @@ public static URL buildFinalJarFilePath(String pluginType, String tableType, Str
161164
* build dirty data url from plugin path
162165
*
163166
* @param dirtyType type of dirty type
164-
* @param pluginPath plugin path
167+
* @param localPath local path
168+
* @param remotePath remote path
165169
* @param pluginLoadMode load plugin mode
166170
* @return dirty plugin url
167171
* @throws Exception exception
168172
*/
169173
public static URL buildDirtyPluginUrl(
170-
String dirtyType,
171-
String pluginPath,
172-
String pluginLoadMode) throws Exception {
174+
String dirtyType,
175+
String localPath,
176+
String remotePath,
177+
String pluginLoadMode) throws Exception {
173178
if (Objects.isNull(dirtyType)) {
174179
dirtyType = DirtyConsumerFactory.DEFAULT_DIRTY_TYPE;
175180
}
176181
String prefix = String.format("dirtyConsumer-%s", dirtyType.toLowerCase()).toLowerCase();
177182
String consumerType = DirtyConsumerFactory.DIRTY_CONSUMER_PATH + File.separator + dirtyType;
178-
String consumerJar = PluginUtil.getJarFileDirPath(consumerType, pluginPath, pluginLoadMode);
183+
String consumerJar = PluginUtil.getJarFileDirPath(consumerType, localPath, pluginLoadMode);
179184
String jarFileName = PluginUtil.getCoreJarFileName(
180-
consumerJar,
181-
prefix,
182-
pluginLoadMode
185+
consumerJar,
186+
prefix,
187+
pluginLoadMode
183188
);
184-
return new URL("file:" + consumerJar + SP + jarFileName);
189+
String path = remotePath == null ? localPath : remotePath;
190+
String dirtyPath = path + SP + DIRTY_DATA_PRE + SP + dirtyType;
191+
URI uri = URI.create("file:" + dirtyPath + SP + jarFileName);
192+
return uri.toURL();
185193
}
186194

187195
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir, String localSqlPluginPath, String pluginLoadMode) throws Exception {

0 commit comments

Comments
 (0)