Derick
2294 words
11 minutes
构建事件索引系统

目标:构建一个能在生产环境长期运行的链上事件索引器(indexer),把链上事件稳定、可验证地写入 PostgreSQL,并提供对外查询接口与重建能力。


一、架构概览#

核心组件:

  1. Listener(实时订阅器)
    • 通过 WSS 或 gRPC 订阅节点 logs/newHeads/pending
    • 把原始 types.Log 转化为内部事件对象,放入可靠消息通道(本地内存队列或 Kafka/RabbitMQ)。
  2. Processor(解析与校验)
    • 从消息通道消费事件,做 ABI 解码、字段规范化、合约白名单校验,生成要写入 DB 的 Domain 事件。
  3. Writer(持久化层)
    • 把事件写入 Postgres,在写入时保证幂等与顺序(使用唯一约束 + checkpoint)。
  4. Checkpoint 管理器
    • 记录索引进度(例如已处理到的区块高度与区块哈希)。便于断点续跑与重建。
  5. Reorg Handler(重组处理)
    • 检测区块重组/回滚,触发数据回滚或补偿逻辑,保证数据库与链上最终状态一致。
  6. Backfill Worker(历史索引器)
    • 用于首次全量索引或补漏。按区块区间批量拉取 eth_getLogsBlockByNumber,并将结果送入 Processor。
  7. API / Query Layer
    • 为上层应用提供查询接口(REST / GraphQL),支持分页、时间区间、按合约/事件类型过滤等。
  8. Observability(监控 & 告警)
    • 指标(Prometheus)、日志、分布式追踪(OpenTelemetry)、关键告警(处理延迟、错单、queue 泄漏、reorg 频发)。

架构图(文字版):

Node(WSS/gRPC) → Listener → MessageQueue(Kafka/NSQ) → Processor → Writer(Postgres) ← CheckpointManager

另外:Backfill Worker 可直接读取 Node(HTTP)并写入 Processor;Reorg Handler 订阅 newHeads 并检测回滚。


二、数据模型(PostgreSQL)—— 建表与索引#

设计原则:

  • 兼顾读取性能与写入吞吐;
  • 支持幂等写入与唯一约束;
  • 支持按合约/事件/区块区间快速查询;
  • 可扩展到多链/多环境。

关键表:

1. blocks —— 区块头快照(Checkpoint 用)#

CREATE TABLE blocks (
  block_number BIGINT PRIMARY KEY,
  block_hash TEXT NOT NULL,
  parent_hash TEXT NOT NULL,
  timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
  processed_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);
CREATE INDEX idx_blocks_timestamp ON blocks (timestamp);

2. raw_logs —— 原始日志(存储原始数据,便于审计)#

CREATE TABLE raw_logs (
  id BIGSERIAL PRIMARY KEY,
  tx_hash TEXT NOT NULL,
  log_index INT NOT NULL,
  block_number BIGINT NOT NULL,
  block_hash TEXT NOT NULL,
  address TEXT NOT NULL,
  topics TEXT[],              -- jsonb/text array
  data BYTEA,
  received_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
  UNIQUE (tx_hash, log_index) -- 幂等保证
);
CREATE INDEX idx_raw_logs_block ON raw_logs (block_number);
CREATE INDEX idx_raw_logs_address ON raw_logs (address);

3. erc20_transfers —— 业务化事件表(示例)#

CREATE TABLE erc20_transfers (
  id BIGSERIAL PRIMARY KEY,
  tx_hash TEXT NOT NULL,
  log_index INT NOT NULL,
  block_number BIGINT NOT NULL,
  block_hash TEXT NOT NULL,
  contract_address TEXT NOT NULL,
  from_address TEXT NOT NULL,
  to_address TEXT NOT NULL,
  amount NUMERIC(78,0) NOT NULL,
  processed_at TIMESTAMP WITH TIME ZONE DEFAULT now(),
  UNIQUE (tx_hash, log_index) -- 再次保证幂等
);
CREATE INDEX idx_erc20_by_contract_block ON erc20_transfers (contract_address, block_number DESC);
CREATE INDEX idx_erc20_from_to ON erc20_transfers (from_address, to_address);

