Derick
3857 words
19 minutes
实时监控钱包交易系统设计文档
TIP

本项目是一个用 Rust 编写的命令行工具,核心功能是通过监听新区块并分析其中的交易数据来监控钱包活动。它利用交易收据、调用跟踪和日志信息来追踪代币转移并计算损益。程序设计重点在于 MEV(最大化可提取价值)分析,它能够追踪 builder 奖励和验证者贿赂。通过异步编程和高效的数据结构,实时监控多个链上的多个钱包并将交易消息即时通知到Telegram。

以下是该项目的主要功能和实现原理:

主要功能#

  1. 配置加载
    • 程序从 YAML 配置文件中加载监控的区块链、钱包地址以及 Telegram 通知设置。配置文件定义了要监控的链的 RPC URL、每个钱包的地址、关联的 builder 地址(用于 MEV 分析)、其他相关地址,以及是否包含接收方地址到损益计算中。
    • 支持容器化部署
  2. 区块处理
    • WalletWatcher 使用提供的区块链 provider 连接到指定的链,并在每个新区块到达时处理。它获取区块中的所有交易的收据和跟踪信息,以便进行后续分析。
  3. 损益计算
    • 对于每个钱包,process_block 函数调用 generate_pnl 函数来计算该区块内的损益。此函数分析交易日志和调用跟踪,以追踪原生代币和 ERC20 代币的转移,并计算交易费用,考虑 builder 奖励和验证者贿赂(针对 MEV 场景)。
  4. 消息生成与发送
    • 根据计算出的损益和代币变化,message_generator 生成包含相关信息的消息,例如交易哈希、代币数量变化等。然后程序调用 Telegram Bot API 将消息发送到配置的群组中。
  5. 命令行界面
    • 项目提供了 startrunbacktest 命令,以便用户启动实时监控、执行特定操作或基于历史数据进行回测分析。

区块处理#

wallet-watcher 项目的区块处理主要由 src/processor.rs 文件中的 process_block 函数实现,其详细步骤如下:

  1. 初始化
    • 函数接收链 ID、区块头信息、交易收据和调用跟踪数据,以及要监控的钱包列表作为输入。它初始化一个向量 reports 用于存储每个钱包的损益报告。
  2. 收集所有相关地址
    • 程序收集所有涉及的钱包地址,包括被监控钱包的地址、与其相关的其他地址,以及交易的接收方地址(如果配置中开启了 include_recipient 选项)。这确保了程序能够追踪所有与被监控钱包相关的资金流动。
  3. 遍历交易和调用跟踪
    • 程序遍历区块中的每一笔交易及其对应的调用跟踪信息。对于每笔交易,它调用 generate_pnl 函数计算该交易产生的余额变化,并将结果存储在 balance_changes_all 向量中。该函数会分析交易日志和调用跟踪,以追踪代币转移,包括原生代币和 ERC20 代币的转移,并处理 WETH 的存取款。
  4. 过滤无关交易
    • 程序会过滤掉与被监控钱包无关的交易,以及疑似空气投币的交易(使用 is_shitcoin_airdrop 函数识别此类交易)。
  5. 计算每个钱包的损益
    • 对于每个被监控的钱包,程序会遍历所有与其相关的交易,并计算总交易费用和代币余额变化。它还会根据配置信息判断该钱包是否为 builder,如果是,则计算 builder 奖励和验证者贿赂。
  6. 生成损益报告
    • 最后程序将计算出的损益、代币余额变化、builder 奖励和验证者贿赂等信息汇总到 PnlReport 结构体中,并将其添加到 reports 向量中。如果某个钱包在该区块内没有发生任何相关交易,则对应的报告为空。

核心逻辑#

process_block 函数通过对区块数据进行细致分析,实现了对钱包活动的全面监控和损益计算。它特别处理了 builder 奖励和验证者贿赂,以支持 MEV 分析。总体而言,该函数通过遍历区块中的每一笔交易,分析与被监控钱包相关的数据流动,从而实现精确的财务报告。

配置读取#

wallet-watcher 项目需要配置链节点 RPC 地址,通过这些地址读取和遍历区块:

  • RPC 配置: 在 src/config.rs 文件中,Config 结构体包含一个 chains 字段,它是一个 HashMap,用于存储链名称及对应的 RPC URL。例如:

    chains:  mainnet: "https://eth-mainnet.alchemyapi.io/v2/your-api-key"  optimism: "https://optimism-mainnet.alchemyapi.io/v2/your-api-key"
    
  • 区块读取与遍历: 在 src/main.rs 文件中,程序会读取配置文件,并根据配置创建对应链的信息。在这里,使用 alloy 库提供 HTTP 或 WebSocket provider 来连接到指定 RPC URL。当有新的区块产生时,WalletWatcher 的 process_event 函数会被调用,从而获取最新区块的信息并进行处理。

总结#

