Skip to content

Commit c2bd394

Browse files
dazumaclaude
andauthored
feat: Added an implementation of Kafka Binding (#114)
Signed-off-by: Daniel Azuma <dazuma@gmail.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7d113f5 commit c2bd394

6 files changed

Lines changed: 986 additions & 6 deletions

File tree

CLAUDE.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,19 @@ Handles encoding/decoding CloudEvents to/from HTTP (Rack env hashes). Supports:
5959

6060
`HttpBinding.default` returns a singleton with `JsonFormat` and `TextFormat` pre-registered. Custom formatters are registered via `register_formatter`.
6161

62+
### Kafka Binding (`KafkaBinding`)
63+
64+
Handles encoding/decoding CloudEvents to/from Kafka messages. CloudEvents 1.0 only (no V0.3). Supports:
65+
- **Binary content mode** — event attributes as `ce_*` headers (plain UTF-8, no percent-encoding), data in value
66+
- **Structured content mode** — entire event serialized in value (e.g., JSON)
67+
- **No batch mode** (per the Kafka spec)
68+
- **Tombstone support**`nil` value represents an event with no data
69+
- **Key mapping** — configurable callables to map between Kafka record keys and event attributes (default: `partitionkey` extension)
70+
71+
Kafka messages are represented as plain `{ key:, value:, headers: }` Hashes, decoupled from any specific Kafka client library.
72+
73+
`KafkaBinding.default` returns a singleton with `JsonFormat` and `TextFormat` pre-registered. Key mappers are configurable at construction and overridable per-call via `key_mapper:` / `reverse_key_mapper:` keyword arguments.
74+
6275
### Format Layer
6376

6477
- **`JsonFormat`** — Encodes/decodes `application/cloudevents+json` and batch format. Also handles JSON data encoding/decoding for binary mode.
@@ -77,6 +90,7 @@ All errors inherit from `CloudEventsError`: `NotCloudEventError`, `UnsupportedFo
7790

7891
## Contributing
7992

93+
- Use red-green test-driven development when making changes, unless instructed otherwise.
8094
- Conventional Commits format required (`fix:`, `feat:`, `docs:`, etc.)
8195
- Commits must be signed off (`git commit --signoff`)
8296
- Run `toys ci` before submitting PRs

features/step_definitions/steps.rb

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,17 +50,21 @@
5050
end
5151

5252
Given "Kafka Protocol Binding is supported" do
53-
pending "Kafka Protocol Binding is not yet implemented"
53+
@kafka_binding = CloudEvents::KafkaBinding.default
5454
end
5555

56-
Given "a Kafka message with payload:" do |_str|
57-
pending
56+
Given "a Kafka message with payload:" do |str|
57+
@kafka_value = str
5858
end
5959

60-
Given "Kafka headers:" do |_table|
61-
pending
60+
Given "Kafka headers:" do |table|
61+
@kafka_headers = {}
62+
table.hashes.each do |hash|
63+
@kafka_headers[hash["key"].strip] = hash["value"]
64+
end
6265
end
6366

6467
When "parsed as Kafka message" do
65-
pending
68+
message = { key: nil, value: @kafka_value, headers: @kafka_headers }
69+
@event = @kafka_binding.decode_event(message, reverse_key_mapper: nil)
6670
end

lib/cloud_events.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
require "cloud_events/event"
66
require "cloud_events/format"
77
require "cloud_events/http_binding"
8+
require "cloud_events/kafka_binding"
89
require "cloud_events/json_format"
910
require "cloud_events/text_format"
1011

lib/cloud_events/errors.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,14 @@ class SpecVersionError < CloudEventsError
5151
class AttributeError < CloudEventsError
5252
end
5353

54+
##
55+
# An error raised when a protocol binding that does not support batch
56+
# content mode receives a batch. For example, the Kafka protocol binding
57+
# does not support batches per the CloudEvents spec.
58+
#
59+
class BatchNotSupportedError < CloudEventsError
60+
end
61+
5462
##
5563
# Alias of UnsupportedFormatError, for backward compatibility.
5664
#

0 commit comments

Comments
 (0)