From 96816b252b31984b4a3414c4e0e686f13f76abd6 Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Thu, 29 Jan 2026 16:26:30 +0800 Subject: [PATCH] eth/filters: remove support for pending logs #29574 This change removes support for subscribing to pending logs. "Pending logs" were always an odd feature, because it can never be fully reliable. When support for it was added many years ago, the intention was for this to be used by wallet apps to show the 'potential future token balance' of accounts, i.e. as a way of notifying the user of incoming transfers before they were mined. In order to generate the pending logs, the node must pick a subset of all public mempool transactions, execute them in the EVM, and then dispatch the resulting logs to API consumers. --- accounts/abi/bind/backends/simulated.go | 6 +- cmd/utils/flags.go | 2 +- eth/backend.go | 2 +- eth/filters/api.go | 21 +- eth/filters/filter.go | 80 ++--- eth/filters/filter_system.go | 219 ++------------ eth/filters/filter_system_test.go | 385 ++---------------------- eth/filters/filter_test.go | 25 +- 8 files changed, 103 insertions(+), 637 deletions(-) diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index b7a775606669..87d44265e5d3 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -153,7 +153,7 @@ func NewXDCSimulatedBackend(alloc types.GenesisAlloc, gasLimit uint64, chainConf filterBackend := &filterBackend{database, blockchain, backend} backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{}) - backend.events = filters.NewEventSystem(backend.filterSystem, false) + backend.events = filters.NewEventSystem(backend.filterSystem) blockchain.Client = backend @@ -181,7 +181,7 @@ func NewSimulatedBackend(alloc types.GenesisAlloc, gasLimit uint64) *SimulatedBa filterBackend := &filterBackend{database, blockchain, backend} backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{}) - backend.events = filters.NewEventSystem(backend.filterSystem, false) + backend.events = filters.NewEventSystem(backend.filterSystem) header := backend.blockchain.CurrentBlock() block := backend.blockchain.GetBlock(header.Hash(), header.Number.Uint64()) @@ -834,7 +834,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter if query.FromBlock != nil { from = query.FromBlock.Int64() } - to := int64(-1) + to := int64(rpc.LatestBlockNumber) if query.ToBlock != nil { to = query.ToBlock.Int64() } diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index c3b1a6ffd0ce..78bc42502b0c 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1862,7 +1862,7 @@ func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconf }) stack.RegisterAPIs([]rpc.API{{ Namespace: "eth", - Service: filters.NewFilterAPI(filterSystem, false), + Service: filters.NewFilterAPI(filterSystem), }}) return filterSystem } diff --git a/eth/backend.go b/eth/backend.go index fe6cb386e76b..018a5c449a70 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -426,7 +426,7 @@ func (e *Ethereum) APIs() []rpc.API { Service: downloader.NewDownloaderAPI(e.protocolManager.downloader, e.eventMux), }, { Namespace: "eth", - Service: filters.NewFilterAPI(filters.NewFilterSystem(e.APIBackend, filters.Config{LogCacheSize: e.config.FilterLogCacheSize}), false), + Service: filters.NewFilterAPI(filters.NewFilterSystem(e.APIBackend, filters.Config{LogCacheSize: e.config.FilterLogCacheSize})), }, { Namespace: "admin", Service: NewAdminAPI(e), diff --git a/eth/filters/api.go b/eth/filters/api.go index 6af73caea55b..e3ca8492ef3a 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -33,13 +33,14 @@ import ( ) var ( - errInvalidTopic = invalidParamsErr("invalid topic(s)") - errInvalidBlockRange = invalidParamsErr("invalid block range params") - errBlockHashWithRange = invalidParamsErr("can't specify fromBlock/toBlock with blockHash") - errUnknownBlock = errors.New("unknown block") - errFilterNotFound = errors.New("filter not found") - errExceedMaxTopics = errors.New("exceed max topics") - errExceedLogQueryLimit = errors.New("exceed max addresses or topics per search position") + errInvalidTopic = invalidParamsErr("invalid topic(s)") + errInvalidBlockRange = invalidParamsErr("invalid block range params") + errBlockHashWithRange = invalidParamsErr("can't specify fromBlock/toBlock with blockHash") + errPendingLogsUnsupported = invalidParamsErr("pending logs are not supported") + errUnknownBlock = errors.New("unknown block") + errFilterNotFound = errors.New("filter not found") + errExceedMaxTopics = errors.New("exceed max topics") + errExceedLogQueryLimit = errors.New("exceed max addresses or topics per search position") ) type invalidParamsError struct { @@ -83,10 +84,10 @@ type FilterAPI struct { } // NewFilterAPI returns a new FilterAPI instance. -func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI { +func NewFilterAPI(system *FilterSystem) *FilterAPI { api := &FilterAPI{ sys: system, - events: NewEventSystem(system, lightMode), + events: NewEventSystem(system), filters: make(map[rpc.ID]*filter), timeout: system.cfg.Timeout, logQueryLimit: system.cfg.LogQueryLimit, @@ -481,7 +482,7 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { txs := f.txs f.txs = nil return txs, nil - case LogsSubscription, MinedAndPendingLogsSubscription: + case LogsSubscription: logs := f.logs f.logs = nil return returnLogs(logs), nil diff --git a/eth/filters/filter.go b/eth/filters/filter.go index b0da021ff3d2..0617f2d1fbf9 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -110,54 +110,55 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { } return f.blockLogs(ctx, header) } - var ( - beginPending = f.begin == rpc.PendingBlockNumber.Int64() - endPending = f.end == rpc.PendingBlockNumber.Int64() - ) - // special case for pending logs - if beginPending && !endPending { - return nil, errors.New("invalid block range") + // Disallow pending logs. + if f.begin == rpc.PendingBlockNumber.Int64() || f.end == rpc.PendingBlockNumber.Int64() { + return nil, errPendingLogsUnsupported } - // Short-cut if all we care about is pending logs - if beginPending && endPending { - return f.pendingLogs(), nil - } - - resolveSpecial := func(number int64) (int64, error) { - var hdr *types.Header + resolveSpecial := func(number int64) (uint64, error) { switch number { - case rpc.LatestBlockNumber.Int64(), rpc.PendingBlockNumber.Int64(): - // we should return head here since we've already captured - // that we need to get the pending logs in the pending boolean above - hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) + case rpc.LatestBlockNumber.Int64(): + hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) if hdr == nil { return 0, errors.New("latest header not found") } + return hdr.Number.Uint64(), nil case rpc.FinalizedBlockNumber.Int64(): - hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber) + hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber) if hdr == nil { - return 0, errors.New("committed header not found") + return 0, errors.New("finalized header not found") } + return hdr.Number.Uint64(), nil + case rpc.EarliestBlockNumber.Int64(): + hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.EarliestBlockNumber) + if hdr == nil { + return 0, errors.New("earliest header not found") + } + return hdr.Number.Uint64(), nil default: - return number, nil + if number < 0 { + return 0, errors.New("negative block number") + } + return uint64(number), nil } - return hdr.Number.Int64(), nil } - var err error // range query need to resolve the special begin/end block number - if f.begin, err = resolveSpecial(f.begin); err != nil { + begin, err := resolveSpecial(f.begin) + if err != nil { return nil, err } - if f.end, err = resolveSpecial(f.end); err != nil { + end, err := resolveSpecial(f.end) + if err != nil { return nil, err } - if f.rangeLimit != 0 && (uint64(f.end)-uint64(f.begin)) > f.rangeLimit { + if f.rangeLimit != 0 && (end-begin) > f.rangeLimit { return nil, fmt.Errorf("exceed maximum block range: %d", f.rangeLimit) } + f.begin = int64(begin) + f.end = int64(end) logChan, errChan := f.rangeLogsAsync(ctx) var logs []*types.Log for { @@ -165,16 +166,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { case log := <-logChan: logs = append(logs, log) case err := <-errChan: - if err != nil { - // if an error occurs during extraction, we do return the extracted data - return logs, err - } - // Append the pending ones - if endPending { - pendingLogs := f.pendingLogs() - logs = append(logs, pendingLogs...) - } - return logs, nil + return logs, err } } } @@ -332,22 +324,6 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ return logs, nil } -// pendingLogs returns the logs matching the filter criteria within the pending block. -func (f *Filter) pendingLogs() []*types.Log { - block, receipts := f.sys.backend.PendingBlockAndReceipts() - if block == nil { - return nil - } - if bloomFilter(block.Bloom(), f.addresses, f.topics) { - var unfiltered []*types.Log - for _, r := range receipts { - unfiltered = append(unfiltered, r.Logs...) - } - return filterLogs(unfiltered, nil, nil, f.addresses, f.topics) - } - return nil -} - func includes(addresses []common.Address, a common.Address) bool { for _, addr := range addresses { if addr == a { diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 13c4a72c8c07..1b29126b12b5 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -30,7 +30,6 @@ import ( "github.com/XinFinOrg/XDPoSChain/common/lru" "github.com/XinFinOrg/XDPoSChain/core" "github.com/XinFinOrg/XDPoSChain/core/bloombits" - "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/ethdb" "github.com/XinFinOrg/XDPoSChain/event" @@ -64,7 +63,6 @@ type Backend interface { GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error) - PendingBlockAndReceipts() (*types.Block, types.Receipts) CurrentHeader() *types.Header ChainConfig() *params.ChainConfig @@ -72,7 +70,6 @@ type Backend interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription - SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) @@ -154,10 +151,6 @@ const ( UnknownSubscription Type = iota // LogsSubscription queries for new or removed (chain reorg) logs LogsSubscription - // PendingLogsSubscription queries for logs in pending blocks - PendingLogsSubscription - // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks. - MinedAndPendingLogsSubscription // PendingTransactionsSubscription queries for pending transactions entering // the pending state PendingTransactionsSubscription @@ -194,26 +187,22 @@ type subscription struct { // EventSystem creates subscriptions, processes events and broadcasts them to the // subscription which match the subscription criteria. type EventSystem struct { - backend Backend - sys *FilterSystem - lightMode bool - lastHead *types.Header + backend Backend + sys *FilterSystem // Subscriptions - txsSub event.Subscription // Subscription for new transaction event - logsSub event.Subscription // Subscription for new log event - rmLogsSub event.Subscription // Subscription for removed log event - pendingLogsSub event.Subscription // Subscription for pending log event - chainSub event.Subscription // Subscription for new chain event + txsSub event.Subscription // Subscription for new transaction event + logsSub event.Subscription // Subscription for new log event + rmLogsSub event.Subscription // Subscription for removed log event + chainSub event.Subscription // Subscription for new chain event // Channels - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification - txsCh chan core.NewTxsEvent // Channel to receive new transactions event - logsCh chan []*types.Log // Channel to receive new log event - pendingLogsCh chan []*types.Log // Channel to receive new log event - rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event - chainCh chan core.ChainEvent // Channel to receive new chain event + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txsCh chan core.NewTxsEvent // Channel to receive new transactions event + logsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -222,18 +211,16 @@ type EventSystem struct { // // The returned manager has a loop that needs to be stopped with the Stop function // or by stopping the given mux. -func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem { +func NewEventSystem(sys *FilterSystem) *EventSystem { m := &EventSystem{ - sys: sys, - backend: sys.backend, - lightMode: lightMode, - install: make(chan *subscription), - uninstall: make(chan *subscription), - txsCh: make(chan core.NewTxsEvent, txChanSize), - logsCh: make(chan []*types.Log, logsChanSize), - rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), - pendingLogsCh: make(chan []*types.Log, logsChanSize), - chainCh: make(chan core.ChainEvent, chainEvChanSize), + sys: sys, + backend: sys.backend, + install: make(chan *subscription), + uninstall: make(chan *subscription), + txsCh: make(chan core.NewTxsEvent, txChanSize), + logsCh: make(chan []*types.Log, logsChanSize), + rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), + chainCh: make(chan core.ChainEvent, chainEvChanSize), } // Subscribe events @@ -241,10 +228,9 @@ func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem { m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh) m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) - m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh) // Make sure none of the subscriptions are empty - if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil { + if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil { log.Crit("Subscribe for event system failed") } @@ -327,10 +313,11 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ to = rpc.BlockNumber(crit.ToBlock.Int64()) } - // only interested in pending logs - if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber { - return es.subscribePendingLogs(crit, logs), nil + // Pending logs are not supported anymore. + if from == rpc.PendingBlockNumber || to == rpc.PendingBlockNumber { + return nil, errPendingLogsUnsupported } + // only interested in new mined logs if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber { return es.subscribeLogs(crit, logs), nil @@ -339,10 +326,6 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ if from >= 0 && to >= 0 && to >= from { return es.subscribeLogs(crit, logs), nil } - // interested in mined logs from a specific block number, new logs and pending logs - if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber { - return es.subscribeMinedPendingLogs(crit, logs), nil - } // interested in logs from a specific block number to new mined blocks if from >= 0 && to == rpc.LatestBlockNumber { return es.subscribeLogs(crit, logs), nil @@ -350,23 +333,6 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ return nil, errInvalidBlockRange } -// subscribeMinedPendingLogs creates a subscription that returned mined and -// pending logs that match the given criteria. -func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { - sub := &subscription{ - id: rpc.NewID(), - typ: MinedAndPendingLogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - txs: make(chan []*types.Transaction), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), - } - return es.subscribe(sub) -} - // subscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { @@ -384,23 +350,6 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ return es.subscribe(sub) } -// subscribePendingLogs creates a subscription that writes transaction hashes for -// transactions that enter the transaction pool. -func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription { - sub := &subscription{ - id: rpc.NewID(), - typ: PendingLogsSubscription, - logsCrit: crit, - created: time.Now(), - logs: logs, - txs: make(chan []*types.Transaction), - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), - } - return es.subscribe(sub) -} - // SubscribeNewHeads creates a subscription that writes the header of a block that is // imported in the chain. func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { @@ -447,18 +396,6 @@ func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) { } } -func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) { - if len(ev) == 0 { - return - } - for _, f := range filters[PendingLogsSubscription] { - matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics) - if len(matchedLogs) > 0 { - f.logs <- matchedLogs - } - } -} - func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) { for _, f := range filters[PendingTransactionsSubscription] { f.txs <- ev.Txs @@ -469,91 +406,6 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent) for _, f := range filters[BlocksSubscription] { f.headers <- ev.Block.Header() } - if es.lightMode && len(filters[LogsSubscription]) > 0 { - es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) { - for _, f := range filters[LogsSubscription] { - if f.logsCrit.FromBlock != nil && header.Number.Cmp(f.logsCrit.FromBlock) < 0 { - continue - } - if f.logsCrit.ToBlock != nil && header.Number.Cmp(f.logsCrit.ToBlock) > 0 { - continue - } - if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 { - f.logs <- matchedLogs - } - } - }) - } -} - -func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) { - oldh := es.lastHead - es.lastHead = newHeader - if oldh == nil { - return - } - newh := newHeader - // find common ancestor, create list of rolled back and new block hashes - var oldHeaders, newHeaders []*types.Header - for oldh.Hash() != newh.Hash() { - if oldh.Number.Uint64() >= newh.Number.Uint64() { - oldHeaders = append(oldHeaders, oldh) - oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1) - } - if oldh.Number.Uint64() < newh.Number.Uint64() { - newHeaders = append(newHeaders, newh) - newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1) - if newh == nil { - // happens when CHT syncing, nothing to do - newh = oldh - } - } - } - // roll back old blocks - for _, h := range oldHeaders { - callBack(h, true) - } - // check new blocks (array is in reverse order) - for i := len(newHeaders) - 1; i >= 0; i-- { - callBack(newHeaders[i], false) - } -} - -// filter logs of a single header in light client mode -func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log { - if !bloomFilter(header.Bloom, addresses, topics) { - return nil - } - // Get the logs of the block - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - cached, err := es.sys.cachedLogElem(ctx, header.Hash(), header.Number.Uint64()) - if err != nil { - return nil - } - unfiltered := append([]*types.Log{}, cached.logs...) - for i, log := range unfiltered { - // Don't modify in-cache elements - logcopy := *log - logcopy.Removed = remove - // Swap copy in-place - unfiltered[i] = &logcopy - } - logs := filterLogs(unfiltered, nil, nil, addresses, topics) - // Txhash is already resolved - if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) { - return logs - } - // Resolve txhash - body, err := es.sys.cachedGetBody(ctx, cached, header.Hash(), header.Number.Uint64()) - if err != nil { - return nil - } - for _, log := range logs { - // logs are already copied, safe to modify - log.TxHash = body.Transactions[log.TxIndex].Hash() - } - return logs } // eventLoop (un)installs filters and processes mux events. @@ -563,7 +415,6 @@ func (es *EventSystem) eventLoop() { es.txsSub.Unsubscribe() es.logsSub.Unsubscribe() es.rmLogsSub.Unsubscribe() - es.pendingLogsSub.Unsubscribe() es.chainSub.Unsubscribe() }() @@ -580,29 +431,15 @@ func (es *EventSystem) eventLoop() { es.handleLogs(index, ev) case ev := <-es.rmLogsCh: es.handleLogs(index, ev.Logs) - case ev := <-es.pendingLogsCh: - es.handlePendingLogs(index, ev) case ev := <-es.chainCh: es.handleChainEvent(index, ev) case f := <-es.install: - if f.typ == MinedAndPendingLogsSubscription { - // the type are logs and pending logs subscriptions - index[LogsSubscription][f.id] = f - index[PendingLogsSubscription][f.id] = f - } else { - index[f.typ][f.id] = f - } + index[f.typ][f.id] = f close(f.installed) case f := <-es.uninstall: - if f.typ == MinedAndPendingLogsSubscription { - // the type are logs and pending logs subscriptions - delete(index[LogsSubscription], f.id) - delete(index[PendingLogsSubscription], f.id) - } else { - delete(index[f.typ], f.id) - } + delete(index[f.typ], f.id) close(f.err) // System stopped diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index bf01ef9aa499..e3949c803f42 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -19,7 +19,6 @@ package filters import ( "context" "errors" - "fmt" "math/big" "math/rand" "reflect" @@ -27,7 +26,6 @@ import ( "testing" "time" - ethereum "github.com/XinFinOrg/XDPoSChain" "github.com/XinFinOrg/XDPoSChain/common" "github.com/XinFinOrg/XDPoSChain/consensus/ethash" "github.com/XinFinOrg/XDPoSChain/core" @@ -35,7 +33,6 @@ import ( "github.com/XinFinOrg/XDPoSChain/core/rawdb" "github.com/XinFinOrg/XDPoSChain/core/types" "github.com/XinFinOrg/XDPoSChain/core/vm" - "github.com/XinFinOrg/XDPoSChain/crypto" "github.com/XinFinOrg/XDPoSChain/ethdb" "github.com/XinFinOrg/XDPoSChain/event" "github.com/XinFinOrg/XDPoSChain/params" @@ -49,7 +46,6 @@ type testBackend struct { txFeed event.Feed logsFeed event.Feed rmLogsFeed event.Feed - pendingLogsFeed event.Feed chainFeed event.Feed pendingBlock *types.Block pendingReceipts types.Receipts @@ -116,10 +112,6 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint return logs, nil } -func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) { - return b.pendingBlock, b.pendingReceipts -} - func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { return b.txFeed.Subscribe(ch) } @@ -132,10 +124,6 @@ func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript return b.logsFeed.Subscribe(ch) } -func (b *testBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription { - return b.pendingLogsFeed.Subscribe(ch) -} - func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { return b.chainFeed.Subscribe(ch) } @@ -171,6 +159,11 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc }() } +func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) { + b.pendingBlock = block + b.pendingReceipts = receipts +} + func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) { backend := &testBackend{db: db} sys := NewFilterSystem(backend, cfg) @@ -188,13 +181,13 @@ func TestBlockSubscription(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) genesis = (&core.Genesis{ Config: params.TestChainConfig, BaseFee: big.NewInt(params.InitialBaseFee), }).MustCommit(db) chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {}) - chainEvents = []core.ChainEvent{} + chainEvents []core.ChainEvent ) for _, blk := range chain { @@ -243,7 +236,7 @@ func TestPendingTxFilter(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil), @@ -298,7 +291,7 @@ func TestLogFilterCreation(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() _, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) testCases = []struct { crit FilterCriteria @@ -310,8 +303,6 @@ func TestLogFilterCreation(t *testing.T) { {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, true}, // "mined" block range to pending {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, true}, - // new mined and pending blocks - {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, true}, // from block "higher" than to block {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}, false}, // from block "higher" than to block @@ -344,7 +335,7 @@ func TestInvalidLogFilterCreation(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() _, sys = newTestFilterSystem(t, db, Config{LogQueryLimit: 1000}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) ) // different situations where log filter creation should fail. @@ -375,7 +366,7 @@ func TestInvalidGetLogsRequest(t *testing.T) { } db, blocks, _ = core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 10, func(i int, gen *core.BlockGen) {}) _, sys = newTestFilterSystem(t, db, Config{LogQueryLimit: 10}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) blockHash = blocks[0].Hash() unknownBlockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") ) @@ -439,7 +430,7 @@ func TestInvalidGetRangeLogsRequest(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() _, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) ) if _, err := api.GetLogs(context.Background(), FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}); err != errInvalidBlockRange { @@ -454,7 +445,7 @@ func TestLogFilter(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") @@ -473,9 +464,6 @@ func TestLogFilter(t *testing.T) { {Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}, } - expectedCase7 = []*types.Log{allLogs[3], allLogs[4], allLogs[0], allLogs[1], allLogs[2], allLogs[3], allLogs[4]} - expectedCase11 = []*types.Log{allLogs[1], allLogs[2], allLogs[1], allLogs[2]} - testCases = []struct { crit FilterCriteria expected []*types.Log @@ -493,20 +481,14 @@ func TestLogFilter(t *testing.T) { 4: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[3:5], ""}, // match logs based on multiple addresses and "or" topics 5: {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[2:5], ""}, - // logs in the pending block - 6: {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, allLogs[:2], ""}, - // mined logs with block num >= 2 or pending logs - 7: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, expectedCase7, ""}, // all "mined" logs with block num >= 2 - 8: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""}, + 6: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""}, // all "mined" logs - 9: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""}, + 7: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""}, // all "mined" logs with 1>= block num <=2 and topic secondTopic - 10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""}, - // all "mined" and pending logs with topic firstTopic - 11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{{firstTopic}}}, expectedCase11, ""}, + 8: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""}, // match all logs due to wildcard topic - 12: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""}, + 9: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""}, } ) @@ -520,9 +502,6 @@ func TestLogFilter(t *testing.T) { if nsend := backend.logsFeed.Send(allLogs); nsend == 0 { t.Fatal("Logs event not delivered") } - if nsend := backend.pendingLogsFeed.Send(allLogs); nsend == 0 { - t.Fatal("Pending logs event not delivered") - } for i, tt := range testCases { var fetched []*types.Log @@ -530,7 +509,7 @@ func TestLogFilter(t *testing.T) { for { // fetch all expected logs results, err := api.GetFilterChanges(tt.id) if err != nil { - t.Fatalf("Unable to fetch logs: %v", err) + t.Fatalf("test %d: unable to fetch logs: %v", i, err) } fetched = append(fetched, results.([]*types.Log)...) @@ -561,324 +540,6 @@ func TestLogFilter(t *testing.T) { } } -// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed. -func TestPendingLogsSubscription(t *testing.T) { - t.Parallel() - - var ( - db = rawdb.NewMemoryDatabase() - backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, false) - - firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") - secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") - thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") - notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999") - firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") - secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") - thirdTopic = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333") - fourthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444") - notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999") - - allLogs = [][]*types.Log{ - {{Address: firstAddr, Topics: []common.Hash{}, BlockNumber: 0}}, - {{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1}}, - {{Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 2}}, - {{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}}, - {{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 4}}, - { - {Address: thirdAddress, Topics: []common.Hash{firstTopic}, BlockNumber: 5}, - {Address: thirdAddress, Topics: []common.Hash{thirdTopic}, BlockNumber: 5}, - {Address: thirdAddress, Topics: []common.Hash{fourthTopic}, BlockNumber: 5}, - {Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 5}, - }, - } - - pendingBlockNumber = big.NewInt(rpc.PendingBlockNumber.Int64()) - - testCases = []struct { - crit ethereum.FilterQuery - expected []*types.Log - c chan []*types.Log - sub *Subscription - err chan error - }{ - // match all - { - ethereum.FilterQuery{FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - flattenLogs(allLogs), - nil, nil, nil, - }, - // match none due to no matching addresses - { - ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - nil, - nil, nil, nil, - }, - // match logs based on addresses, ignore topics - { - ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - append(flattenLogs(allLogs[:2]), allLogs[5][3]), - nil, nil, nil, - }, - // match none due to no matching topics (match with address) - { - ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - nil, - nil, nil, nil, - }, - // match logs based on addresses and topics - { - ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - append(flattenLogs(allLogs[3:5]), allLogs[5][0]), - nil, nil, nil, - }, - // match logs based on multiple addresses and "or" topics - { - ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - append(flattenLogs(allLogs[2:5]), allLogs[5][0]), - nil, nil, nil, - }, - // multiple pending logs, should match only 2 topics from the logs in block 5 - { - ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber}, - []*types.Log{allLogs[5][0], allLogs[5][2]}, - nil, nil, nil, - }, - // match none due to only matching new mined logs - { - ethereum.FilterQuery{}, - nil, - nil, nil, nil, - }, - // match none due to only matching mined logs within a specific block range - { - ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, - nil, - nil, nil, nil, - }, - // match all due to matching mined and pending logs - { - ethereum.FilterQuery{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, - flattenLogs(allLogs), - nil, nil, nil, - }, - // match none due to matching logs from a specific block number to new mined blocks - { - ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, - nil, - nil, nil, nil, - }, - } - ) - - // create all subscriptions, this ensures all subscriptions are created before the events are posted. - // on slow machines this could otherwise lead to missing events when the subscription is created after - // (some) events are posted. - for i := range testCases { - testCases[i].c = make(chan []*types.Log) - testCases[i].err = make(chan error) - - var err error - testCases[i].sub, err = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c) - if err != nil { - t.Fatalf("SubscribeLogs %d failed: %v\n", i, err) - } - } - - for n, test := range testCases { - i := n - tt := test - go func() { - defer tt.sub.Unsubscribe() - - var fetched []*types.Log - - timeout := time.After(1 * time.Second) - fetchLoop: - for { - select { - case logs := <-tt.c: - // Do not break early if we've fetched greater, or equal, - // to the number of logs expected. This ensures we do not - // deadlock the filter system because it will do a blocking - // send on this channel if another log arrives. - fetched = append(fetched, logs...) - case <-timeout: - break fetchLoop - } - } - - if len(fetched) != len(tt.expected) { - tt.err <- fmt.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)) - return - } - - for l := range fetched { - if fetched[l].Removed { - tt.err <- fmt.Errorf("expected log not to be removed for log %d in case %d", l, i) - return - } - if !reflect.DeepEqual(fetched[l], tt.expected[l]) { - tt.err <- fmt.Errorf("invalid log on index %d for case %d\n", l, i) - return - } - } - tt.err <- nil - }() - } - - // raise events - for _, ev := range allLogs { - backend.pendingLogsFeed.Send(ev) - } - - for i := range testCases { - err := <-testCases[i].err - if err != nil { - t.Fatalf("test %d failed: %v", i, err) - } - <-testCases[i].sub.Err() - } -} - -func TestLightFilterLogs(t *testing.T) { - t.Parallel() - - var ( - db = rawdb.NewMemoryDatabase() - backend, sys = newTestFilterSystem(t, db, Config{}) - api = NewFilterAPI(sys, true) - signer = types.HomesteadSigner{} - - firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") - secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") - thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") - notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999") - firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") - secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") - - // posted twice, once as regular logs and once as pending logs. - allLogs = []*types.Log{ - // Block 1 - {Address: firstAddr, Topics: []common.Hash{}, Data: []byte{}, BlockNumber: 2, Index: 0}, - // Block 2 - {Address: firstAddr, Topics: []common.Hash{firstTopic}, Data: []byte{}, BlockNumber: 3, Index: 0}, - {Address: secondAddr, Topics: []common.Hash{firstTopic}, Data: []byte{}, BlockNumber: 3, Index: 1}, - {Address: thirdAddress, Topics: []common.Hash{secondTopic}, Data: []byte{}, BlockNumber: 3, Index: 2}, - // Block 3 - {Address: thirdAddress, Topics: []common.Hash{secondTopic}, Data: []byte{}, BlockNumber: 4, Index: 0}, - } - - testCases = []struct { - crit FilterCriteria - expected []*types.Log - id rpc.ID - }{ - // match all - 0: {FilterCriteria{}, allLogs, ""}, - // match none due to no matching addresses - 1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, ""}, - // match logs based on addresses, ignore topics - 2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""}, - // match logs based on addresses and topics - 3: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[3:5], ""}, - // all logs with block num >= 3 - 4: {FilterCriteria{FromBlock: big.NewInt(3), ToBlock: big.NewInt(5)}, allLogs[1:], ""}, - // all logs - 5: {FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(5)}, allLogs, ""}, - // all logs with 1>= block num <=2 and topic secondTopic - 6: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(3), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""}, - } - - key, _ = crypto.GenerateKey() - addr = crypto.PubkeyToAddress(key.PublicKey) - genesis = &core.Genesis{Config: params.TestChainConfig, - Alloc: types.GenesisAlloc{ - addr: {Balance: big.NewInt(params.Ether)}, - }, - } - receipts = []*types.Receipt{{ - Logs: []*types.Log{allLogs[0]}, - }, { - Logs: []*types.Log{allLogs[1], allLogs[2], allLogs[3]}, - }, { - Logs: []*types.Log{allLogs[4]}, - }} - ) - - _, blocks, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 4, func(i int, b *core.BlockGen) { - if i == 0 { - return - } - receipts[i-1].Bloom = types.CreateBloom(types.Receipts{receipts[i-1]}) - b.AddUncheckedReceipt(receipts[i-1]) - tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{Nonce: uint64(i - 1), To: &common.Address{}, Value: big.NewInt(1000), Gas: params.TxGas, GasPrice: big.NewInt(2100), Data: nil}), signer, key) - b.AddTx(tx) - }) - for i, block := range blocks { - rawdb.WriteBlock(db, block) - rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64()) - rawdb.WriteHeadBlockHash(db, block.Hash()) - if i > 0 { - rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), []*types.Receipt{receipts[i-1]}) - } - } - // create all filters - for i := range testCases { - id, err := api.NewFilter(testCases[i].crit) - if err != nil { - t.Fatal(err) - } - testCases[i].id = id - } - - // raise events - time.Sleep(1 * time.Second) - for _, block := range blocks { - backend.chainFeed.Send(core.ChainEvent{Block: block, Hash: common.Hash{}, Logs: allLogs}) - } - - for i, tt := range testCases { - var fetched []*types.Log - timeout := time.Now().Add(1 * time.Second) - for { // fetch all expected logs - results, err := api.GetFilterChanges(tt.id) - if err != nil { - t.Fatalf("Unable to fetch logs: %v", err) - } - fetched = append(fetched, results.([]*types.Log)...) - if len(fetched) >= len(tt.expected) { - break - } - // check timeout - if time.Now().After(timeout) { - break - } - - time.Sleep(100 * time.Millisecond) - } - - if len(fetched) != len(tt.expected) { - t.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)) - return - } - - for l := range fetched { - if fetched[l].Removed { - t.Errorf("expected log not to be removed for log %d in case %d", l, i) - } - expected := *tt.expected[l] - blockNum := expected.BlockNumber - 1 - expected.BlockHash = blocks[blockNum].Hash() - expected.TxHash = blocks[blockNum].Transactions()[0].Hash() - if !reflect.DeepEqual(fetched[l], &expected) { - t.Errorf("invalid log on index %d for case %d", l, i) - } - } - } -} - // TestPendingTxFilterDeadlock tests if the event loop hangs when pending // txes arrive at the same time that one of multiple filters is timing out. // Please refer to #22131 for more details. @@ -889,7 +550,7 @@ func TestPendingTxFilterDeadlock(t *testing.T) { var ( db = rawdb.NewMemoryDatabase() backend, sys = newTestFilterSystem(t, db, Config{Timeout: timeout}) - api = NewFilterAPI(sys, false) + api = NewFilterAPI(sys) done = make(chan struct{}) ) @@ -945,11 +606,3 @@ func TestPendingTxFilterDeadlock(t *testing.T) { t.Error("Tx sending loop hangs") } } - -func flattenLogs(pl [][]*types.Log) []*types.Log { - var logs []*types.Log - for _, l := range pl { - logs = append(logs, l...) - } - return logs -} diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index b14c78bc3eb4..6a10b0874d10 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -99,8 +99,8 @@ func TestFilters(t *testing.T) { config.Eip1559Block = big.NewInt(0) var ( - db = rawdb.NewMemoryDatabase() - _, sys = newTestFilterSystem(t, db, Config{}) + db = rawdb.NewMemoryDatabase() + backend, sys = newTestFilterSystem(t, db, Config{}) // Sender account key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr = crypto.PubkeyToAddress(key1.PublicKey) @@ -265,8 +265,7 @@ func TestFilters(t *testing.T) { }), signer, key1) gen.AddTx(tx) }) - sys.backend.(*testBackend).pendingBlock = pchain[0] - sys.backend.(*testBackend).pendingReceipts = preceipts[0] + backend.setPending(pchain[0], preceipts[0]) for i, tc := range []struct { f *Filter @@ -301,22 +300,22 @@ func TestFilters(t *testing.T) { want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696334"],"data":"0x","blockNumber":"0x3e8","transactionHash":"0x21fd39694cbcc8cc5046b3b7d5200101edf9c85218da613a8851eb5e3d195241","transactionIndex":"0x0","blockHash":"0x8a956d79ca6468ff23c97615a4aa24a55bdaff78767ee28d3e2e02ecb407a0de","logIndex":"0x0","removed":false}]`, }, { f: sys.NewRangeFilter(int64(rpc.FinalizedBlockNumber), int64(rpc.LatestBlockNumber), nil, nil, 0), - err: "committed header not found", + err: "finalized header not found", }, { f: sys.NewRangeFilter(int64(rpc.FinalizedBlockNumber), int64(rpc.FinalizedBlockNumber), nil, nil, 0), - err: "committed header not found", + err: "finalized header not found", }, { f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.FinalizedBlockNumber), nil, nil, 0), - err: "committed header not found", + err: "finalized header not found", }, { - f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.PendingBlockNumber), nil, nil, 0), - want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696335"],"data":"0x","blockNumber":"0x3e9","transactionHash":"0x87d02f2dddb1941ff179ae5d5fbb123afb9c5f71220045bc1c48f3872be24d4a","transactionIndex":"0x0","blockHash":"0x5f2b35a350840476a43aa23c6ea031d6db277aef337775ebb8421df64f17723f","logIndex":"0x0","removed":false}]`, + f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.PendingBlockNumber), nil, nil, 0), + err: errPendingLogsUnsupported.Error(), }, { - f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.PendingBlockNumber), nil, nil, 0), - want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696334"],"data":"0x","blockNumber":"0x3e8","transactionHash":"0x21fd39694cbcc8cc5046b3b7d5200101edf9c85218da613a8851eb5e3d195241","transactionIndex":"0x0","blockHash":"0x8a956d79ca6468ff23c97615a4aa24a55bdaff78767ee28d3e2e02ecb407a0de","logIndex":"0x0","removed":false},{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696335"],"data":"0x","blockNumber":"0x3e9","transactionHash":"0x87d02f2dddb1941ff179ae5d5fbb123afb9c5f71220045bc1c48f3872be24d4a","transactionIndex":"0x0","blockHash":"0x5f2b35a350840476a43aa23c6ea031d6db277aef337775ebb8421df64f17723f","logIndex":"0x0","removed":false}]`, + f: sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.PendingBlockNumber), nil, nil, 0), + err: errPendingLogsUnsupported.Error(), }, { f: sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.LatestBlockNumber), nil, nil, 0), - err: "invalid block range", + err: errPendingLogsUnsupported.Error(), }, } { logs, err := tc.f.Logs(context.Background()) @@ -338,7 +337,7 @@ func TestFilters(t *testing.T) { } t.Run("timeout", func(t *testing.T) { - f := sys.NewRangeFilter(0, -1, nil, nil, 0) + f := sys.NewRangeFilter(0, rpc.LatestBlockNumber.Int64(), nil, nil, 0) ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour)) defer cancel() _, err := f.Logs(ctx)