wallet-watcher 通过以下步骤读取和遍历区块:

  1. 从配置文件中读取链的 RPC URL。
  2. 使用 alloy 库创建与区块链节点之间的连接。
  3. 监听新区块事件。
  4. 获取新区块中的交易收据和调用跟踪数据。
  5. 遍历这些交易数据并分析钱包活动。

因此wallet-watcher 依赖于用户提供的 RPC URL 来连接到区块链节点(建议自建节点),并使用 alloy 库提供功能来读取和遍历区块数据。

损益计算#

本项目的损益计算主要通过 src/processor.rs 文件中的 generate_pnlprocess_block 函数实现。以下是详细过程:

generate_pnl 函数#

  • 此函数接收链 ID、交易收据、调用跟踪信息以及可选地址集合作为输入。
  • 它首先创建一个 BalanceChanges 结构体,用于存储发生在交易中的余额变化。
  • 遍历调用跟踪中的每一个调用帧,并检查是否有错误或回滚。
  • 对于每个调用帧,它会遍历其中日志,并尝试解码 ERC20 Transfer 事件或 WETH Deposit 和 Withdrawal 事件。如果解码成功,它会记录代币转移信息,包括代币地址、发送方地址、接收方地址及转移数量。
  • 对于原生代币转移,会检查调用帧类型并记录相应转移信息。
  • 最后,它会过滤掉余额为零的转移,并返回 BalanceChanges 结构体。

process_block 函数#

  • 此函数接收链 ID、区块头信息、交易收据及调用跟踪数据,以及要监控的钱包列表作为输入。
  • 它首先遍历区块中的每一笔交易,并调用 generate_pnl 函数计算该笔交易产生的余额变化。
  • 然后过滤掉与被监控钱包无关及疑似空气投币的交易。
  • 对于每个被监控的钱包,遍历所有相关交易并计算总交易费用及代币余额变化。此处,交易费用通过 calculate_tx_fee 函数计算,该函数考虑 gas 使用量及 gas 价格,以及 L1 网络费用(如 Optimism 等 L2 网络)。
  • 最终生成一个 PnlReport 结构体,其中包含完整的损益报告,包括各类费用及奖励信息。

损益计算核心逻辑#

wallet-watcher 的损益计算核心逻辑包括:

  1. 分析交易日志及调用跟踪以追踪代币转移。
  2. 精确计算各类费用。
  3. 合并所有相关地址余额变化以得出准确结果。
  4. 综合考虑 builder 奖励及验证者贿赂以确保报告完整性。

通过这些步骤,wallet-watcher 能够准确地反映出每个被监控钱包在每个区块内所产生的财务状况。

消息生成与发送#

wallet-watcher 项目的消息生成与发送功能主要通过 src/message.rs 文件中的 MessageGenerator 结构体以及 burberry 库实现。以下是详细的实现过程:

MessageGenerator 结构体#

src/message.rs 文件中,定义了 MessageGenerator 结构体,它负责根据区块数据和损益报告生成消息。该结构体持有链 ID 和区块链 provider 的引用,以便于获取相关信息。

  • 加载代币符号和小数位数:使用 load_symbol_and_decimal 函数加载代币的符号和小数位数,以便在消息中格式化代币数量。这一步骤对于确保消息的可读性至关重要,因为不同代币可能具有不同的小数位数。

generate 函数#

MessageGeneratorgenerate 函数接收区块信息、交易收据、调用跟踪数据、损益报告以及钱包信息作为输入。该函数的主要步骤包括:

  1. 生成文本消息:根据输入的信息生成包含交易哈希、代币数量变化、损益等信息的文本消息。它会使用 utils 模块中的函数来格式化地址、交易哈希和代币数量,并生成指向区块浏览器和 Phalcon 的链接。
  2. 包含额外信息:根据配置信息,决定是否包含 builder 奖励和验证者贿赂等信息。这使得用户能够一目了然地看到与其钱包活动相关的所有重要信息。

burberry 库#

burberry 库提供了发送消息的功能,wallet-watcher 使用 burberry::executor::telegram_message::MessageBuilder 来构建消息。在 src/strategy.rs 文件中,WalletWatcher 的 process_block 函数会调用 MessageGeneratorgenerate 函数生成消息。然后,它会使用 MessageBuilder 设置 Telegram Bot 的 token、chat ID 和 thread ID(如果配置了)。最后,它会调用 submitter.submit 函数将消息发送到 Telegram。

消息生成与发送的核心逻辑#

wallet-watcher 的消息生成与发送核心逻辑如下:

  1. 使用 MessageGenerator:根据区块数据和损益报告生成文本消息。
  2. 构建消息:使用 burberry::executor::telegram_message::MessageBuilder 构建消息,并设置 Telegram Bot 的相关信息。
  3. 发送消息:使用 submitter.submit 函数将消息发送到 Telegram。

