Skip to content

Commit 7c6e8f7

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.1.x_36135' into 1.10_release_4.1.x
2 parents f89d16a + 44c3191 commit 7c6e8f7

2 files changed

Lines changed: 25 additions & 16 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public class ExecuteProcessHelper {
104104
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
105105

106106
private static final String TIME_ZONE = "timezone";
107-
private static final String PLUGIN_PATH_STR = "pluginPath";
107+
private static final String PLUGIN_LOCAL_STR = "pluginPath";
108108
private static final String PLUGIN_LOAD_STR = "pluginLoadMode";
109109

110110
public static FlinkPlanner flinkPlanner = new FlinkPlanner();
@@ -137,8 +137,8 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
137137
dirtyProperties.put(PLUGIN_LOAD_STR, pluginLoadMode);
138138
}
139139

140-
if (!pluginLoadMode.equalsIgnoreCase(EPluginLoadMode.LOCALTEST.name()) && Objects.isNull(dirtyProperties.get(PLUGIN_PATH_STR))) {
141-
dirtyProperties.put(PLUGIN_PATH_STR, localSqlPluginPath);
140+
if (!pluginLoadMode.equalsIgnoreCase(EPluginLoadMode.LOCALTEST.name()) && Objects.isNull(dirtyProperties.get(PLUGIN_LOCAL_STR))) {
141+
dirtyProperties.put(PLUGIN_LOCAL_STR, localSqlPluginPath);
142142
}
143143

144144
List<URL> jarUrlList = getExternalJarUrls(options.getAddjar());
@@ -393,9 +393,10 @@ public static Set<URL> registerTable(
393393
return Sets.newHashSet();
394394
}
395395
pluginClassPathSets.add(PluginUtil.buildDirtyPluginUrl(
396-
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_TYPE_STR)),
397-
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_PATH_STR)),
398-
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_LOAD_MODE_STR))
396+
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_TYPE_STR)),
397+
localSqlPluginPath,
398+
remoteSqlPluginPath,
399+
pluginLoadMode
399400
));
400401
return pluginClassPathSets;
401402
}

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.io.File;
3535
import java.io.IOException;
3636
import java.net.MalformedURLException;
37+
import java.net.URI;
3738
import java.net.URL;
3839
import java.nio.charset.StandardCharsets;
3940
import java.util.ArrayList;
@@ -51,12 +52,14 @@
5152

5253
public class PluginUtil {
5354

54-
private static String SP = File.separator;
55+
private static final String SP = File.separator;
5556

5657
private static final String JAR_SUFFIX = ".jar";
5758

5859
private static final String CLASS_PRE_STR = "com.dtstack.flink.sql";
5960

61+
private static final String DIRTY_DATA_PRE = "dirtyData";
62+
6063
private static final Logger LOG = LoggerFactory.getLogger(PluginUtil.class);
6164

6265
private static final ObjectMapper objectMapper = new ObjectMapper();
@@ -159,27 +162,32 @@ public static URL buildFinalJarFilePath(String pluginType, String tableType, Str
159162
* build dirty data url from plugin path
160163
*
161164
* @param dirtyType type of dirty type
162-
* @param pluginPath plugin path
165+
* @param localPath local path
166+
* @param remotePath remote path
163167
* @param pluginLoadMode load plugin mode
164168
* @return dirty plugin url
165169
* @throws Exception exception
166170
*/
167171
public static URL buildDirtyPluginUrl(
168-
String dirtyType,
169-
String pluginPath,
170-
String pluginLoadMode) throws Exception {
172+
String dirtyType,
173+
String localPath,
174+
String remotePath,
175+
String pluginLoadMode) throws Exception {
171176
if (Objects.isNull(dirtyType)) {
172177
dirtyType = DirtyConsumerFactory.DEFAULT_DIRTY_TYPE;
173178
}
174179
String prefix = String.format("dirtyConsumer-%s", dirtyType.toLowerCase()).toLowerCase();
175180
String consumerType = DirtyConsumerFactory.DIRTY_CONSUMER_PATH + File.separator + dirtyType;
176-
String consumerJar = PluginUtil.getJarFileDirPath(consumerType, pluginPath, pluginLoadMode);
181+
String consumerJar = PluginUtil.getJarFileDirPath(consumerType, localPath, pluginLoadMode);
177182
String jarFileName = PluginUtil.getCoreJarFileName(
178-
consumerJar,
179-
prefix,
180-
pluginLoadMode
183+
consumerJar,
184+
prefix,
185+
pluginLoadMode
181186
);
182-
return new URL("file:" + consumerJar + SP + jarFileName);
187+
String path = remotePath == null ? localPath : remotePath;
188+
String dirtyPath = path + SP + DIRTY_DATA_PRE + SP + dirtyType;
189+
URI uri = URI.create("file:" + dirtyPath + SP + jarFileName);
190+
return uri.toURL();
183191
}
184192

185193
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir, String localSqlPluginPath, String pluginLoadMode) throws Exception {

0 commit comments

Comments
 (0)