SonarX Kafka

SonarX Kafka enables teams to consume real-time blockchain data via Kafka topics, delivering on-chain events within seconds of the chain tip with the broadest chain coverage in the industry. It is designed for applications that require low-latency streaming, high throughput, and seamless integration into existing data pipelines. Because delivery is Kafka-native, SonarX Kafka fits naturally into modern event-driven architectures, letting you process, enrich, and persist blockchain data in real time.

Key Capabilities

  • Seconds-from-the-tip latency for on-chain events
  • Broad multi-chain coverage across leading blockchains
  • Kafka-native delivery for scalable, fault-tolerant streaming
  • Schema-consistent messages for reliable downstream processing (see the schema-inspection sketch after this list)
  • Easy integration with data warehouses, stream processors, and analytics platforms
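
The schema-consistent delivery above is backed by a Schema Registry: message values are serialized against registered Avro schemas, as the Python example below shows. As a minimal sketch (the URL, credentials, and the topic name "topic1" are placeholders, and subjects conventionally follow the "<topic>-value" naming scheme), you can inspect a topic's registered schema before building a consumer:

from confluent_kafka.schema_registry import SchemaRegistryClient

# Schema Registry connection; URL and credentials are placeholders.
schema_registry_client = SchemaRegistryClient({
    "url": "<schema-registry-url>",
    "basic.auth.user.info": "<SR-API-KEY>:<SR-API-SECRET>",
})

# "topic1" is a hypothetical topic; its value schema is registered under "topic1-value".
registered = schema_registry_client.get_latest_version("topic1-value")
print(registered.schema_id)          # registry-assigned schema id
print(registered.schema.schema_str)  # the Avro schema as a JSON string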

Common Use Cases

  • Real-time analytics and dashboards
  • Alerts and monitoring for on-chain activity (a filtering sketch follows this list)
  • Indexing and enrichment pipelines
  • Feeding data warehouses and lakehouses
  • Powering low-latency applications and APIs
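
As a concrete example of the alerting use case, deserialized events can be screened with a simple predicate before being forwarded to a monitoring system. This is a sketch only: the "value" field name and the threshold are hypothetical and depend on the actual Avro schema of the topic you consume.

ALERT_THRESHOLD = 1_000_000  # example threshold; units depend on the asset

def maybe_alert(event: dict) -> None:
    # "value" is a placeholder field name; consult your topic's Avro schema.
    if event.get("value", 0) > ALERT_THRESHOLD:
        print(f"ALERT: large on-chain event: {event}")

A function like this can be dropped into the consumer loop shown in the Python example below.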

Python Integration Example

Below is a simple example using confluent-kafka to consume SonarX Kafka topics in Python.

import json

from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

# Configuration for Kafka Consumer
consumer_config = {
    "bootstrap.servers": "<bootstrap-server-url>",  # Your Kafka bootstrap server
    "group.id": "<consumer-group-1>", # Your consumer group id
    "auto.offset.reset": "earliest",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<USERNAME>",  # Your Confluent API key
    "sasl.password": "<API-SECRET>",  # Your Confluent API secret
}

# Configuration for Schema Registry
schema_registry_config = {
    "url": "<schema-registry-url>",  # Your Schema Registry URL
    "basic.auth.user.info": "<username-auth:password-auth>"  # Your Schema Registry API key and secret,
}


# Topics to consume from
topics = ["topic1", "topic2"]


schema_registry_client = SchemaRegistryClient(schema_registry_config)
avro_deserializer = AvroDeserializer(schema_registry_client)
consumer = Consumer(consumer_config)
consumer.subscribe(topics)

try:
    while True:
        msg = consumer.poll(1.0)

        if msg is None:
            continue

        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        ctx = SerializationContext(msg.topic(), MessageField.VALUE)
        deserialized_value = avro_deserializer(msg.value(), ctx)
        print(f"{msg.topic()} - {msg.key()}")
        print(f"{json.dumps(deserialized_value, indent=2)}")

except KeyboardInterrupt:
    pass
finally:
    # Close the consumer
    consumer.close()

This consumer subscribes to the configured SonarX real-time topics and processes blockchain events as they are published.
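
The example above relies on the client's default auto-commit behavior, which can lose an event if the process dies between an offset commit and your processing step. A common at-least-once variation (a sketch reusing the consumer, avro_deserializer, and imports defined above; process() is a hypothetical handler) is to disable auto-commit and commit each offset only after the event has been handled:

# Add to consumer_config: "enable.auto.commit": False

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    ctx = SerializationContext(msg.topic(), MessageField.VALUE)
    event = avro_deserializer(msg.value(), ctx)
    process(event)  # hypothetical handler; must succeed before the commit
    consumer.commit(message=msg, asynchronous=False)  # commit only after success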

Getting Started

  1. Request access to SonarX Real-Time Data
  2. Receive Kafka broker credentials and topic details
  3. Connect using your preferred Kafka client (a connectivity check is sketched below)
  4. Start streaming blockchain data in real time
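
Once you have credentials (step 2), a quick way to verify connectivity before writing a full consumer is to list the topics visible to your API key. A minimal sketch using confluent-kafka's AdminClient, with the same placeholder credentials as the example above:

from confluent_kafka.admin import AdminClient

admin = AdminClient({
    "bootstrap.servers": "<bootstrap-server-url>",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "<USERNAME>",
    "sasl.password": "<API-SECRET>",
})

# list_topics() fetches cluster metadata; the timeout guards against hangs.
metadata = admin.list_topics(timeout=10)
print(sorted(metadata.topics.keys()))
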
SonarX Kafka makes it easy to build fast, reliable, and scalable blockchain-powered systems using the tools your team already knows.