Documentation Index
Fetch the complete documentation index at: https://docs.sonarx.com/llms.txt
Use this file to discover all available pages before exploring further.
SonarX Kafka
SonarX Kafka enables teams to consume real-time blockchain data via Kafka topics, delivering on-chain events seconds from the tip with the broadest chain coverage in the industry. It is designed for applications that require low-latency streaming, high throughput, and seamless integration into existing data pipelines.
By leveraging Kafka, SonarX Kafka fits naturally into modern event-driven architectures, allowing you to process, enrich, and persist blockchain data in real time.
Key Capabilities
- Seconds-from-the-tip latency for on-chain events
- Broad multi-chain coverage across leading blockchains
- Kafka-native delivery for scalable, fault-tolerant streaming
- Schema-consistent messages for reliable downstream processing
- Easy integration with data warehouses, stream processors, and analytics platforms
Common Use Cases
- Real-time analytics and dashboards
- Alerts and monitoring for on-chain activity
- Indexing and enrichment pipelines
- Feeding data warehouses and lakehouses
- Powering low-latency applications and APIs
Python Integration Example
Below is a simple example using confluent-kafka to consume SonarX Kafka topics in Python.
import json
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext
# Configuration for Kafka Consumer
consumer_config = {
"bootstrap.servers": "<bootstrap-server-url>", # Your Kafka bootstrap server
"group.id": "<consumer-group-1>", # Your consumer group id
"auto.offset.reset": "earliest",
"security.protocol": "SASL_SSL",
"sasl.mechanisms": "PLAIN",
"sasl.username": "<USERNAME>", # Your Confluent API key
"sasl.password": "<API-SECRET>", # Your Confluent API secret
}
# Configuration for Schema Registry
schema_registry_config = {
"url": "<schema-registry-url>", # Your Schema Registry URL
"basic.auth.user.info": "<username-auth:password-auth>" # Your Schema Registry API key and secret,
}
# Topic to consume from
topics = ["topic1", "topic2"]
schema_registry_client = SchemaRegistryClient(schema_registry_config)
avro_deserializer = AvroDeserializer(schema_registry_client)
consumer = Consumer(consumer_config)
consumer.subscribe(topics)
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
ctx = SerializationContext(msg.topic(), MessageField.VALUE)
deserialized_value = avro_deserializer(msg.value(), ctx)
print(f"{msg.topic()} - {msg.key()}")
print(f"{json.dumps(deserialized_value, indent=2)}")
except KeyboardInterrupt:
pass
finally:
# Close the consumer
consumer.close()
This consumer listens to a SonarX real-time topic and processes blockchain events as they are published.
Getting Started
- Request access to SonarX Real-Time Data
- Receive Kafka broker credentials and topic details
- Connect using your preferred Kafka client
- Start streaming blockchain data in real time
SonarX Kafka makes it easy to build fast, reliable, and scalable blockchain-powered systems using the tools your team already knows.