4. indexer_checkpoints —— 记录每个索引器进度#

CREATE TABLE indexer_checkpoints (
  id TEXT PRIMARY KEY,       -- e.g., "erc20_transfers_mainnet"
  last_scanned_block BIGINT NOT NULL,
  last_scanned_block_hash TEXT NOT NULL,
  updated_at TIMESTAMP WITH TIME ZONE DEFAULT now()
);

说明:

  • raw_logs 用于存审计与回溯,保留原始 topicsdata
  • 每个业务事件表都以 (tx_hash, log_index) 做唯一键,保证幂等插入;
  • 所有按区块查询的表需建立 block_number 上的索引;
  • blocks 表可用于快速比对区块哈希以检测 reorg。

三、同步策略(实时 + 历史补齐)#

系统通常需要两条工作流并行:

A. 实时流(Near real-time)#

  • Listener 订阅 logs / newHeads,事件到达后先写入 raw_logs(事务)→ Processor 解析并写入业务表 → 更新 checkpoint(小批量/事务内)。
  • 优点:低延迟,适合告警/即时业务触发。
  • 要点:保证消息处理顺序或可容忍乱序(用 block_number+log_index排序)。

B. 后向补齐(Backfill / Catch-up)#

  • Backfill Worker 在部署或检测漏数据时,按区块区间 from..to 使用 eth_getLogsBlockByNumber 拉历史数据。
  • 写入同一套 Processor/Writer 流程,但需要保证不覆盖已存在数据(幂等检查)。
  • 对大区间建议分片并发拉取(例如每批 1000~5000 区块),并控制并发度以防节点限流。

实践建议:

  • 实时流用于即时性;历史补齐用于首次索引或漏数据补偿。两者共享同一持久化 API,保证一致性。

四、幂等、去重与顺序保证#

生产系统的核心:绝对避免重复写入与乱序导致的数据不一致

策略:

  1. 唯一约束(数据库级)
    • raw_logs 与业务表上使用 (tx_hash, log_index) 作为唯一键。写入失败(冲突)表示是重复消息,直接忽略或更新元信息。
  2. 乐观并发控制(事务)
    • 写入业务表时使用 DB 事务:先写 raw_logs(若冲突则读取已有记录),再写业务表。
  3. 消息幂等性
    • 从消息队列消费时支持幂等处理:retry 不会导致重复数据(因为 DB 唯一约束会阻止)。
  4. 顺序保证
    • 对于强顺序需求(比如需要严格按区块处理),可按 block_number 分区单线程处理;或者采用 per-contract/key 的分区队列(sharding by contract address)。

示例:插入逻辑伪代码

BEGIN;
INSERT INTO raw_logs (tx_hash, log_index, ...) VALUES (...) ON CONFLICT (tx_hash, log_index) DO NOTHING;
-- 若原始记录已存在,可 fetch 确认是否已处理
INSERT INTO erc20_transfers (...)
  VALUES (...)
  ON CONFLICT (tx_hash, log_index) DO NOTHING;
-- 更新 checkpoint(原子/幂等)
COMMIT;

五、重组(reorg)检测与处理#

重组是链上需要认真对待的事实:短期内存在“临时被打包后又被替换”的区块。生产索引器需支持回滚与补偿。

重组检测思路(高层):#

  • 在处理新区块 N 时,保存 N-1 的哈希到 checkpoint。
  • 当新区块 M 到来,并且 block.parent_hash != checkpoint.last_scanned_block_hash,说明发生重组或链顶回退。

重组处理策略(两种常见方式):#

A. 回滚并重放(safe, 保证一致)#

  1. 找到 fork 的分叉点(从最新已知 checkpoint 向后比对区块哈希),确定需要回滚的区块范围 [fork_block+1 .. last_scanned]
  2. 在 DB 中按 block_number 删除或标记这些区块对应的业务数据(并记录 tombstone 或历史)。
  3. 将 checkpoint 回退到 fork_block,然后重新从 fork_block+1 按新区块顺序重放日志并写入数据。

