1- # 一、json格式数据源
21## 1.格式:
32```
43数据现在支持json格式{"xx":"bb","cc":"dd"}
@@ -22,7 +21,7 @@ CREATE TABLE tableName(
2221```
2322
2423## 2.支持的版本
25- kafka08,kafka09,kafka10,kafka11
24+ kafka08,kafka09,kafka10,kafka11及以上版本
2625 ** kafka读取和写入的版本必须一致,否则会有兼容性错误。**
2726
2827## 3.表结构定义
@@ -71,6 +70,75 @@ CREATE TABLE MyTable(
7170 sourcedatatype ='json' #可不设置
7271 );
7372```
73+ ## 6.支持嵌套json、数据类型字段解析
74+
75+ 嵌套json解析示例
76+
77+ json: {"name":"tom", "obj":{"channel": "root"}, "pv": 4, "xctime":1572932485}
78+ ```
79+ CREATE TABLE MyTable(
80+ name varchar,
81+ obj.channel varchar as channel,
82+ pv INT,
83+ xctime bigint,
84+ CHARACTER_LENGTH(channel) AS timeLeng
85+ )WITH(
86+ type ='kafka09',
87+ bootstrapServers ='172.16.8.198:9092',
88+ zookeeperQuorum ='172.16.8.198:2181/kafka',
89+ offsetReset ='latest',
90+ groupId='nbTest',
91+ topic ='nbTest1,nbTest2,nbTest3',
92+ --- topic ='mqTest.*',
93+ ---topicIsPattern='true',
94+ parallelism ='1'
95+ );
96+ ```
97+
98+ 数组类型字段解析示例
99+
100+ json: {"name":"tom", "obj":{"channel": "root"}, "user": [ {"pv": 4}, {"pv": 10}] , "xctime":1572932485}
101+ ```
102+ CREATE TABLE MyTable(
103+ name varchar,
104+ obj.channel varchar as channel,
105+ user[1].pv INT as pv,
106+ xctime bigint,
107+ CHARACTER_LENGTH(channel) AS timeLeng
108+ )WITH(
109+ type ='kafka09',
110+ bootstrapServers ='172.16.8.198:9092',
111+ zookeeperQuorum ='172.16.8.198:2181/kafka',
112+ offsetReset ='latest',
113+ groupId='nbTest',
114+ topic ='nbTest1,nbTest2,nbTest3',
115+ --- topic ='mqTest.*',
116+ ---topicIsPattern='true',
117+ parallelism ='1'
118+ );
119+ ```
120+ or
121+
122+ json: {"name":"tom", "obj":{"channel": "root"}, "pv": [ 4, 7, 10] , "xctime":1572932485}
123+ ```
124+ CREATE TABLE MyTable(
125+ name varchar,
126+ obj.channel varchar as channel,
127+ pv[1] INT as pv,
128+ xctime bigint,
129+ CHARACTER_LENGTH(channel) AS timeLeng
130+ )WITH(
131+ type ='kafka09',
132+ bootstrapServers ='172.16.8.198:9092',
133+ zookeeperQuorum ='172.16.8.198:2181/kafka',
134+ offsetReset ='latest',
135+ groupId='nbTest',
136+ topic ='nbTest1,nbTest2,nbTest3',
137+ --- topic ='mqTest.*',
138+ ---topicIsPattern='true',
139+ parallelism ='1'
140+ );
141+ ```
74142# 二、csv格式数据源
75143根据字段分隔符进行数据分隔,按顺序匹配sql中配置的列。如数据分隔列数和sql中配置的列数相等直接匹配;如不同参照lengthcheckpolicy策略处理。
76144## 1.参数:
0 commit comments