33CREATE TABLE tableName(
44 colName colType,
55 ...
6- colNameX colType
6+ colNameX colType,
7+ primary key (colName)
78 )WITH(
89 type ='sqlserver',
910 url ='jdbcUrl',
@@ -25,29 +26,90 @@ CREATE TABLE tableName(
2526| tableName| sqlserver表名称|
2627| colName | 列名称|
2728| colType | 列类型 [ colType支持的类型] ( docs/colType.md ) |
29+ | primary key | updateMode为UPSERT时,需要指定的主键信息|
2830
2931## 4.参数:
3032
3133| 参数名称| 含义| 是否必填| 默认值|
3234| ----| ----| ----| ----|
33- | type | 表名 输出表类型 [ mysq &# 124 ; hbase &# 124 ; elasticsearch ] | 是||
35+ | type | 结果表插件类型,必须为sqlserver | 是||
3436| url | 连接sqlserver数据库 jdbcUrl | 是||
35- | userName | sqlserver连接用户名 | 是||
36- | password | sqlserver连接密码| 是||
37- | tableName | sqlserver表名称| 是||
38- | parallelism | 并行度设置| 否| 1|
37+ | userName | sqlserver连接用户名 | 是||
38+ | password | sqlserver连接密码| 是||
39+ | tableName | sqlserver表名称| 是||
40+ | schema | sqlserver表空间| 否||
41+ | parallelism | 并行度设置| 否| 1|
42+ | batchSize | flush的大小| 否| 100|
43+ | batchWaitInterval | flush的时间间隔,单位ms| 否| 1000|
44+ | allReplace| true:新值替换旧值| 否| false|
45+ | updateMode| APPEND:不回撤数据,只下发增量数据,UPSERT:先删除回撤数据,然后更新| 否||
46+
3947
4048## 5.样例:
49+
50+ 回溯流删除
4151```
52+
53+ CREATE TABLE source1 (
54+ id int,
55+ name VARCHAR
56+ )WITH(
57+ type ='kafka11',
58+ bootstrapServers ='172.16.8.107:9092',
59+ zookeeperQuorum ='172.16.8.107:2181/kafka',
60+ offsetReset ='latest',
61+ topic ='mqTest03',
62+ timezone='Asia/Shanghai',
63+ topicIsPattern ='false'
64+ );
65+
66+
67+ CREATE TABLE source2(
68+ id int,
69+ address VARCHAR
70+ )WITH(
71+ type ='kafka11',
72+ bootstrapServers ='172.16.8.107:9092',
73+ zookeeperQuorum ='172.16.8.107:2181/kafka',
74+ offsetReset ='latest',
75+ topic ='mqTest04',
76+ timezone='Asia/Shanghai',
77+ topicIsPattern ='false'
78+ );
79+
80+
4281CREATE TABLE MyResult(
43- channel VARCHAR,
44- pv VARCHAR
45- )WITH(
46- type ='sqlserver',
47- url ='jdbc:jtds:sqlserver://172.16.8.104:1433;DatabaseName=mytest',
48- userName ='dtstack',
49- password ='abc123',
50- tableName ='pv2',
51- parallelism ='1'
52- )
53- ```
82+ id int,
83+ name VARCHAR,
84+ address VARCHAR,
85+ primary key (id)
86+ )WITH(
87+ type='sqlserver',
88+ url='jdbc:jtds:sqlserver://172.16.8.149:1433;DatabaseName=DTstack',
89+ userName='sa',
90+ password='Dtstack2018',
91+ tableName='user',
92+ schema = 'aaa',
93+ updateMode = 'upsert',
94+ batchSize = '1'
95+ );
96+
97+ insert into MyResult
98+ select
99+ s1.id,
100+ s1.name,
101+ s2.address
102+ from
103+ source1 s1
104+ left join
105+ source2 s2
106+ on
107+ s1.id = s2.id
108+
109+ ```
110+
111+ 数据结果:
112+
113+ 向Topic mqTest03 发送数据 {"name":"maqi","id":1001} 插入 (1001,"maqi",null)
114+
115+ 向Topic mqTest04 发送数据 {"address":"hz","id":1001} 删除 (1001,"maqi",null) 插入 (1001,"maqi","hz")
0 commit comments