Derick
1236 words
6 minutes
监听事件 —— 让你的程序实时感知区块链的脉搏

“查询让我们理解过去,

监听让我们捕捉现在。”


一、为什么要监听区块链事件?#

在区块链世界中,每一笔交易、每一次合约调用、每一个新区块的产生

都代表着状态的改变

如果我们只依赖 HTTP 方式去轮询(polling):

for {
    latestBlock, _ := client.BlockByNumber(ctx, nil)
    fmt.Println(latestBlock.Number())
    time.Sleep(time.Second * 5)
}

——这虽然能工作,但效率极低、延迟高,还容易漏数据。

所以,Ethereum 节点提供了“推送式”的交互接口:

当区块或日志变化时,节点主动通知客户端。

这正是:

事件监听(Event Subscription)机制


二、两种监听通道:WSS 与 gRPC#

1. WebSocket 模式(最常用)#

特点:

  • 基于 eth_subscribe RPC 机制;
  • 通信协议是 wss://
  • 节点会主动推送事件;
  • 适用于 DApp 后端、监控、通知等。

典型 URL:

wss://mainnet.infura.io/ws/v3/<YOUR_PROJECT_ID>

2. gRPC 模式(底层流式接口)#

gRPC 模式是 go-ethereum 在 1.10+ 版本后提供的另一套接口,

它通过 Protocol Buffers 定义数据结构,

实现更高性能的实时流式订阅

特点:

  • 二进制通信(比 JSON-RPC 快 3~5 倍);
  • 支持原生流式调用(stream);
  • 适合高并发、低延迟的企业级系统;
  • 节点需以 -grpc 启动。

典型 URL:

grpc://127.0.0.1:8545

三、监听类型总览#

监听类型函数(go-ethereum)适用通道说明
新区块SubscribeNewHead()WSS/gRPC监听新产生的区块头
合约事件SubscribeFilterLogs()WSS/gRPC监听智能合约日志
待确认交易SubscribePendingTransactions()WSS监听交易池中的交易
同步状态SubscribeSyncing()WSS/gRPC监听节点同步进度
历史日志FilterLogs()HTTP/WSS拉取过去区块的事件

四、WSS 模式实战#

示例 1:监听新区块#

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	client, err := ethclient.Dial("wss://sepolia.infura.io/ws/v3/YOUR_PROJECT_ID")
	if err != nil {
		log.Fatalf("❌ 无法连接节点: %v", err)
	}
	defer client.Close()

	headers := make(chan *types.Header)
	sub, err := client.SubscribeNewHead(context.Background(), headers)
	if err != nil {
		log.Fatalf("❌ 订阅失败: %v", err)
	}

	fmt.Println("✅ 正在监听新区块事件...")
	for {
		select {
		case err := <-sub.Err():
			log.Println("⚠️ 订阅错误:", err)
		case header := <-headers:
			fmt.Printf("⛓️ 新区块 #%v | 哈希: %s\n", header.Number, header.Hash())
		}
	}
}

输出示例:

✅ 正在监听新区块事件...
⛓️ 新区块 #5862145 | 哈希: 0x9e1234...
⛓️ 新区块 #5862146 | 哈希: 0xbca987...

示例 2:监听合约事件(Transfer)#

package main

import (
	"context"
	"fmt"
	"log"
	"math/big"
	"strings"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/accounts/abi"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"
)

func main() {
	client, err := ethclient.Dial("wss://sepolia.infura.io/ws/v3/YOUR_PROJECT_ID")
	if err != nil {
		log.Fatalf("❌ 连接节点失败: %v", err)
	}

	// ERC20 合约地址
	tokenAddress := common.HexToAddress("0xYourTokenAddress")

	query := ethereum.FilterQuery{
		Addresses: []common.Address{tokenAddress},
	}

	logsCh := make(chan types.Log)
	sub, err := client.SubscribeFilterLogs(context.Background(), query, logsCh)
	if err != nil {
		log.Fatalf("❌ 订阅失败: %v", err)
	}

	tokenAbi, _ := abi.JSON(strings.NewReader(`[{"anonymous":false,"inputs":[
		{"indexed":true,"name":"from","type":"address"},
		{"indexed":true,"name":"to","type":"address"},
		{"indexed":false,"name":"value","type":"uint256"}
	],"name":"Transfer","type":"event"}]`))

	fmt.Println("🎧 已开始监听 ERC20 Transfer 事件...")

	for {
		select {
		case err := <-sub.Err():
			log.Println("⚠️ 错误:", err)
		case vLog := <-logsCh:
			var transfer struct {
				From  common.Address
				To    common.Address
				Value *big.Int
			}

			tokenAbi.UnpackIntoInterface(&transfer, "Transfer", vLog.Data)
			transfer.From = common.HexToAddress(vLog.Topics[1].Hex())
			transfer.To = common.HexToAddress(vLog.Topics[2].Hex())

			fmt.Printf("\n📦 区块: %d\nFrom: %s\nTo: %s\nValue: %s\n",
				vLog.BlockNumber, transfer.From.Hex(), transfer.To.Hex(), transfer.Value.String())
		}
	}
}