优点:数据最终一致;缺点:回滚代价高(可能删除大量数据),需保证回滚原子性并审计。

B. 乐观修补(复杂但更快)#

  • 在检测到重组时,不立即回滚,而把受到影响的区块/日志标注为 “可能不稳定(unfinalized)”。
  • 等待 k 个确认(例如 12 或 30 个区块)后,才把区块标注为 finalized 并执行写入/索引。
  • 如果发生重组且数据仍未 final,则直接忽略这些日志。

优点:减少实际回滚操作(适合高吞吐业务);缺点:增加延迟(需等待 confirmations)。

推荐策略:

  • 对于高价值数据(余额、金融业务)采用 等待确认(confirmation)+在 DB 标注 finalized
  • 对于实时通知可先入库并标注 unfinalized,当确认数够后做 finalized 更新;若发生重组则做修正(回滚/修补)。

实现细节:

  • raw_logs 与业务表增加 confirmed BOOLEAN DEFAULT falseconfirmations INT 字段。
  • Listener 可维护 last_final_block = latest_block - CONFIRMATION_DEPTH,并把 block_number <= last_final_block 的记录打上 confirmed 标记。

六、批处理与并发策略#

吞吐与延迟的折中:

  1. 批量写入
    • 把多个事件合并成一个 DB 批插入(COPYINSERT ... VALUES (...),(...),...),减少事务开销。
    • 批大小建议:几十到几百条,视 DB 性能与单条事件大小而定。
  2. 并发分片
    • contract_address % Nblock_number % N 做分片,分配给 N 个 Processor 实例并行处理。
    • 但要注意:同一合约的事件如果有顺序依赖,避免跨分片乱序。
  3. 背压(Backpressure)
    • 当数据库写入变慢时,应向 Listener 施加背压(暂停订阅或降低消费速率),避免内存 OOM 或消息堆积。
  4. 幂等写入 + 可重复消费
    • 设计使得任意消息可以被重复消费(幂等),便于重试与失败补偿。

七、失败恢复与断点续跑#

核心是Checkpoint幂等两件事:

  • Checkpoint:定期(或事务内)提交 indexer_checkpoints,记录 last_scanned_blockblock_hash。立即崩溃重启后,从该块后续开始。
  • 幂等写入:若处理器重新消费已经处理过的日志,数据库唯一约束会阻止二次写入。

补救流程:

  1. 服务重启 → 读取 indexer_checkpoints → 从 last_scanned_block + 1 开始回放。
  2. Backfill Worker 可在发现缺漏时触发区块区间补填。

八、监控与告警#

必监控指标(Prometheus 推荐):

  • Listener 层:连接状态(connected/disconnected)、订阅错误率、WS/gRPC 重连次数。
  • Queue:消费 lag(消息积压长度)、消费速率(msg/s)。
  • Processor:每秒事件处理数、解码错误率、ABI 解析失败数。
  • Writer/DB:写入延迟、批次大小、事务回滚/冲突次数、平均写入吞吐。
  • Reorg:重组次数/小时、最长回滚深度(区块数)。
  • 健康:Last processed block number, last finalized block number。

告警策略(示例):

  • 消费队列积压 > X(例如 10000) → 告警(可能处理慢或节点限流)。
  • Listener disconnect > 5 次 / 10 min → 告警。
  • Reorg 深度 > K(例如 10) → 告警并人工介入。
  • 最近处理区块落后链高度超过 Y(例如 500) → 告警。

日志与追踪:

  • 每条事件处理应打 trace id(tx_hash)以便追踪全链路。使用 OpenTelemetry / Jaeger 做分布式追踪。
构建事件索引系统
https://blog.ithuo.net/posts/blockchain-tutorial-evm-6/
Author
Derick
Published at
2024-12-08