通过这些步骤,wallet-watcher 能够将监控到的钱包活动和损益信息以文本消息的形式发送到 Telegram,从而帮助用户及时了解其钱包的财务状况。

回测功能实现#

wallet-watcher 项目的回测功能允许用户基于历史数据进行回测分析,其实现方式如下:

读取测试数据#

src/cli/backtest.rs 文件中,定义了回测命令的参数,包括测试数据文件的路径 (test_data) 和 RPC URL (rpc_url)。具体步骤如下:

  1. 打开测试数据文件:回测命令的 run 函数首先打开指定的测试数据文件。
  2. 解析 YAML 数据:使用 serde_yaml 库将文件内容解析为 TestCase 结构体的向量。每个测试用例包括区块号、钱包地址、builder 地址、其他相关地址、是否包含接收方以及预期的损益报告。

创建区块链 provider#

在读取完测试数据后,回测命令会使用提供的 RPC URL 创建一个区块链 provider。这一过程确保 wallet-watcher 能够连接到指定的区块链节点,以获取必要的数据进行分析。

并行处理测试用例#

回测命令利用异步编程并行处理多个测试用例。具体步骤包括:

  1. 创建异步任务:使用 tokio::spawn 创建多个异步任务,每个任务处理一个测试用例。
  2. 调用 worker 函数:每个任务都会调用 worker 函数,该函数首先获取指定区块的交易收据和调用跟踪数据。
  3. 获取区块头信息:然后会获取指定区块的区块头信息,并调用 processor::process_block 函数来计算该区块的损益报告。
  4. 结果比较:最后会将计算出的损益报告与测试用例中预期的损益报告进行比较,以验证 wallet-watcher 的计算逻辑是否正确。

结果验证与输出#

在所有测试用例处理完毕后,回测命令会接收每个任务的执行结果,并根据结果进行验证和输出。具体步骤如下:

  1. 生成新测试用例:如果参数设置为 true,则将计算出的损益报告写入到测试数据文件中,用于生成新的测试用例。
  2. 比较预期结果:如果参数设置为 false,则将计算出的损益报告与预期结果进行比较,并输出测试结果。
  3. 调试输出:如果测试结果不匹配,则输出调试命令,以便用户可以手动运行该测试用例进行调试。

回测命令核心原理#

回测命令的核心原理如下:

  1. 从 YAML 文件中读取测试用例,包括区块号、钱包地址和预期的损益报告。
  2. 使用提供的 RPC URL 连接到区块链节点。
  3. 并行处理每个测试用例,获取指定区块的交易数据,并计算损益。
  4. 将计算出的损益与预期结果进行比较,并输出测试结果。

通过这些步骤,backtest 命令可以基于历史数据进行回测分析,验证 wallet-watcher 项目的损益计算逻辑是否正确。

日志处理#

本项目选择不将链下处理的数据存储到数据库(如 ClickHouse 或 PostgreSQL),而是将数据输出到日志中进行处理和分析。这一选择背后的原因及具体实现逻辑如下:

为什么没有使用数据库?#

  1. 简化架构
    • 使用日志可以简化项目架构,避免引入数据库所带来的复杂性。这使得项目更容易部署和维护,同时降低了开发成本。
  2. 快速原型开发
    • 对于快速原型开发而言,使用日志可以更快地迭代和测试,使开发者能够专注于功能实现,而不必担心数据库管理的问题。
  3. 灵活性
    • 日志提供了灵活的数据分析方式,可以使用各种工具进行过滤、搜索和聚合,从而支持多样化的数据分析需求。

日志分析工具#

用户可以利用现有的日志分析工具(例如 Elasticsearch、Splunk 和 Grafana Loki)来处理和分析日志数据。这些工具能够帮助用户实时监控系统状态,并对日志进行深入分析。

日志输出与处理逻辑#

使用 Rust 的 tracing 库进行日志输出,该库提供了结构化日志记录功能,使得日志更易于过滤和分析。以下是具体实现细节:

  1. 记录损益与交易信息
    • src/strategy.rs 中,WalletWatcher 的 process_block 函数使用 tracing::info! 记录每个钱包的损益、代币变化和交易信息。同时,在处理过程中发生错误时,会使用 tracing::error! 记录错误信息,以便后续排查问题。
  2. 创建处理过程 span
    • src/processor.rs 中,process_block 函数使用 info_span! 创建一个 span,用于记录每个钱包在处理过程中的状态。这种结构化的方法使得追踪特定操作变得更加清晰。
  3. 详细调试信息记录
    • 在生成损益报告时,调用跟踪中的详细信息(如交易索引、交易哈希、费用等)会通过 tracing::trace! 记录下来。这有助于开发者在调试时获得更全面的信息,从而快速定位问题。
实时监控钱包交易系统设计文档
https://blog.ithuo.net/posts/design-document-realtime-cryptocurrency-wallet-monitoring/
Author
Derick
Published at
2024-10-21