Skip to main content

KAFKA::CONSUMER

Consume messages from a Kafka topic.

Syntax

actions:
<action-id>:
type: KAFKA::CONSUMER
props:
topic: <topic_name>
numOfMessages: <number_of_messages>
seekToOffset: <offset>
partition: <partition_number>
timeout: <timeout_in_milliseconds>

Properties

PropertyDescriptionType
<action-id>Unique identifier for the actionstring
typeType of the actionstring
propsProperties specific to this action. Refer to props section for more details.object

props

PropertyDescriptionTypeRequired
topicThe Kafka topic from which messages are to be consumed.stringtrue
numOfMessagesThe number of messages to consume.numbertrue
seekToOffsetThe offset from which to start consuming messages.number
partitionThe partition from which to consume messages.number
timeoutTimeout for consuming messages in milliseconds. Default value is 3000 ms.number

Example

kafkaConsumerTest.yml
actions:
publish:
type: KAFKA::CONSUMER
props:
topic: example_topic
value: "Hello, World!"

tests:
kafkaConsumerTest:
tasks:
- id: task1
resource: resources.kafka
action: actions.publish

Output

📝 Note: Can be accessed using context["last_output"]

KeyDescriptionType
messagesThe messages consumed from the Kafka topic.array
timeTakenTime taken for the consume operation, in milliseconds.long

Example

{
"messages": [
// ... consumed messages ...
],
"timeTaken": 5002
}