Skip to content

Commit 71a1524

Browse files
author
yanxi0227
committed
Merge branch 'v1.5.0_dev' of github.com:DTStack/flinkStreamSQL into v1.5.0_dev
2 parents be12caa + 508050c commit 71a1524

36 files changed

Lines changed: 1380 additions & 69 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/side/AllReqRow.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
* @author xuchao
3838
*/
3939

40-
public abstract class AllReqRow extends RichFlatMapFunction<Row, Row>{
40+
public abstract class AllReqRow extends RichFlatMapFunction<Row, Row> implements ISideReqRow {
4141

4242
protected SideInfo sideInfo;
4343

@@ -48,8 +48,6 @@ public AllReqRow(SideInfo sideInfo){
4848

4949
}
5050

51-
protected abstract Row fillData(Row input, Object sideInput);
52-
5351
protected abstract void initCache() throws SQLException;
5452

5553
protected abstract void reloadCache();

core/src/main/java/com/dtstack/flink/sql/side/AsyncReqRow.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
* @author xuchao
4141
*/
4242

43-
public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> {
43+
public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row> implements ISideReqRow {
4444

4545
private static final long serialVersionUID = 2098635244857937717L;
4646

@@ -79,9 +79,6 @@ protected boolean openCache(){
7979
return sideInfo.getSideCache() != null;
8080
}
8181

82-
83-
protected abstract Row fillData(Row input, Object sideInput);
84-
8582
protected void dealMissKey(Row input, ResultFuture<Row> resultFuture){
8683
if(sideInfo.getJoinType() == JoinType.LEFT){
8784
//Reserved left table data
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.side;
20+
21+
import org.apache.flink.types.Row;
22+
23+
/**
24+
*
25+
* Date: 2018/12/4
26+
* Company: www.dtstack.com
27+
* @author xuchao
28+
*/
29+
public interface ISideReqRow {
30+
31+
Row fillData(Row input, Object sideInput);
32+
33+
}

core/src/main/java/com/dtstack/flink/sql/table/TableInfo.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
2424

25+
import java.io.Serializable;
2526
import java.util.List;
2627

2728
/**
@@ -31,7 +32,7 @@
3132
* @author xuchao
3233
*/
3334

34-
public abstract class TableInfo {
35+
public abstract class TableInfo implements Serializable {
3536

3637
public static final String PARALLELISM_KEY = "parallelism";
3738

@@ -121,6 +122,29 @@ public void addFieldType(String fieldType){
121122
fieldTypeList.add(fieldType);
122123
}
123124

125+
public void setFields(String[] fields) {
126+
this.fields = fields;
127+
}
128+
129+
public void setFieldTypes(String[] fieldTypes) {
130+
this.fieldTypes = fieldTypes;
131+
}
132+
133+
public void setFieldClasses(Class<?>[] fieldClasses) {
134+
this.fieldClasses = fieldClasses;
135+
}
136+
137+
public List<String> getFieldList() {
138+
return fieldList;
139+
}
140+
141+
public List<String> getFieldTypeList() {
142+
return fieldTypeList;
143+
}
144+
145+
public List<Class> getFieldClassList() {
146+
return fieldClassList;
147+
}
124148

125149
public void finish(){
126150
this.fields = fieldList.toArray(new String[fieldList.size()]);

hbase/hbase-side/hbase-all-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public HbaseAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
6666
}
6767

6868
@Override
69-
protected Row fillData(Row input, Object sideInput) {
69+
public Row fillData(Row input, Object sideInput) {
7070
Map<String, Object> sideInputList = (Map<String, Object>) sideInput;
7171
Row row = new Row(sideInfo.getOutFieldInfoList().size());
7272
for(Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()){

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/HbaseAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) throws Except
160160
}
161161

162162
@Override
163-
protected Row fillData(Row input, Object sideInput){
163+
public Row fillData(Row input, Object sideInput){
164164

165165
List<Object> sideInputList = (List<Object>) sideInput;
166166
Row row = new Row(sideInfo.getOutFieldInfoList().size());

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,15 @@ private static String getLocalCoreJarPath(String localSqlRootJar){
5959
}
6060

6161
public static void main(String[] args) throws Exception {
62-
if (args.length==1 && args[0].endsWith(".json")){
62+
if (args.length == 1 && args[0].endsWith(".json")){
6363
args = parseJson(args);
6464
}
65+
6566
LauncherOptionParser optionParser = new LauncherOptionParser(args);
6667
LauncherOptions launcherOptions = optionParser.getLauncherOptions();
6768
String mode = launcherOptions.getMode();
6869
List<String> argList = optionParser.getProgramExeArgList();
70+
6971
if(mode.equals(ClusterMode.local.name())) {
7072
String[] localArgs = argList.toArray(new String[argList.size()]);
7173
Main.main(localArgs);

mongo/mongo-side/mongo-all-side/src/main/java/com/dtstack/flink/sql/side/mongo/MongoAllReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public MongoAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo
8080
}
8181

8282
@Override
83-
protected Row fillData(Row input, Object sideInput) {
83+
public Row fillData(Row input, Object sideInput) {
8484
Map<String, Object> cacheInfo = (Map<String, Object>) sideInput;
8585
Row row = new Row(sideInfo.getOutFieldInfoList().size());
8686
for (Map.Entry<Integer, Integer> entry : sideInfo.getInFieldIndex().entrySet()) {

mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/MysqlSink.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2424
import com.dtstack.flink.sql.sink.rdb.RdbSink;
2525
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
26-
2726
import java.util.List;
2827
import java.util.Map;
2928

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>sql.side.oracle</artifactId>
7+
<groupId>com.dtstack.flink</groupId>
8+
<version>1.0-SNAPSHOT</version>
9+
<relativePath>../pom.xml</relativePath>
10+
</parent>
11+
<modelVersion>4.0.0</modelVersion>
12+
13+
<artifactId>sql.side.all.oracle</artifactId>
14+
<version>1.0-SNAPSHOT</version>
15+
<name>oracle-all-side</name>
16+
<packaging>jar</packaging>
17+
18+
<properties>
19+
<sql.side.oracle.core.version>1.0-SNAPSHOT</sql.side.oracle.core.version>
20+
</properties>
21+
22+
<dependencies>
23+
<dependency>
24+
<groupId>com.dtstack.flink</groupId>
25+
<artifactId>sql.side.oracle.core</artifactId>
26+
<version>${sql.side.oracle.core.version}</version>
27+
</dependency>
28+
</dependencies>
29+
30+
<build>
31+
<plugins>
32+
<plugin>
33+
<groupId>org.apache.maven.plugins</groupId>
34+
<artifactId>maven-shade-plugin</artifactId>
35+
<version>1.4</version>
36+
<executions>
37+
<execution>
38+
<phase>package</phase>
39+
<goals>
40+
<goal>shade</goal>
41+
</goals>
42+
<configuration>
43+
<artifactSet>
44+
<excludes>
45+
46+
</excludes>
47+
</artifactSet>
48+
<filters>
49+
<filter>
50+
<artifact>*:*</artifact>
51+
<excludes>
52+
<exclude>META-INF/*.SF</exclude>
53+
<exclude>META-INF/*.DSA</exclude>
54+
<exclude>META-INF/*.RSA</exclude>
55+
</excludes>
56+
</filter>
57+
</filters>
58+
</configuration>
59+
</execution>
60+
</executions>
61+
</plugin>
62+
63+
<plugin>
64+
<artifactId>maven-antrun-plugin</artifactId>
65+
<version>1.2</version>
66+
<executions>
67+
<execution>
68+
<id>copy-resources</id>
69+
<!-- here the phase you need -->
70+
<phase>package</phase>
71+
<goals>
72+
<goal>run</goal>
73+
</goals>
74+
<configuration>
75+
<tasks>
76+
<copy todir="${basedir}/../../../plugins/oracleallside">
77+
<fileset dir="target/">
78+
<include name="${project.artifactId}-${project.version}.jar"/>
79+
</fileset>
80+
</copy>
81+
82+
<move file="${basedir}/../../../plugins/oracleallside/${project.artifactId}-${project.version}.jar"
83+
tofile="${basedir}/../../../plugins/oracleallside/${project.name}.jar"/>
84+
</tasks>
85+
</configuration>
86+
</execution>
87+
</executions>
88+
</plugin>
89+
</plugins>
90+
</build>
91+
</project>

0 commit comments

Comments
 (0)