File tree Expand file tree Collapse file tree
kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka Expand file tree Collapse file tree Original file line number Diff line number Diff line change 22
33
44import com .dtstack .flink .sql .format .SerializationMetricWrapper ;
5- import org .apache .flink .api .common .functions .RuntimeContext ;
65import org .apache .flink .api .common .serialization .SerializationSchema ;
7- import org .apache .flink .formats .json .JsonRowSchemaConverter ;
86import org .apache .flink .formats .json .JsonRowSerializationSchema ;
9- import org .apache .flink .metrics .Counter ;
10- import org .apache .flink .metrics .Meter ;
117import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .ObjectMapper ;
8+ import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .node .ObjectNode ;
129import org .apache .flink .streaming .util .serialization .KeyedSerializationSchema ;
1310import org .apache .flink .types .Row ;
14- import org .apache .flink .shaded .jackson2 .com .fasterxml .jackson .databind .node .ObjectNode ;
15- import org .slf4j .Logger ;
16- import org .slf4j .LoggerFactory ;
1711
1812public class CustomerKeyedSerializationSchema implements KeyedSerializationSchema <Row > {
1913
20- private Logger log = LoggerFactory .getLogger (getClass ());
21-
2214 private static final long serialVersionUID = 1L ;
2315 private final SerializationMetricWrapper serializationMetricWrapper ;
2416 private String [] partitionKeys ;
@@ -61,7 +53,7 @@ private byte[] serializeJsonKey(JsonRowSerializationSchema jsonRowSerializationS
6153 }
6254 return sb .toString ().getBytes ();
6355 }catch (Exception e ){
64- log . error ( "serializeJsonKey error" , e );
56+
6557 }
6658 return null ;
6759
You can’t perform that action at this time.
0 commit comments