Kafka Web3 Source Connector — Publish Web3 compatible Blockchain data to Kafka topics

Satya
Jul 16, 2020

In this post, I am going to explain how to retrieve Blocks / Transactions / Events data from any Web3-compatible blockchain (Ethereum, Aion, …) and publish it to Kafka topics in real-time.

Kafka Connectors?

Kafka Connectors are ready-to-use components that import data from external systems into Kafka topics and export data from Kafka topics to external systems. Connectors are written using the Kafka Connect framework, a set of Java APIs provided by Kafka for writing connectors.

There are two types of Kafka connectors:

  • Source Connector: collects data from an external system and publishes it to a Kafka topic
  • Sink Connector: delivers data from a Kafka topic to an external system

Most of the time, you don’t need to write your own connector. You can check Confluent Hub to see if a connector is already available for your external system.
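The source-connector side of that contract boils down to a poll loop: the framework repeatedly asks the task for new records and writes them, with their source offsets, to the configured topic. The real API is Kafka Connect’s Java `SourceTask`; the class and record shape below are purely illustrative, in Python, to show the idea:

```python
# Illustrative sketch of the Kafka Connect source-task contract.
# The real framework class is org.apache.kafka.connect.source.SourceTask (Java);
# the names below are hypothetical, not the actual API.

class ToySourceTask:
    """Pulls data from an 'external system' and hands records to the framework."""

    def __init__(self, external_data):
        self.external_data = external_data  # stand-in for a DB, blockchain, etc.
        self.offset = 0                     # source connectors track an offset

    def poll(self, max_records=2):
        """Called repeatedly by the framework; returns the next batch of records."""
        batch = self.external_data[self.offset:self.offset + max_records]
        records = [{"topic": "my-topic", "offset": self.offset + i, "value": v}
                   for i, v in enumerate(batch)]
        self.offset += len(batch)
        return records

task = ToySourceTask(["a", "b", "c"])
print(task.poll())  # first batch of records destined for 'my-topic'
```

Tracking the offset is what lets a connector resume where it left off after a restart — for a blockchain source, the offset is naturally the last published block number.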

Kafka Web3 Source Connector

I will go through a Source Connector implementation, “Kafka Web3 Source Connector”. The source for this project can be found at https://github.com/satran004/kafka-web3-connector.

You may want to browse the source code of this project to learn how to write your own Kafka connector.

This source connector implementation can read blockchain data from a Web3 JSON-RPC endpoint and publish Blocks / Transactions / Events data to any Kafka topic.

There are two source connector implementations in this project:

  1. Block Source Connector: com.bloxbean.kafka.connectors.web3.source.blocks.BlockSourceConnector
  2. Event Logs Connector: com.bloxbean.kafka.connectors.web3.source.events.EventSourceConnector

Block Source Connector

You can use the “Block Source Connector” to fetch block and transaction data from the web3 JSON-RPC endpoint of a blockchain (for example, Ethereum or Aion) and publish the JSON data to a topic.

It supports two scenarios:

  • Publish blocks with transaction data to one Kafka topic
  • Publish blocks with transaction ids to one topic and transaction details to another topic
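Under the hood, fetching a block is a web3 JSON-RPC call (`eth_getBlockByNumber`), and the two scenarios map onto that call’s “full transaction objects” boolean: `true` returns blocks with complete transactions inline, `false` returns blocks carrying only transaction hashes, leaving the details to be fetched and published separately. A hedged sketch of the request payload (structure only, no network call — the mapping to the connector’s internals is my reading, not quoted from its code):

```python
import json

def get_block_request(block_number, full_transactions):
    """Build an eth_getBlockByNumber JSON-RPC request.

    full_transactions=True  -> block carries complete transaction objects
                               (scenario 1: blocks + transactions in one topic)
    full_transactions=False -> block carries only transaction hashes
                               (scenario 2: transaction details go to a second topic)
    """
    return json.dumps({
        "jsonrpc": "2.0",
        "method": "eth_getBlockByNumber",
        "params": [hex(block_number), full_transactions],  # block number is hex-encoded
        "id": 1,
    })

print(get_block_request(4721700, False))
```

Aion’s web3 API mirrors Ethereum’s `eth_` namespace, which is why the same connector works against both chains.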

Event Logs Connector

This source connector can be used to retrieve smart contract event logs from a web3-compatible blockchain through its web3 JSON-RPC endpoint and publish them to Kafka topics.

It supports the following scenarios:

  • Publish all event logs to a Kafka topic
  • Apply event log filters and publish only matching event logs to a Kafka topic
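The filters behave like web3’s `eth_getLogs` filters: a log matches when its contract address is in the configured address list and its indexed topics match the configured topic filter position by position. A small sketch of that matching rule (illustrative Python, not the connector’s actual code; the example values are made up):

```python
def log_matches(log, filter_addresses=None, filter_topics=None):
    """Return True if an event log passes the address/topic filters.

    log: dict with 'address' and 'topics' (list of 32-byte hex strings).
    Empty/None filters match everything, as with eth_getLogs.
    """
    if filter_addresses and log["address"] not in filter_addresses:
        return False
    if filter_topics:
        # Positional topic match: filter topic i must equal log topic i;
        # a None entry acts as a wildcard for that position.
        for i, wanted in enumerate(filter_topics):
            if wanted is not None and (i >= len(log["topics"]) or log["topics"][i] != wanted):
                return False
    return True

delegated = "0x4144534465" + "0" * 54          # hypothetical 32-byte event topic
log = {"address": "0xpool", "topics": [delegated]}
print(log_matches(log, ["0xpool"], [delegated]))  # True
```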

You can use these connectors in both standalone and distributed mode.

For simplicity, I am providing the configuration for standalone mode in this post. For distributed mode configuration, you can refer to the “config” folder of this project.

How to Run In Standalone Mode From Source

> Check out the source from https://github.com/satran004/kafka-web3-connector

> Install Java 1.8 (minimum), Maven, and Kafka on your machine

> Start Kafka server

Compile and build the package

$> mvn clean package

Run the Block Source connector to fetch blocks from the web3 JSON-RPC endpoint

You can find a sample configuration for the Block Connector in standalone mode under the “config” folder. Here’s a sample configuration to fetch blocks from the Aion blockchain. A similar configuration for the Ethereum blockchain can be found in the same folder.

  • Replace web3_rpc_url and start_block with proper values.
  • If you want to publish blocks and transactions to separate topics, update the topic & transaction_topic properties accordingly. Otherwise, comment out the transaction_topic property.

Edit “config/connector-web3-blocks-source.properties”

###### connector-web3-blocks-source.properties ######
name=bloxbean-web3-source-connector
connector.class=com.bloxbean.kafka.connectors.web3.source.blocks.BlockSourceConnector
tasks.max=1
web3_rpc_url=http://<host>:8545
topic=aion-blocks

#To publish transactions together with blocks, comment out the line below.
#Otherwise, transactions will be published to the following topic
transaction_topic=aion-transactions

#Comma separated list of ignored fields from Block object.
ignore_block_fields=logsBloom,extraData

#Comma separated list of ignored fields from Transaction object.
#Supported options: input
ignore_transaction_fields=input

start_block=4721700
block_time=10
no_of_blocks_for_finality=0
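Two of these properties deserve a note: `block_time` is the polling interval in seconds, and `no_of_blocks_for_finality` tells the connector how many blocks behind the chain tip to stay, so blocks that might still be re-orged are not published. A sketch of how the next block to fetch could be chosen under those settings (the property names come from the config above; the logic itself is illustrative, not the connector’s source):

```python
def next_block_to_fetch(last_published, chain_head, finality_blocks):
    """Return the next block number to fetch, or None if we must wait.

    Only blocks at least `finality_blocks` behind the chain head are
    considered final enough to publish.
    """
    candidate = last_published + 1
    highest_safe = chain_head - finality_blocks
    return candidate if candidate <= highest_safe else None

print(next_block_to_fetch(4721700, 4721710, 0))  # 4721701
print(next_block_to_fetch(4721709, 4721710, 6))  # None -> sleep block_time, retry
```

With `no_of_blocks_for_finality=0`, as above, every new block is published immediately; raising it trades latency for safety against chain reorganizations.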

To run,

> Set KAFKA_HOME environment variable

$> export KAFKA_HOME=<path_to_kafka_installation_folder>
$> $KAFKA_HOME/bin/connect-standalone.sh config/connect-standalone.properties config/connector-web3-blocks-source.properties

If everything is working as expected, the connector will start fetching blocks in real-time and you should see logs similar to these:

[2020-07-16 12:46:32,467] INFO WorkerSourceTask{id=bloxbean-web3-source-connector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:200)
[2020-07-16 12:46:32,537] INFO Cluster ID: ri4jhQhdTCuB7RuCMLjbhQ (org.apache.kafka.clients.Metadata:365)
[2020-07-16 12:46:32,928] INFO Successfully fetched block : 4729045 (com.bloxbean.kafka.connectors.web3.source.blocks.BlockSourceTask:94)
[2020-07-16 12:46:32,983] INFO Successfully fetched block : 4729046 (com.bloxbean.kafka.connectors.web3.source.blocks.BlockSourceTask:94)

Now check the Kafka topic for published messages:

$> $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic aion-blocks --from-beginning

Note: Make sure the topic name in the above command is the same as the topic name in the connector configuration file.
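The messages on the topic are JSON representations of block objects, so any consumer can deserialize them directly. A quick sketch of parsing one such message (the payload here is hand-made to resemble a web3 block object, not actual connector output, and real field names may differ slightly):

```python
import json

# Hand-made example resembling a web3 block; real messages may differ.
message_value = json.dumps({
    "number": "0x482c55",
    "hash": "0xabc123",
    "transactions": ["0xdeadbeef", "0xfeedface"],
})

block = json.loads(message_value)
block_number = int(block["number"], 16)  # web3 numbers are hex-encoded strings
print(block_number, len(block["transactions"]))
```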

Run the Event Logs Source connector to fetch event logs from the web3 JSON-RPC endpoint

You can find a sample configuration for the Event Log Connector in standalone mode under the “config” folder. Here’s a sample configuration to fetch events from the Aion blockchain. A similar configuration for the Ethereum blockchain can be found in the same folder.

The configuration below fetches all delegation events from the Aion Staking Pool Registry contract. Check the event_logs_filter_addresses & event_logs_filter_topics properties.

  • You can customize the event_logs_filter_* values according to your requirements.
  • Provide a value for the web3_rpc_url property.
  • Update the topic and start_block properties if required.

Edit “config/connector-web3-events-source.properties”

name=bloxbean-web3-events-source-connector
connector.class=com.bloxbean.kafka.connectors.web3.source.events.EventSourceConnector
tasks.max=1
web3_rpc_url=http://<host>:8545
topic=web3-events
start_block=6117319
block_time=10
no_of_blocks_for_finality=0

####################################################################################
# The following options are only applicable for EventsSourceConnector
####################################################################################
event_logs_filter_addresses=0xa008e42a76e2e779175c589efdb2a0e742b40d8d421df2b93a8a0b13090c7cc8
event_logs_filter_topics=0x41445344656c6567617465640000000000000000000000000000000000000000

####################################################################################
# Target kafka topic's key
# Comma separated list of following options
# Options: blockNumber, logIndex, address, topic, transactionHash, transactionIndex
# Default: transactionHash,logIndex
####################################################################################
#event_logs_kafka_keys=
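As the comment above notes, `event_logs_kafka_keys` selects which event-log fields are combined to form the Kafka message key, defaulting to `transactionHash,logIndex` (a combination that uniquely identifies a log on-chain). A sketch of that key construction — the `-` separator and function name here are assumptions for illustration, not taken from the connector:

```python
def build_event_key(event_log, key_fields=("transactionHash", "logIndex")):
    """Concatenate the chosen event-log fields into a Kafka message key.

    The '-' separator is an assumption for this sketch; the connector's
    actual separator may differ.
    """
    return "-".join(str(event_log[f]) for f in key_fields)

log = {"transactionHash": "0xabc", "logIndex": "0x1", "address": "0xpool"}
print(build_event_key(log))                           # 0xabc-0x1
print(build_event_key(log, ("address", "logIndex")))  # 0xpool-0x1
```

Choosing the key matters because Kafka partitions by key: keying on `address`, for instance, would keep all events from one contract in order on a single partition.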

To run,

> Set KAFKA_HOME environment variable

$> export KAFKA_HOME=<path_to_kafka_installation_folder>
$> $KAFKA_HOME/bin/connect-standalone.sh config/connect-standalone.properties config/connector-web3-events-source.properties

Subscribe to the event Kafka topic to see the fetched event log data in real-time.

Run the connector in distributed mode

To run the Kafka web3 source connectors in distributed mode, you can download the connector jar from the release section of the project or from the target folder (if you are building from source).

https://github.com/satran004/kafka-web3-connector/releases/
