@@ -38,18 +38,18 @@ class KafkaMessage(typing.NamedTuple):
3838 The dictionary of message headers key/values.
3939 """
4040
41- key : typing .Optional [typing .Union [ bytes , str ] ]
41+ key : typing .Optional [typing .AnyStr ]
4242 """
4343 The message key.
4444 """
4545
46- value : typing .Union [ bytes , str ]
46+ value : typing .AnyStr
4747 """
4848 The message value.
4949 """
5050
5151
52- KeyMapper = typing .Callable [[AnyCloudEvent ], typing .Union [ bytes , str ] ]
52+ KeyMapper = typing .Callable [[AnyCloudEvent ], typing .AnyStr ]
5353"""
5454A callable function that creates a Kafka message key, given a CloudEvent instance.
5555"""
@@ -174,7 +174,7 @@ def to_structured(
174174 f"Failed to map message key with error: { type (e ).__name__ } ('{ e } ')"
175175 )
176176
177- attrs = event .get_attributes (). copy ( )
177+ attrs : dict [ str , typing . Any ] = dict ( event .get_attributes ())
178178
179179 try :
180180 data = data_marshaller (event .data )
@@ -208,7 +208,7 @@ def from_structured(
208208 message : KafkaMessage ,
209209 event_type : typing .Optional [typing .Type [AnyCloudEvent ]] = None ,
210210 data_unmarshaller : typing .Optional [types .MarshallerType ] = None ,
211- envelope_unmarshaller : typing .Optional [types .MarshallerType ] = None ,
211+ envelope_unmarshaller : typing .Optional [types .UnmarshallerType ] = None ,
212212) -> AnyCloudEvent :
213213 """
214214 Returns a CloudEvent from a KafkaMessage in structured format.
@@ -232,20 +232,20 @@ def from_structured(
232232 "Failed to unmarshall message with error: " f"{ type (e ).__name__ } ('{ e } ')"
233233 )
234234
235- attributes = {}
235+ attributes : dict [ str , typing . Any ] = {}
236236 if message .key is not None :
237237 attributes ["partitionkey" ] = message .key
238238
239+ data : typing .Optional [typing .Any ] = None
239240 for name , value in structure .items ():
240- decoder = lambda x : x
241- if name == "data" :
242- decoder = lambda v : data_unmarshaller (v )
243- if name == "data_base64" :
244- decoder = lambda v : data_unmarshaller (base64 .b64decode (v ))
245- name = "data"
246-
247241 try :
248- decoded_value = decoder (value )
242+ if name == "data" :
243+ decoded_value = data_unmarshaller (value )
244+ elif name == "data_base64" :
245+ decoded_value = data_unmarshaller (base64 .b64decode (value ))
246+ name = "data"
247+ else :
248+ decoded_value = value
249249 except Exception as e :
250250 raise cloud_exceptions .DataUnmarshallerError (
251251 "Failed to unmarshall data with error: " f"{ type (e ).__name__ } ('{ e } ')"
0 commit comments