2294 words
11 minutes
构建事件索引系统
目标:构建一个能在生产环境长期运行的链上事件索引器(indexer),把链上事件稳定、可验证地写入 PostgreSQL,并提供对外查询接口与重建能力。
一、架构概览
核心组件:
- Listener(实时订阅器)
- 通过 WSS 或 gRPC 订阅节点
logs/newHeads/pending。 - 把原始
types.Log转化为内部事件对象,放入可靠消息通道(本地内存队列或 Kafka/RabbitMQ)。
- 通过 WSS 或 gRPC 订阅节点
- Processor(解析与校验)
- 从消息通道消费事件,做 ABI 解码、字段规范化、合约白名单校验,生成要写入 DB 的 Domain 事件。
- Writer(持久化层)
- 把事件写入 Postgres,在写入时保证幂等与顺序(使用唯一约束 + checkpoint)。
- Checkpoint 管理器
- 记录索引进度(例如已处理到的区块高度与区块哈希)。便于断点续跑与重建。
- Reorg Handler(重组处理)
- 检测区块重组/回滚,触发数据回滚或补偿逻辑,保证数据库与链上最终状态一致。
- Backfill Worker(历史索引器)
- 用于首次全量索引或补漏。按区块区间批量拉取
eth_getLogs或BlockByNumber,并将结果送入 Processor。
- 用于首次全量索引或补漏。按区块区间批量拉取
- API / Query Layer
- 为上层应用提供查询接口(REST / GraphQL),支持分页、时间区间、按合约/事件类型过滤等。
- Observability(监控 & 告警)
- 指标(Prometheus)、日志、分布式追踪(OpenTelemetry)、关键告警(处理延迟、错单、queue 泄漏、reorg 频发)。
架构图(文字版):
Node(WSS/gRPC) → Listener → MessageQueue(Kafka/NSQ) → Processor → Writer(Postgres) ← CheckpointManager
另外:Backfill Worker 可直接读取 Node(HTTP)并写入 Processor;Reorg Handler 订阅 newHeads 并检测回滚。
二、数据模型(PostgreSQL)—— 建表与索引
设计原则:
- 兼顾读取性能与写入吞吐;
- 支持幂等写入与唯一约束;
- 支持按合约/事件/区块区间快速查询;
- 可扩展到多链/多环境。
关键表:
1. blocks —— 区块头快照(Checkpoint 用)
CREATE TABLE blocks (
block_number BIGINT PRIMARY KEY,
block_hash TEXT NOT NULL,
parent_hash TEXT NOT NULL,
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
processed_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);
CREATE INDEX idx_blocks_timestamp ON blocks (timestamp);
2. raw_logs —— 原始日志(存储原始数据,便于审计)
CREATE TABLE raw_logs (
id BIGSERIAL PRIMARY KEY,
tx_hash TEXT NOT NULL,
log_index INT NOT NULL,
block_number BIGINT NOT NULL,
block_hash TEXT NOT NULL,
address TEXT NOT NULL,
topics TEXT[], -- jsonb/text array
data BYTEA,
received_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
UNIQUE (tx_hash, log_index) -- 幂等保证
);
CREATE INDEX idx_raw_logs_block ON raw_logs (block_number);
CREATE INDEX idx_raw_logs_address ON raw_logs (address);
3. erc20_transfers —— 业务化事件表(示例)
CREATE TABLE erc20_transfers (
id BIGSERIAL PRIMARY KEY,
tx_hash TEXT NOT NULL,
log_index INT NOT NULL,
block_number BIGINT NOT NULL,
block_hash TEXT NOT NULL,
contract_address TEXT NOT NULL,
from_address TEXT NOT NULL,
to_address TEXT NOT NULL,
amount NUMERIC(78,0) NOT NULL,
processed_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
UNIQUE (tx_hash, log_index) -- 再次保证幂等
);
CREATE INDEX idx_erc20_by_contract_block ON erc20_transfers (contract_address, block_number DESC);
CREATE INDEX idx_erc20_from_to ON erc20_transfers (from_address, to_address);
4. indexer_checkpoints —— 记录每个索引器进度
CREATE TABLE indexer_checkpoints (
id TEXT PRIMARY KEY, -- e.g., "erc20_transfers_mainnet"
last_scanned_block BIGINT NOT NULL,
last_scanned_block_hash TEXT NOT NULL,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);
说明:
raw_logs用于存审计与回溯,保留原始topics、data;- 每个业务事件表都以
(tx_hash, log_index)做唯一键,保证幂等插入; - 所有按区块查询的表需建立
block_number上的索引; blocks表可用于快速比对区块哈希以检测 reorg。
三、同步策略(实时 + 历史补齐)
系统通常需要两条工作流并行:
A. 实时流(Near real-time)
- Listener 订阅
logs/newHeads,事件到达后先写入raw_logs(事务)→ Processor 解析并写入业务表 → 更新 checkpoint(小批量/事务内)。 - 优点:低延迟,适合告警/即时业务触发。
- 要点:保证消息处理顺序或可容忍乱序(用
block_number+log_index排序)。
B. 后向补齐(Backfill / Catch-up)
- Backfill Worker 在部署或检测漏数据时,按区块区间
from..to使用eth_getLogs或BlockByNumber拉历史数据。 - 写入同一套 Processor/Writer 流程,但需要保证不覆盖已存在数据(幂等检查)。
- 对大区间建议分片并发拉取(例如每批 1000~5000 区块),并控制并发度以防节点限流。
实践建议:
- 实时流用于即时性;历史补齐用于首次索引或漏数据补偿。两者共享同一持久化 API,保证一致性。
四、幂等、去重与顺序保证
生产系统的核心:绝对避免重复写入与乱序导致的数据不一致。
策略:
- 唯一约束(数据库级)
- 在
raw_logs与业务表上使用(tx_hash, log_index)作为唯一键。写入失败(冲突)表示是重复消息,直接忽略或更新元信息。
- 在
- 乐观并发控制(事务)
- 写入业务表时使用 DB 事务:先写
raw_logs(若冲突则读取已有记录),再写业务表。
- 写入业务表时使用 DB 事务:先写
- 消息幂等性
- 从消息队列消费时支持幂等处理:retry 不会导致重复数据(因为 DB 唯一约束会阻止)。
- 顺序保证
- 对于强顺序需求(比如需要严格按区块处理),可按
block_number分区单线程处理;或者采用 per-contract/key 的分区队列(sharding by contract address)。
- 对于强顺序需求(比如需要严格按区块处理),可按
示例:插入逻辑伪代码
BEGIN;
INSERT INTO raw_logs (tx_hash, log_index, ...) VALUES (...) ON CONFLICT (tx_hash, log_index) DO NOTHING;
-- 若原始记录已存在,可 fetch 确认是否已处理
INSERT INTO erc20_transfers (...)
VALUES (...)
ON CONFLICT (tx_hash, log_index) DO NOTHING;
-- 更新 checkpoint(原子/幂等)
COMMIT;
五、重组(reorg)检测与处理
重组是链上需要认真对待的事实:短期内存在“临时被打包后又被替换”的区块。生产索引器需支持回滚与补偿。
重组检测思路(高层):
- 在处理新区块 N 时,保存
N-1的哈希到 checkpoint。 - 当新区块 M 到来,并且
block.parent_hash != checkpoint.last_scanned_block_hash,说明发生重组或链顶回退。
重组处理策略(两种常见方式):
A. 回滚并重放(safe, 保证一致)
- 找到 fork 的分叉点(从最新已知 checkpoint 向后比对区块哈希),确定需要回滚的区块范围
[fork_block+1 .. last_scanned]。 - 在 DB 中按 block_number 删除或标记这些区块对应的业务数据(并记录 tombstone 或历史)。
- 将 checkpoint 回退到
fork_block,然后重新从fork_block+1按新区块顺序重放日志并写入数据。
优点:数据最终一致;缺点:回滚代价高(可能删除大量数据),需保证回滚原子性并审计。
B. 乐观修补(复杂但更快)
- 在检测到重组时,不立即回滚,而把受到影响的区块/日志标注为 “可能不稳定(unfinalized)”。
- 等待
k个确认(例如 12 或 30 个区块)后,才把区块标注为finalized并执行写入/索引。 - 如果发生重组且数据仍未 final,则直接忽略这些日志。
优点:减少实际回滚操作(适合高吞吐业务);缺点:增加延迟(需等待 confirmations)。
推荐策略:
- 对于高价值数据(余额、金融业务)采用 等待确认(confirmation)+在 DB 标注 finalized。
- 对于实时通知可先入库并标注
unfinalized,当确认数够后做 finalized 更新;若发生重组则做修正(回滚/修补)。
实现细节:
- 在
raw_logs与业务表增加confirmed BOOLEAN DEFAULT false、confirmations INT字段。 - Listener 可维护
last_final_block = latest_block - CONFIRMATION_DEPTH,并把block_number <= last_final_block的记录打上 confirmed 标记。
六、批处理与并发策略
吞吐与延迟的折中:
- 批量写入
- 把多个事件合并成一个 DB 批插入(
COPY或INSERT ... VALUES (...),(...),...),减少事务开销。 - 批大小建议:几十到几百条,视 DB 性能与单条事件大小而定。
- 把多个事件合并成一个 DB 批插入(
- 并发分片
- 按
contract_address % N或block_number % N做分片,分配给 N 个 Processor 实例并行处理。 - 但要注意:同一合约的事件如果有顺序依赖,避免跨分片乱序。
- 按
- 背压(Backpressure)
- 当数据库写入变慢时,应向 Listener 施加背压(暂停订阅或降低消费速率),避免内存 OOM 或消息堆积。
- 幂等写入 + 可重复消费
- 设计使得任意消息可以被重复消费(幂等),便于重试与失败补偿。
七、失败恢复与断点续跑
核心是Checkpoint与幂等两件事:
- Checkpoint:定期(或事务内)提交
indexer_checkpoints,记录last_scanned_block与block_hash。立即崩溃重启后,从该块后续开始。 - 幂等写入:若处理器重新消费已经处理过的日志,数据库唯一约束会阻止二次写入。
补救流程:
- 服务重启 → 读取
indexer_checkpoints→ 从last_scanned_block + 1开始回放。 - Backfill Worker 可在发现缺漏时触发区块区间补填。
八、监控与告警
必监控指标(Prometheus 推荐):
- Listener 层:连接状态(connected/disconnected)、订阅错误率、WS/gRPC 重连次数。
- Queue:消费 lag(消息积压长度)、消费速率(msg/s)。
- Processor:每秒事件处理数、解码错误率、ABI 解析失败数。
- Writer/DB:写入延迟、批次大小、事务回滚/冲突次数、平均写入吞吐。
- Reorg:重组次数/小时、最长回滚深度(区块数)。
- 健康:Last processed block number, last finalized block number。
告警策略(示例):
- 消费队列积压 > X(例如 10000) → 告警(可能处理慢或节点限流)。
- Listener disconnect > 5 次 / 10 min → 告警。
- Reorg 深度 > K(例如 10) → 告警并人工介入。
- 最近处理区块落后链高度超过 Y(例如 500) → 告警。
日志与追踪:
- 每条事件处理应打 trace id(tx_hash)以便追踪全链路。使用 OpenTelemetry / Jaeger 做分布式追踪。