输出示例:

🎧 已开始监听 ERC20 Transfer 事件...

📦 区块: 5439812
From: 0x23dE...B45C
To:   0x91a0...CCfA
Value: 1000000000000000000

五、gRPC 模式:更底层的流式监听#

以太坊的 gRPC 接口定义在 go-ethereum 的 proto 文件中:

interfaces/grpc.proto

通过 protoc 生成的接口可直接被 Go 调用。

你可以使用官方客户端包 github.com/ethereum/go-ethereum/rpcDialContext 的方式连接 gRPC 节点。

1. 节点启动参数#

如果你运行的是本地 geth 节点,可以这样启动:

geth --grpc --grpc.addr 0.0.0.0 --grpc.port 8547

2. 连接 gRPC 通道#

import (
	"context"
	"log"

	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	client, err := rpc.DialContext(context.Background(), "grpc://127.0.0.1:8547")
	if err != nil {
		log.Fatalf("❌ 无法连接 gRPC 节点: %v", err)
	}
	defer client.Close()

	log.Println("✅ 已连接到 gRPC 节点")
}

3. 通过 gRPC 订阅新区块#

在 gRPC 模式下,所有的订阅事件都是通过流(stream)进行的。

以下示例展示如何监听新区块头(NewHeads):

import (
	"context"
	"fmt"
	"log"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	client, err := rpc.DialContext(context.Background(), "grpc://127.0.0.1:8547")
	if err != nil {
		log.Fatalf("❌ gRPC 连接失败: %v", err)
	}
	defer client.Close()

	headers := make(chan *types.Header)
	sub, err := client.EthSubscribe(context.Background(), headers, "newHeads")
	if err != nil {
		log.Fatalf("❌ 订阅失败: %v", err)
	}

	fmt.Println("🎧 正在通过 gRPC 监听新区块...")

	for {
		select {
		case err := <-sub.Err():
			log.Println("⚠️ 订阅错误:", err)
		case h := <-headers:
			fmt.Printf("⛓️ 新区块 #%v | 哈希: %s\n", h.Number, h.Hash())
		}
	}
}

输出示例:

🎧 正在通过 gRPC 监听新区块...
⛓️ 新区块 #8923121 | 哈希: 0x42b2f...
⛓️ 新区块 #8923122 | 哈希: 0x5cc99...

4. gRPC 模式监听日志事件#

与 WSS 类似,只是连接方式不同:

sub, err := client.EthSubscribe(context.Background(), logsCh, "logs", map[string]interface{}{
	"address": []string{"0xYourContract"},
})

一旦事件被触发,客户端会收到结构化的 types.Log 对象。

六、底层机制:事件是如何被推送的?#

  1. 节点执行交易 → 生成日志(Log)和收据(Receipt);
  2. 新区块被打包并广播;
  3. 订阅服务检测到匹配事件(通过 FilterQuery);
  4. 节点将事件推送给客户端通道;
  5. 客户端在通道中读取事件流,实时处理。

八、小结与思考#

关键点说明
ethclient.SubscribeNewHead监听新区块
ethclient.SubscribeFilterLogs监听合约事件
rpc.DialContext("grpc://...")通过 gRPC 通道连接节点
EthSubscribe()通用订阅接口
gRPC 模式更高效的事件流通信方式
WebSocket 模式最兼容的通用监听方式
监听事件 —— 让你的程序实时感知区块链的脉搏
https://blog.ithuo.net/posts/blockchain-tutorial-evm-5/
Author
Derick
Published at
2024-12-08