import json
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext
# Configuration for Kafka Consumer
consumer_config = {
"bootstrap.servers": "<bootstrap-server-url>", # Your Kafka bootstrap server
"group.id": "<consumer-group-1>", # Your consumer group id
"auto.offset.reset": "earliest",
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
"sasl.username": "<USERNAME>", # Your Confluent API key
"sasl.password": "<API-SECRET>", # Your Confluent API secret
}
# Configuration for Schema Registry
schema_registry_config = {
"url": "<schema-registry-url>", # Your Schema Registry URL
"basic.auth.user.info": "<username-auth:password-auth>" # Your Schema Registry API key and secret,
}
# Topic to consume from
topics = ["topic1", "topic2"]
schema_registry_client = SchemaRegistryClient(schema_registry_config)
avro_deserializer = AvroDeserializer(schema_registry_client)
consumer = Consumer(consumer_config)
consumer.subscribe(topics)
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
ctx = SerializationContext(msg.topic(), MessageField.VALUE)
deserialized_value = avro_deserializer(msg.value(), ctx)
print(f"{msg.topic()} - {msg.key()}")
print(f"{json.dumps(deserialized_value, indent=2)}")
except KeyboardInterrupt:
pass
finally:
# Close the consumer
consumer.close()