Summer Blog

丢失的Transaction

昨天,我的以太坊监听器丢了一个Transaction,交易丢失对于监听器来说是很严重的问题,检查了一早上,但是结果还是毫无收获。

回顾

分析

我的监听器结构如下图,十分简单。通过Web3j库的block filter与Ethereum客户端交互,取得了需要的block变化数据,处理数据然后存入数据库。这些数据获取和处理过程都是线性的,一条数据走完整个流程,才会处理下一条数据。

my listener

整个过程有两个filter是最值得怀疑的地方。

第一个filter由web3j提供,内部实现也很简单,在listener启动时给一个开始的startBlock,然后web3j比较该block是否是最新block,如果不是,连接 获取该块到最新块数据的Observable 和 递归执行这个检查;如果是最新块,返回onCompleteObservable, onCompleteObservable是我实现的blockObservable。反复检查,这部分代码没有发现异常,代码实现如下:

private Observable<EthBlock> catchUpToLatestBlockObservableSync(
            DefaultBlockParameter startBlock, boolean fullTransactionObjects,
            Observable<EthBlock> onCompleteObservable) {

        BigInteger startBlockNumber;
        BigInteger latestBlockNumber;
        try {
            startBlockNumber = getBlockNumber(startBlock);
            latestBlockNumber = getLatestBlockNumber();
        } catch (IOException e) {
            return Observable.error(e);
        }

        if (startBlockNumber.compareTo(latestBlockNumber) > -1) {
            return onCompleteObservable;
        } else {
            return Observable.concat(
                    replayBlocksObservableSync(
                            new DefaultBlockParameterNumber(startBlockNumber),
                            new DefaultBlockParameterNumber(latestBlockNumber),
                            fullTransactionObjects),
                    Observable.defer(() -> catchUpToLatestBlockObservableSync(
                            new DefaultBlockParameterNumber(latestBlockNumber.add(BigInteger.ONE)),
                            fullTransactionObjects,
                            onCompleteObservable)));
        }
    }

private Observable<EthBlock> blockObservable() {
    return ethBlockHashObservable()
        .flatMap(blockHash -> web3j.ethGetBlockByHash(blockHash, true).observable());
}

private Observable<String> ethBlockHashObservable() {
    return Observable.create(subscriber -> {

      try {
        org.web3j.protocol.core.methods.response.EthFilter filter = web3j.ethNewBlockFilter().send();
        BigInteger filterId = filter.getFilterId();

        ScheduledFuture<?> scheduledFuture = scheduledExecutorService
            .scheduleAtFixedRate(() -> {
              try {
                pollFilter(subscriber, filterId);
              } catch (Exception e) {
                subscriber.onError(e);
              }
            }, 0, pollingInterval, TimeUnit.SECONDS);
        subscriber.add(Subscriptions.create(() -> cancel(filterId, scheduledFuture)));
      } catch (IOException e) {
        throwException(e);
      }
    });
  }

第二个filter是正常的数据过滤,将需要的监听的地址放入一个缓存中,与比较交易中的地址。这里感觉也不像是出错的地方,因为前后交易都被记录了下来。

解决

对于这个bug真的毫无头绪。比较了另一个listener的实现,主要有两个地方不同,一是web3j版本,二是它等待了10 confirmations。

暂时能想到的解决是,更新web3j,3.5版本加入了rebuid filter功能,我就不需要自己实现onCompleteObservable,可能会增加代码的稳定性。由于这个监听器对于时间比较敏感,暂时不打算加入等待。

资料


comments powered by Disqus