1236 words
6 minutes
监听事件 —— 让你的程序实时感知区块链的脉搏
“查询让我们理解过去,
监听让我们捕捉现在。”
一、为什么要监听区块链事件?
在区块链世界中,每一笔交易、每一次合约调用、每一个新区块的产生
都代表着状态的改变。
如果我们只依赖 HTTP 方式去轮询(polling):
for {
latestBlock, _ := client.BlockByNumber(ctx, nil)
fmt.Println(latestBlock.Number())
time.Sleep(time.Second * 5)
}
——这虽然能工作,但效率极低、延迟高,还容易漏数据。
所以,Ethereum 节点提供了“推送式”的交互接口:
当区块或日志变化时,节点主动通知客户端。
这正是:
事件监听(Event Subscription)机制
二、两种监听通道:WSS 与 gRPC
1. WebSocket 模式(最常用)
特点:
- 基于
eth_subscribeRPC 机制; - 通信协议是
wss://; - 节点会主动推送事件;
- 适用于 DApp 后端、监控、通知等。
典型 URL:
wss://mainnet.infura.io/ws/v3/<YOUR_PROJECT_ID>
2. gRPC 模式(底层流式接口)
gRPC 模式是 go-ethereum 在 1.10+ 版本后提供的另一套接口,
它通过 Protocol Buffers 定义数据结构,
实现更高性能的实时流式订阅。
特点:
- 二进制通信(比 JSON-RPC 快 3~5 倍);
- 支持原生流式调用(stream);
- 适合高并发、低延迟的企业级系统;
- 节点需以
-grpc启动。
典型 URL:
grpc://127.0.0.1:8545
三、监听类型总览
| 监听类型 | 函数(go-ethereum) | 适用通道 | 说明 |
|---|---|---|---|
| 新区块 | SubscribeNewHead() | WSS/gRPC | 监听新产生的区块头 |
| 合约事件 | SubscribeFilterLogs() | WSS/gRPC | 监听智能合约日志 |
| 待确认交易 | SubscribePendingTransactions() | WSS | 监听交易池中的交易 |
| 同步状态 | SubscribeSyncing() | WSS/gRPC | 监听节点同步进度 |
| 历史日志 | FilterLogs() | HTTP/WSS | 拉取过去区块的事件 |
四、WSS 模式实战
示例 1:监听新区块
package main
import (
"context"
"fmt"
"log"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)
func main() {
client, err := ethclient.Dial("wss://sepolia.infura.io/ws/v3/YOUR_PROJECT_ID")
if err != nil {
log.Fatalf("❌ 无法连接节点: %v", err)
}
defer client.Close()
headers := make(chan *types.Header)
sub, err := client.SubscribeNewHead(context.Background(), headers)
if err != nil {
log.Fatalf("❌ 订阅失败: %v", err)
}
fmt.Println("✅ 正在监听新区块事件...")
for {
select {
case err := <-sub.Err():
log.Println("⚠️ 订阅错误:", err)
case header := <-headers:
fmt.Printf("⛓️ 新区块 #%v | 哈希: %s\n", header.Number, header.Hash())
}
}
}
输出示例:
✅ 正在监听新区块事件...
⛓️ 新区块 #5862145 | 哈希: 0x9e1234...
⛓️ 新区块 #5862146 | 哈希: 0xbca987...
示例 2:监听合约事件(Transfer)
package main
import (
"context"
"fmt"
"log"
"math/big"
"strings"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)
func main() {
client, err := ethclient.Dial("wss://sepolia.infura.io/ws/v3/YOUR_PROJECT_ID")
if err != nil {
log.Fatalf("❌ 连接节点失败: %v", err)
}
// ERC20 合约地址
tokenAddress := common.HexToAddress("0xYourTokenAddress")
query := ethereum.FilterQuery{
Addresses: []common.Address{tokenAddress},
}
logsCh := make(chan types.Log)
sub, err := client.SubscribeFilterLogs(context.Background(), query, logsCh)
if err != nil {
log.Fatalf("❌ 订阅失败: %v", err)
}
tokenAbi, _ := abi.JSON(strings.NewReader(`[{"anonymous":false,"inputs":[
{"indexed":true,"name":"from","type":"address"},
{"indexed":true,"name":"to","type":"address"},
{"indexed":false,"name":"value","type":"uint256"}
],"name":"Transfer","type":"event"}]`))
fmt.Println("🎧 已开始监听 ERC20 Transfer 事件...")
for {
select {
case err := <-sub.Err():
log.Println("⚠️ 错误:", err)
case vLog := <-logsCh:
var transfer struct {
From common.Address
To common.Address
Value *big.Int
}
tokenAbi.UnpackIntoInterface(&transfer, "Transfer", vLog.Data)
transfer.From = common.HexToAddress(vLog.Topics[1].Hex())
transfer.To = common.HexToAddress(vLog.Topics[2].Hex())
fmt.Printf("\n📦 区块: %d\nFrom: %s\nTo: %s\nValue: %s\n",
vLog.BlockNumber, transfer.From.Hex(), transfer.To.Hex(), transfer.Value.String())
}
}
}
输出示例:
🎧 已开始监听 ERC20 Transfer 事件...
📦 区块: 5439812
From: 0x23dE...B45C
To: 0x91a0...CCfA
Value: 1000000000000000000
五、gRPC 模式:更底层的流式监听
以太坊的 gRPC 接口定义在 go-ethereum 的 proto 文件中:
interfaces/grpc.proto
通过 protoc 生成的接口可直接被 Go 调用。
你可以使用官方客户端包 github.com/ethereum/go-ethereum/rpc 以 DialContext 的方式连接 gRPC 节点。
1. 节点启动参数
如果你运行的是本地 geth 节点,可以这样启动:
geth --grpc --grpc.addr 0.0.0.0 --grpc.port 8547
2. 连接 gRPC 通道
import (
"context"
"log"
"github.com/ethereum/go-ethereum/rpc"
)
func main() {
client, err := rpc.DialContext(context.Background(), "grpc://127.0.0.1:8547")
if err != nil {
log.Fatalf("❌ 无法连接 gRPC 节点: %v", err)
}
defer client.Close()
log.Println("✅ 已连接到 gRPC 节点")
}
3. 通过 gRPC 订阅新区块
在 gRPC 模式下,所有的订阅事件都是通过流(stream)进行的。
以下示例展示如何监听新区块头(NewHeads):
import (
"context"
"fmt"
"log"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
)
func main() {
client, err := rpc.DialContext(context.Background(), "grpc://127.0.0.1:8547")
if err != nil {
log.Fatalf("❌ gRPC 连接失败: %v", err)
}
defer client.Close()
headers := make(chan *types.Header)
sub, err := client.EthSubscribe(context.Background(), headers, "newHeads")
if err != nil {
log.Fatalf("❌ 订阅失败: %v", err)
}
fmt.Println("🎧 正在通过 gRPC 监听新区块...")
for {
select {
case err := <-sub.Err():
log.Println("⚠️ 订阅错误:", err)
case h := <-headers:
fmt.Printf("⛓️ 新区块 #%v | 哈希: %s\n", h.Number, h.Hash())
}
}
}
输出示例:
🎧 正在通过 gRPC 监听新区块...
⛓️ 新区块 #8923121 | 哈希: 0x42b2f...
⛓️ 新区块 #8923122 | 哈希: 0x5cc99...
4. gRPC 模式监听日志事件
与 WSS 类似,只是连接方式不同:
sub, err := client.EthSubscribe(context.Background(), logsCh, "logs", map[string]interface{}{
"address": []string{"0xYourContract"},
})
一旦事件被触发,客户端会收到结构化的 types.Log 对象。
六、底层机制:事件是如何被推送的?
- 节点执行交易 → 生成日志(Log)和收据(Receipt);
- 新区块被打包并广播;
- 订阅服务检测到匹配事件(通过 FilterQuery);
- 节点将事件推送给客户端通道;
- 客户端在通道中读取事件流,实时处理。
八、小结与思考
| 关键点 | 说明 |
|---|---|
ethclient.SubscribeNewHead | 监听新区块 |
ethclient.SubscribeFilterLogs | 监听合约事件 |
rpc.DialContext("grpc://...") | 通过 gRPC 通道连接节点 |
EthSubscribe() | 通用订阅接口 |
| gRPC 模式 | 更高效的事件流通信方式 |
| WebSocket 模式 | 最兼容的通用监听方式 |
监听事件 —— 让你的程序实时感知区块链的脉搏
https://blog.ithuo.net/posts/blockchain-tutorial-evm-5/