1236 words
6 minutes
监听事件 —— 让你的程序实时感知区块链的脉搏
“查询让我们理解过去,
监听让我们捕捉现在。”
一、为什么要监听区块链事件?
在区块链世界中,每一笔交易、每一次合约调用、每一个新区块的产生
都代表着状态的改变。
如果我们只依赖 HTTP 方式去轮询(polling):
for { latestBlock, _ := client.BlockByNumber(ctx, nil) fmt.Println(latestBlock.Number()) time.Sleep(time.Second * 5)}——这虽然能工作,但效率极低、延迟高,还容易漏数据。
所以,Ethereum 节点提供了“推送式”的交互接口:
当区块或日志变化时,节点主动通知客户端。
这正是:
事件监听(Event Subscription)机制
二、两种监听通道:WSS 与 gRPC
1. WebSocket 模式(最常用)
特点:
- 基于
eth_subscribeRPC 机制; - 通信协议是
wss://; - 节点会主动推送事件;
- 适用于 DApp 后端、监控、通知等。
典型 URL:
wss://mainnet.infura.io/ws/v3/<YOUR_PROJECT_ID>2. gRPC 模式(底层流式接口)
gRPC 模式是 go-ethereum 在 1.10+ 版本后提供的另一套接口,
它通过 Protocol Buffers 定义数据结构,
实现更高性能的实时流式订阅。
特点:
- 二进制通信(比 JSON-RPC 快 3~5 倍);
- 支持原生流式调用(stream);
- 适合高并发、低延迟的企业级系统;
- 节点需以
-grpc启动。
典型 URL:
grpc://127.0.0.1:8545三、监听类型总览
| 监听类型 | 函数(go-ethereum) | 适用通道 | 说明 |
|---|---|---|---|
| 新区块 | SubscribeNewHead() | WSS/gRPC | 监听新产生的区块头 |
| 合约事件 | SubscribeFilterLogs() | WSS/gRPC | 监听智能合约日志 |
| 待确认交易 | SubscribePendingTransactions() | WSS | 监听交易池中的交易 |
| 同步状态 | SubscribeSyncing() | WSS/gRPC | 监听节点同步进度 |
| 历史日志 | FilterLogs() | HTTP/WSS | 拉取过去区块的事件 |
四、WSS 模式实战
示例 1:监听新区块
package main
import ( "context" "fmt" "log"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient")
func main() { client, err := ethclient.Dial("wss://sepolia.infura.io/ws/v3/YOUR_PROJECT_ID") if err != nil { log.Fatalf("❌ 无法连接节点: %v", err) } defer client.Close()
headers := make(chan *types.Header) sub, err := client.SubscribeNewHead(context.Background(), headers) if err != nil { log.Fatalf("❌ 订阅失败: %v", err) }
fmt.Println("✅ 正在监听新区块事件...") for { select { case err := <-sub.Err(): log.Println("⚠️ 订阅错误:", err) case header := <-headers: fmt.Printf("⛓️ 新区块 #%v | 哈希: %s\n", header.Number, header.Hash()) } }}输出示例:
✅ 正在监听新区块事件...⛓️ 新区块 #5862145 | 哈希: 0x9e1234...⛓️ 新区块 #5862146 | 哈希: 0xbca987...示例 2:监听合约事件(Transfer)
package main
import ( "context" "fmt" "log" "math/big" "strings"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient")
func main() { client, err := ethclient.Dial("wss://sepolia.infura.io/ws/v3/YOUR_PROJECT_ID") if err != nil { log.Fatalf("❌ 连接节点失败: %v", err) }
// ERC20 合约地址 tokenAddress := common.HexToAddress("0xYourTokenAddress")
query := ethereum.FilterQuery{ Addresses: []common.Address{tokenAddress}, }
logsCh := make(chan types.Log) sub, err := client.SubscribeFilterLogs(context.Background(), query, logsCh) if err != nil { log.Fatalf("❌ 订阅失败: %v", err) }
tokenAbi, _ := abi.JSON(strings.NewReader(`[{"anonymous":false,"inputs":[ {"indexed":true,"name":"from","type":"address"}, {"indexed":true,"name":"to","type":"address"}, {"indexed":false,"name":"value","type":"uint256"} ],"name":"Transfer","type":"event"}]`))
fmt.Println("🎧 已开始监听 ERC20 Transfer 事件...")
for { select { case err := <-sub.Err(): log.Println("⚠️ 错误:", err) case vLog := <-logsCh: var transfer struct { From common.Address To common.Address Value *big.Int }
tokenAbi.UnpackIntoInterface(&transfer, "Transfer", vLog.Data) transfer.From = common.HexToAddress(vLog.Topics[1].Hex()) transfer.To = common.HexToAddress(vLog.Topics[2].Hex())
fmt.Printf("\n📦 区块: %d\nFrom: %s\nTo: %s\nValue: %s\n", vLog.BlockNumber, transfer.From.Hex(), transfer.To.Hex(), transfer.Value.String()) } }}输出示例:
🎧 已开始监听 ERC20 Transfer 事件...
📦 区块: 5439812From: 0x23dE...B45CTo: 0x91a0...CCfAValue: 1000000000000000000五、gRPC 模式:更底层的流式监听
以太坊的 gRPC 接口定义在 go-ethereum 的 proto 文件中:
interfaces/grpc.proto
通过 protoc 生成的接口可直接被 Go 调用。
你可以使用官方客户端包 github.com/ethereum/go-ethereum/rpc 以 DialContext 的方式连接 gRPC 节点。
1. 节点启动参数
如果你运行的是本地 geth 节点,可以这样启动:
geth --grpc --grpc.addr 0.0.0.0 --grpc.port 85472. 连接 gRPC 通道
import ( "context" "log"
"github.com/ethereum/go-ethereum/rpc")
func main() { client, err := rpc.DialContext(context.Background(), "grpc://127.0.0.1:8547") if err != nil { log.Fatalf("❌ 无法连接 gRPC 节点: %v", err) } defer client.Close()
log.Println("✅ 已连接到 gRPC 节点")}3. 通过 gRPC 订阅新区块
在 gRPC 模式下,所有的订阅事件都是通过流(stream)进行的。
以下示例展示如何监听新区块头(NewHeads):
import ( "context" "fmt" "log"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc")
func main() { client, err := rpc.DialContext(context.Background(), "grpc://127.0.0.1:8547") if err != nil { log.Fatalf("❌ gRPC 连接失败: %v", err) } defer client.Close()
headers := make(chan *types.Header) sub, err := client.EthSubscribe(context.Background(), headers, "newHeads") if err != nil { log.Fatalf("❌ 订阅失败: %v", err) }
fmt.Println("🎧 正在通过 gRPC 监听新区块...")
for { select { case err := <-sub.Err(): log.Println("⚠️ 订阅错误:", err) case h := <-headers: fmt.Printf("⛓️ 新区块 #%v | 哈希: %s\n", h.Number, h.Hash()) } }}输出示例:
🎧 正在通过 gRPC 监听新区块...⛓️ 新区块 #8923121 | 哈希: 0x42b2f...⛓️ 新区块 #8923122 | 哈希: 0x5cc99...4. gRPC 模式监听日志事件
与 WSS 类似,只是连接方式不同:
sub, err := client.EthSubscribe(context.Background(), logsCh, "logs", map[string]interface{}{ "address": []string{"0xYourContract"},})一旦事件被触发,客户端会收到结构化的 types.Log 对象。
六、底层机制:事件是如何被推送的?
- 节点执行交易 → 生成日志(Log)和收据(Receipt);
- 新区块被打包并广播;
- 订阅服务检测到匹配事件(通过 FilterQuery);
- 节点将事件推送给客户端通道;
- 客户端在通道中读取事件流,实时处理。
八、小结与思考
| 关键点 | 说明 |
|---|---|
ethclient.SubscribeNewHead | 监听新区块 |
ethclient.SubscribeFilterLogs | 监听合约事件 |
rpc.DialContext("grpc://...") | 通过 gRPC 通道连接节点 |
EthSubscribe() | 通用订阅接口 |
| gRPC 模式 | 更高效的事件流通信方式 |
| WebSocket 模式 | 最兼容的通用监听方式 |
Share
If this article helped you, please share it with others!
监听事件 —— 让你的程序实时感知区块链的脉搏
https://blog.ithuo.net/posts/blockchain-tutorial-evm-5/ Last updated on 2024-12-08,414 days ago
Some content may be outdated