33CREATE TABLE tableName(
44 colName colType,
55 ...
6- colNameX colType
6+ colNameX colType,
7+ [primary key (colName)]
78 )WITH(
89 type ='db2',
910 url ='jdbcUrl',
@@ -25,29 +26,112 @@ CREATE TABLE tableName(
2526| tableName| db2表名称|
2627| colName | 列名称|
2728| colType | 列类型 [ colType支持的类型] ( docs/colType.md ) |
29+ | primary key | updateMode为UPSERT时,需要指定的主键信息,不需要和数据库一致|
30+
2831
2932## 4.参数:
3033
3134| 参数名称| 含义| 是否必填| 默认值|
3235| ----| ----| ----| ----|
33- | type | 表名 输出表类型[ mysq| ; hbase| ; elasticsearch] | 是||
34- | url | 连接db2数据库 jdbcUrl | 是||
35- | userName | db2连接用户名 | 是||
36- | password | db2连接密码| 是||
37- | tableName | db2表名称| 是||
38- | parallelism | 并行度设置| 否| 1|
36+ | type | 结果表插件类型,必须为db2| 是||
37+ | url | 连接db2数据库 jdbcUrl | 是||
38+ | userName | db2连接用户名 | 是||
39+ | password | db2连接密码| 是||
40+ | tableName | db2表名称| 是||
41+ | schema | db2表空间| 否||
42+ | parallelism | 并行度设置| 否| 1|
43+ | batchSize | flush的大小| 否| 100|
44+ | batchWaitInterval | flush的时间间隔,单位ms| 否| 1000|
45+ | allReplace| true:新值替换旧值| 否| false|
46+ | updateMode| APPEND:不回撤数据,只下发增量数据,UPSERT:先删除回撤数据,然后更新| 否| 结果表设置主键则为UPSERT|
3947
4048## 5.样例:
49+
50+ 回溯流删除
51+
4152```
53+
54+ CREATE TABLE source1 (
55+ id int,
56+ name VARCHAR
57+ )WITH(
58+ type ='kafka11',
59+ bootstrapServers ='172.16.8.107:9092',
60+ zookeeperQuorum ='172.16.8.107:2181/kafka',
61+ offsetReset ='latest',
62+ topic ='mqTest03',
63+ timezone='Asia/Shanghai',
64+ topicIsPattern ='false'
65+ );
66+
67+
68+
69+ CREATE TABLE source2(
70+ id int,
71+ address VARCHAR
72+ )WITH(
73+ type ='kafka11',
74+ bootstrapServers ='172.16.8.107:9092',
75+ zookeeperQuorum ='172.16.8.107:2181/kafka',
76+ offsetReset ='latest',
77+ topic ='mqTest04',
78+ timezone='Asia/Shanghai',
79+ topicIsPattern ='false'
80+ );
81+
82+
4283CREATE TABLE MyResult(
43- channel VARCHAR,
44- pv VARCHAR
45- )WITH(
46- type ='db2',
47- url ='jdbc:db2://172.16.8.104:50000/test?charset=utf8',
48- userName ='dtstack',
49- password ='abc123',
50- tableName ='pv2',
51- parallelism ='1'
52- )
53- ```
84+ id int,
85+ name VARCHAR,
86+ address VARCHAR,
87+ primary key (id)
88+ )WITH(
89+ type='db2',
90+ url='jdbc:db2://172.16.10.251:50000/mqTest',
91+ userName='DB2INST1',
92+ password='abc123',
93+ tableName='USER_INFO2',
94+ schema = 'DTSTACK',
95+ updateMode = 'upsert', // 设置 primary key则默认为upsert
96+ batchSize = '1'
97+ );
98+
99+ insert into MyResult
100+ select
101+ s1.id,
102+ s1.name,
103+ s2.address
104+ from
105+ source1 s1
106+ left join
107+ source2 s2
108+ on
109+ s1.id = s2.id
110+
111+
112+
113+ ```
114+
115+
116+ DB2结果表建表语句:
117+
118+ ``` aidl
119+ CREATE TABLE "DTSTACK "."USER_INFO2" (
120+ "ID" INTEGER ,
121+ "NAME" VARCHAR(50 OCTETS) ,
122+ "ADDRESS" VARCHAR(50 OCTETS) )
123+ IN "USERSPACE1"
124+ ORGANIZE BY ROW
125+ ;
126+
127+ GRANT CONTROL ON TABLE "DTSTACK "."USER_INFO2" TO USER "DB2INST1"
128+ ;
129+ ```
130+
131+
132+
133+ 数据结果:
134+
135+ 向Topic mqTest03 发送数据 {"name":"maqi","id":1001} 插入 (1001,"maqi",null)
136+
137+ 向Topic mqTest04 发送数据 {"address":"hz","id":1001} 删除 (1001,"maqi",null) 插入 (1001,"maqi","hz")
0 commit comments