diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go
index b7a775606669..87d44265e5d3 100644
--- a/accounts/abi/bind/backends/simulated.go
+++ b/accounts/abi/bind/backends/simulated.go
@@ -153,7 +153,7 @@ func NewXDCSimulatedBackend(alloc types.GenesisAlloc, gasLimit uint64, chainConf
 	filterBackend := &filterBackend{database, blockchain, backend}
 	backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
-	backend.events = filters.NewEventSystem(backend.filterSystem, false)
+	backend.events = filters.NewEventSystem(backend.filterSystem)
 
 	blockchain.Client = backend
@@ -181,7 +181,7 @@ func NewSimulatedBackend(alloc types.GenesisAlloc, gasLimit uint64) *SimulatedBa
 	filterBackend := &filterBackend{database, blockchain, backend}
 	backend.filterSystem = filters.NewFilterSystem(filterBackend, filters.Config{})
-	backend.events = filters.NewEventSystem(backend.filterSystem, false)
+	backend.events = filters.NewEventSystem(backend.filterSystem)
 
 	header := backend.blockchain.CurrentBlock()
 	block := backend.blockchain.GetBlock(header.Hash(), header.Number.Uint64())
@@ -834,7 +834,7 @@ func (b *SimulatedBackend) FilterLogs(ctx context.Context, query ethereum.Filter
 	if query.FromBlock != nil {
 		from = query.FromBlock.Int64()
 	}
-	to := int64(-1)
+	to := int64(rpc.LatestBlockNumber)
 	if query.ToBlock != nil {
 		to = query.ToBlock.Int64()
 	}
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index c3b1a6ffd0ce..78bc42502b0c 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -1862,7 +1862,7 @@ func RegisterFilterAPI(stack *node.Node, backend ethapi.Backend, ethcfg *ethconf
 	})
 	stack.RegisterAPIs([]rpc.API{{
 		Namespace: "eth",
-		Service:   filters.NewFilterAPI(filterSystem, false),
+		Service:   filters.NewFilterAPI(filterSystem),
 	}})
 	return filterSystem
 }
diff --git a/eth/backend.go b/eth/backend.go
index fe6cb386e76b..018a5c449a70 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -426,7 +426,7 @@ func (e *Ethereum) APIs() []rpc.API {
 			Service:   downloader.NewDownloaderAPI(e.protocolManager.downloader, e.eventMux),
 		}, {
 			Namespace: "eth",
-			Service:   filters.NewFilterAPI(filters.NewFilterSystem(e.APIBackend, filters.Config{LogCacheSize: e.config.FilterLogCacheSize}), false),
+			Service:   filters.NewFilterAPI(filters.NewFilterSystem(e.APIBackend, filters.Config{LogCacheSize: e.config.FilterLogCacheSize})),
 		}, {
 			Namespace: "admin",
 			Service:   NewAdminAPI(e),
diff --git a/eth/filters/api.go b/eth/filters/api.go
index 6af73caea55b..e3ca8492ef3a 100644
--- a/eth/filters/api.go
+++ b/eth/filters/api.go
@@ -33,13 +33,14 @@ import (
 )
 
 var (
-	errInvalidTopic        = invalidParamsErr("invalid topic(s)")
-	errInvalidBlockRange   = invalidParamsErr("invalid block range params")
-	errBlockHashWithRange  = invalidParamsErr("can't specify fromBlock/toBlock with blockHash")
-	errUnknownBlock        = errors.New("unknown block")
-	errFilterNotFound      = errors.New("filter not found")
-	errExceedMaxTopics     = errors.New("exceed max topics")
-	errExceedLogQueryLimit = errors.New("exceed max addresses or topics per search position")
+	errInvalidTopic           = invalidParamsErr("invalid topic(s)")
+	errInvalidBlockRange      = invalidParamsErr("invalid block range params")
+	errBlockHashWithRange     = invalidParamsErr("can't specify fromBlock/toBlock with blockHash")
+	errPendingLogsUnsupported = invalidParamsErr("pending logs are not supported")
+	errUnknownBlock           = errors.New("unknown block")
+	errFilterNotFound         = errors.New("filter not found")
+	errExceedMaxTopics        = errors.New("exceed max topics")
+	errExceedLogQueryLimit    = errors.New("exceed max addresses or topics per search position")
 )
 
 type invalidParamsError struct {
@@ -83,10 +84,10 @@ type FilterAPI struct {
 }
 
 // NewFilterAPI returns a new FilterAPI instance.
-func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI {
+func NewFilterAPI(system *FilterSystem) *FilterAPI {
 	api := &FilterAPI{
 		sys:           system,
-		events:        NewEventSystem(system, lightMode),
+		events:        NewEventSystem(system),
 		filters:       make(map[rpc.ID]*filter),
 		timeout:       system.cfg.Timeout,
 		logQueryLimit: system.cfg.LogQueryLimit,
@@ -481,7 +482,7 @@ func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
 		txs := f.txs
 		f.txs = nil
 		return txs, nil
-	case LogsSubscription, MinedAndPendingLogsSubscription:
+	case LogsSubscription:
 		logs := f.logs
 		f.logs = nil
 		return returnLogs(logs), nil
diff --git a/eth/filters/filter.go b/eth/filters/filter.go
index b0da021ff3d2..0617f2d1fbf9 100644
--- a/eth/filters/filter.go
+++ b/eth/filters/filter.go
@@ -110,54 +110,55 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
 		}
 		return f.blockLogs(ctx, header)
 	}
-	var (
-		beginPending = f.begin == rpc.PendingBlockNumber.Int64()
-		endPending   = f.end == rpc.PendingBlockNumber.Int64()
-	)
-	// special case for pending logs
-	if beginPending && !endPending {
-		return nil, errors.New("invalid block range")
+	// Disallow pending logs.
+	if f.begin == rpc.PendingBlockNumber.Int64() || f.end == rpc.PendingBlockNumber.Int64() {
+		return nil, errPendingLogsUnsupported
 	}
-	// Short-cut if all we care about is pending logs
-	if beginPending && endPending {
-		return f.pendingLogs(), nil
-	}
-
-	resolveSpecial := func(number int64) (int64, error) {
-		var hdr *types.Header
+	resolveSpecial := func(number int64) (uint64, error) {
 		switch number {
-		case rpc.LatestBlockNumber.Int64(), rpc.PendingBlockNumber.Int64():
-			// we should return head here since we've already captured
-			// that we need to get the pending logs in the pending boolean above
-			hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
+		case rpc.LatestBlockNumber.Int64():
+			hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
 			if hdr == nil {
 				return 0, errors.New("latest header not found")
 			}
+			return hdr.Number.Uint64(), nil
 		case rpc.FinalizedBlockNumber.Int64():
-			hdr, _ = f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber)
+			hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.FinalizedBlockNumber)
 			if hdr == nil {
-				return 0, errors.New("committed header not found")
+				return 0, errors.New("finalized header not found")
 			}
+			return hdr.Number.Uint64(), nil
+		case rpc.EarliestBlockNumber.Int64():
+			hdr, _ := f.sys.backend.HeaderByNumber(ctx, rpc.EarliestBlockNumber)
+			if hdr == nil {
+				return 0, errors.New("earliest header not found")
+			}
+			return hdr.Number.Uint64(), nil
 		default:
-			return number, nil
+			if number < 0 {
+				return 0, errors.New("negative block number")
+			}
+			return uint64(number), nil
 		}
-		return hdr.Number.Int64(), nil
 	}
 
-	var err error
 	// range query need to resolve the special begin/end block number
-	if f.begin, err = resolveSpecial(f.begin); err != nil {
+	begin, err := resolveSpecial(f.begin)
+	if err != nil {
 		return nil, err
 	}
-	if f.end, err = resolveSpecial(f.end); err != nil {
+	end, err := resolveSpecial(f.end)
+	if err != nil {
 		return nil, err
 	}
-	if f.rangeLimit != 0 && (uint64(f.end)-uint64(f.begin)) > f.rangeLimit {
+	if f.rangeLimit != 0 && (end-begin) > f.rangeLimit {
 		return nil, fmt.Errorf("exceed maximum block range: %d", f.rangeLimit)
 	}
+	f.begin = int64(begin)
+	f.end = int64(end)
 	logChan, errChan := f.rangeLogsAsync(ctx)
 	var logs []*types.Log
 	for {
@@ -165,16 +166,7 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
 		case log := <-logChan:
 			logs = append(logs, log)
 		case err := <-errChan:
-			if err != nil {
-				// if an error occurs during extraction, we do return the extracted data
-				return logs, err
-			}
-			// Append the pending ones
-			if endPending {
-				pendingLogs := f.pendingLogs()
-				logs = append(logs, pendingLogs...)
-			}
-			return logs, nil
+			return logs, err
 		}
 	}
 }
@@ -332,22 +324,6 @@ func (f *Filter) checkMatches(ctx context.Context, header *types.Header) ([]*typ
 	return logs, nil
 }
 
-// pendingLogs returns the logs matching the filter criteria within the pending block.
-func (f *Filter) pendingLogs() []*types.Log {
-	block, receipts := f.sys.backend.PendingBlockAndReceipts()
-	if block == nil {
-		return nil
-	}
-	if bloomFilter(block.Bloom(), f.addresses, f.topics) {
-		var unfiltered []*types.Log
-		for _, r := range receipts {
-			unfiltered = append(unfiltered, r.Logs...)
-		}
-		return filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
-	}
-	return nil
-}
-
 func includes(addresses []common.Address, a common.Address) bool {
 	for _, addr := range addresses {
 		if addr == a {
diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go
index 13c4a72c8c07..1b29126b12b5 100644
--- a/eth/filters/filter_system.go
+++ b/eth/filters/filter_system.go
@@ -30,7 +29,6 @@ import (
 	"github.com/XinFinOrg/XDPoSChain/common/lru"
 	"github.com/XinFinOrg/XDPoSChain/core"
 	"github.com/XinFinOrg/XDPoSChain/core/bloombits"
-	"github.com/XinFinOrg/XDPoSChain/core/rawdb"
 	"github.com/XinFinOrg/XDPoSChain/core/types"
 	"github.com/XinFinOrg/XDPoSChain/ethdb"
 	"github.com/XinFinOrg/XDPoSChain/event"
@@ -64,7 +63,6 @@ type Backend interface {
 	GetBody(ctx context.Context, hash common.Hash, number rpc.BlockNumber) (*types.Body, error)
 	GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
 	GetLogs(ctx context.Context, blockHash common.Hash, number uint64) ([][]*types.Log, error)
-	PendingBlockAndReceipts() (*types.Block, types.Receipts)
 
 	CurrentHeader() *types.Header
 	ChainConfig() *params.ChainConfig
@@ -72,7 +70,6 @@ type Backend interface {
 	SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
 	SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
 	SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
-	SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
 
 	BloomStatus() (uint64, uint64)
 	ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
@@ -154,10 +151,6 @@ const (
 	UnknownSubscription Type = iota
 	// LogsSubscription queries for new or removed (chain reorg) logs
 	LogsSubscription
-	// PendingLogsSubscription queries for logs in pending blocks
-	PendingLogsSubscription
-	// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
-	MinedAndPendingLogsSubscription
 	// PendingTransactionsSubscription queries for pending transactions entering
 	// the pending state
 	PendingTransactionsSubscription
@@ -194,26 +187,22 @@ type subscription struct {
 
 // EventSystem creates subscriptions, processes events and broadcasts them to the
 // subscription which match the subscription criteria.
 type EventSystem struct {
-	backend   Backend
-	sys       *FilterSystem
-	lightMode bool
-	lastHead  *types.Header
+	backend Backend
+	sys     *FilterSystem
 
 	// Subscriptions
-	txsSub         event.Subscription // Subscription for new transaction event
-	logsSub        event.Subscription // Subscription for new log event
-	rmLogsSub      event.Subscription // Subscription for removed log event
-	pendingLogsSub event.Subscription // Subscription for pending log event
-	chainSub       event.Subscription // Subscription for new chain event
+	txsSub    event.Subscription // Subscription for new transaction event
+	logsSub   event.Subscription // Subscription for new log event
+	rmLogsSub event.Subscription // Subscription for removed log event
+	chainSub  event.Subscription // Subscription for new chain event
 
 	// Channels
-	install       chan *subscription         // install filter for event notification
-	uninstall     chan *subscription         // remove filter for event notification
-	txsCh         chan core.NewTxsEvent      // Channel to receive new transactions event
-	logsCh        chan []*types.Log          // Channel to receive new log event
-	pendingLogsCh chan []*types.Log          // Channel to receive new log event
-	rmLogsCh      chan core.RemovedLogsEvent // Channel to receive removed log event
-	chainCh       chan core.ChainEvent       // Channel to receive new chain event
+	install   chan *subscription         // install filter for event notification
+	uninstall chan *subscription         // remove filter for event notification
+	txsCh     chan core.NewTxsEvent      // Channel to receive new transactions event
+	logsCh    chan []*types.Log          // Channel to receive new log event
+	rmLogsCh  chan core.RemovedLogsEvent // Channel to receive removed log event
+	chainCh   chan core.ChainEvent       // Channel to receive new chain event
 }
 
 // NewEventSystem creates a new manager that listens for event on the given mux,
@@ -222,18 +211,16 @@ type EventSystem struct {
 //
 // The returned manager has a loop that needs to be stopped with the Stop function
 // or by stopping the given mux.
-func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem {
+func NewEventSystem(sys *FilterSystem) *EventSystem {
 	m := &EventSystem{
-		sys:           sys,
-		backend:       sys.backend,
-		lightMode:     lightMode,
-		install:       make(chan *subscription),
-		uninstall:     make(chan *subscription),
-		txsCh:         make(chan core.NewTxsEvent, txChanSize),
-		logsCh:        make(chan []*types.Log, logsChanSize),
-		rmLogsCh:      make(chan core.RemovedLogsEvent, rmLogsChanSize),
-		pendingLogsCh: make(chan []*types.Log, logsChanSize),
-		chainCh:       make(chan core.ChainEvent, chainEvChanSize),
+		sys:       sys,
+		backend:   sys.backend,
+		install:   make(chan *subscription),
+		uninstall: make(chan *subscription),
+		txsCh:     make(chan core.NewTxsEvent, txChanSize),
+		logsCh:    make(chan []*types.Log, logsChanSize),
+		rmLogsCh:  make(chan core.RemovedLogsEvent, rmLogsChanSize),
+		chainCh:   make(chan core.ChainEvent, chainEvChanSize),
 	}
 
 	// Subscribe events
@@ -241,10 +228,9 @@ func NewEventSystem(sys *FilterSystem, lightMode bool) *EventSystem {
 	m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
 	m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
 	m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
-	m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
 
 	// Make sure none of the subscriptions are empty
-	if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
+	if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil {
 		log.Crit("Subscribe for event system failed")
 	}
 
@@ -327,10 +313,11 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
 		to = rpc.BlockNumber(crit.ToBlock.Int64())
 	}
 
-	// only interested in pending logs
-	if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber {
-		return es.subscribePendingLogs(crit, logs), nil
+	// Pending logs are not supported anymore.
+	if from == rpc.PendingBlockNumber || to == rpc.PendingBlockNumber {
+		return nil, errPendingLogsUnsupported
 	}
+
 	// only interested in new mined logs
 	if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {
 		return es.subscribeLogs(crit, logs), nil
@@ -339,10 +326,6 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
 	if from >= 0 && to >= 0 && to >= from {
 		return es.subscribeLogs(crit, logs), nil
 	}
-	// interested in mined logs from a specific block number, new logs and pending logs
-	if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber {
-		return es.subscribeMinedPendingLogs(crit, logs), nil
-	}
 	// interested in logs from a specific block number to new mined blocks
 	if from >= 0 && to == rpc.LatestBlockNumber {
 		return es.subscribeLogs(crit, logs), nil
@@ -350,23 +333,6 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
 	return nil, errInvalidBlockRange
 }
 
-// subscribeMinedPendingLogs creates a subscription that returned mined and
-// pending logs that match the given criteria.
-func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
-	sub := &subscription{
-		id:        rpc.NewID(),
-		typ:       MinedAndPendingLogsSubscription,
-		logsCrit:  crit,
-		created:   time.Now(),
-		logs:      logs,
-		txs:       make(chan []*types.Transaction),
-		headers:   make(chan *types.Header),
-		installed: make(chan struct{}),
-		err:       make(chan error),
-	}
-	return es.subscribe(sub)
-}
-
 // subscribeLogs creates a subscription that will write all logs matching the
 // given criteria to the given logs channel.
 func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
@@ -384,23 +350,6 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
 	return es.subscribe(sub)
 }
 
-// subscribePendingLogs creates a subscription that writes transaction hashes for
-// transactions that enter the transaction pool.
-func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
-	sub := &subscription{
-		id:        rpc.NewID(),
-		typ:       PendingLogsSubscription,
-		logsCrit:  crit,
-		created:   time.Now(),
-		logs:      logs,
-		txs:       make(chan []*types.Transaction),
-		headers:   make(chan *types.Header),
-		installed: make(chan struct{}),
-		err:       make(chan error),
-	}
-	return es.subscribe(sub)
-}
-
 // SubscribeNewHeads creates a subscription that writes the header of a block that is
 // imported in the chain.
 func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
@@ -447,18 +396,6 @@ func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
 	}
 }
 
-func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) {
-	if len(ev) == 0 {
-		return
-	}
-	for _, f := range filters[PendingLogsSubscription] {
-		matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
-		if len(matchedLogs) > 0 {
-			f.logs <- matchedLogs
-		}
-	}
-}
-
 func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
 	for _, f := range filters[PendingTransactionsSubscription] {
 		f.txs <- ev.Txs
@@ -469,91 +406,6 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
 	for _, f := range filters[BlocksSubscription] {
 		f.headers <- ev.Block.Header()
 	}
-	if es.lightMode && len(filters[LogsSubscription]) > 0 {
-		es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
-			for _, f := range filters[LogsSubscription] {
-				if f.logsCrit.FromBlock != nil && header.Number.Cmp(f.logsCrit.FromBlock) < 0 {
-					continue
-				}
-				if f.logsCrit.ToBlock != nil && header.Number.Cmp(f.logsCrit.ToBlock) > 0 {
-					continue
-				}
-				if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
-					f.logs <- matchedLogs
-				}
-			}
-		})
-	}
-}
-
-func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
-	oldh := es.lastHead
-	es.lastHead = newHeader
-	if oldh == nil {
-		return
-	}
-	newh := newHeader
-	// find common ancestor, create list of rolled back and new block hashes
-	var oldHeaders, newHeaders []*types.Header
-	for oldh.Hash() != newh.Hash() {
-		if oldh.Number.Uint64() >= newh.Number.Uint64() {
-			oldHeaders = append(oldHeaders, oldh)
-			oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1)
-		}
-		if oldh.Number.Uint64() < newh.Number.Uint64() {
-			newHeaders = append(newHeaders, newh)
-			newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1)
-			if newh == nil {
-				// happens when CHT syncing, nothing to do
-				newh = oldh
-			}
-		}
-	}
-	// roll back old blocks
-	for _, h := range oldHeaders {
-		callBack(h, true)
-	}
-	// check new blocks (array is in reverse order)
-	for i := len(newHeaders) - 1; i >= 0; i-- {
-		callBack(newHeaders[i], false)
-	}
-}
-
-// filter logs of a single header in light client mode
-func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
-	if !bloomFilter(header.Bloom, addresses, topics) {
-		return nil
-	}
-	// Get the logs of the block
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
-	defer cancel()
-	cached, err := es.sys.cachedLogElem(ctx, header.Hash(), header.Number.Uint64())
-	if err != nil {
-		return nil
-	}
-	unfiltered := append([]*types.Log{}, cached.logs...)
-	for i, log := range unfiltered {
-		// Don't modify in-cache elements
-		logcopy := *log
-		logcopy.Removed = remove
-		// Swap copy in-place
-		unfiltered[i] = &logcopy
-	}
-	logs := filterLogs(unfiltered, nil, nil, addresses, topics)
-	// Txhash is already resolved
-	if len(logs) > 0 && logs[0].TxHash != (common.Hash{}) {
-		return logs
-	}
-	// Resolve txhash
-	body, err := es.sys.cachedGetBody(ctx, cached, header.Hash(), header.Number.Uint64())
-	if err != nil {
-		return nil
-	}
-	for _, log := range logs {
-		// logs are already copied, safe to modify
-		log.TxHash = body.Transactions[log.TxIndex].Hash()
-	}
-	return logs
 }
 
 // eventLoop (un)installs filters and processes mux events.
@@ -563,7 +415,6 @@ func (es *EventSystem) eventLoop() {
 		es.txsSub.Unsubscribe()
 		es.logsSub.Unsubscribe()
 		es.rmLogsSub.Unsubscribe()
-		es.pendingLogsSub.Unsubscribe()
 		es.chainSub.Unsubscribe()
 	}()
 
@@ -580,29 +431,15 @@ func (es *EventSystem) eventLoop() {
 			es.handleLogs(index, ev)
 		case ev := <-es.rmLogsCh:
 			es.handleLogs(index, ev.Logs)
-		case ev := <-es.pendingLogsCh:
-			es.handlePendingLogs(index, ev)
 		case ev := <-es.chainCh:
 			es.handleChainEvent(index, ev)
 		case f := <-es.install:
-			if f.typ == MinedAndPendingLogsSubscription {
-				// the type are logs and pending logs subscriptions
-				index[LogsSubscription][f.id] = f
-				index[PendingLogsSubscription][f.id] = f
-			} else {
-				index[f.typ][f.id] = f
-			}
+			index[f.typ][f.id] = f
 			close(f.installed)
 		case f := <-es.uninstall:
-			if f.typ == MinedAndPendingLogsSubscription {
-				// the type are logs and pending logs subscriptions
-				delete(index[LogsSubscription], f.id)
-				delete(index[PendingLogsSubscription], f.id)
-			} else {
-				delete(index[f.typ], f.id)
-			}
+			delete(index[f.typ], f.id)
 			close(f.err)
 
 		// System stopped
diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go
index bf01ef9aa499..e3949c803f42 100644
--- a/eth/filters/filter_system_test.go
+++ b/eth/filters/filter_system_test.go
@@ -19,7 +18,6 @@ package filters
 import (
 	"context"
 	"errors"
-	"fmt"
 	"math/big"
 	"math/rand"
 	"reflect"
@@ -27,7 +26,6 @@ import (
 	"testing"
 	"time"
 
-	ethereum "github.com/XinFinOrg/XDPoSChain"
 	"github.com/XinFinOrg/XDPoSChain/common"
 	"github.com/XinFinOrg/XDPoSChain/consensus/ethash"
 	"github.com/XinFinOrg/XDPoSChain/core"
@@ -35,7 +33,6 @@ import (
 	"github.com/XinFinOrg/XDPoSChain/core/rawdb"
 	"github.com/XinFinOrg/XDPoSChain/core/types"
 	"github.com/XinFinOrg/XDPoSChain/core/vm"
-	"github.com/XinFinOrg/XDPoSChain/crypto"
 	"github.com/XinFinOrg/XDPoSChain/ethdb"
 	"github.com/XinFinOrg/XDPoSChain/event"
 	"github.com/XinFinOrg/XDPoSChain/params"
@@ -49,7 +46,6 @@ type testBackend struct {
 	txFeed          event.Feed
 	logsFeed        event.Feed
 	rmLogsFeed      event.Feed
-	pendingLogsFeed event.Feed
 	chainFeed       event.Feed
 	pendingBlock    *types.Block
 	pendingReceipts types.Receipts
@@ -116,10 +112,6 @@ func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash, number uint
 	return logs, nil
 }
 
-func (b *testBackend) PendingBlockAndReceipts() (*types.Block, types.Receipts) {
-	return b.pendingBlock, b.pendingReceipts
-}
-
 func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
 	return b.txFeed.Subscribe(ch)
 }
@@ -132,10 +124,6 @@ func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
 	return b.logsFeed.Subscribe(ch)
 }
 
-func (b *testBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription {
-	return b.pendingLogsFeed.Subscribe(ch)
-}
-
 func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
 	return b.chainFeed.Subscribe(ch)
 }
@@ -171,6 +159,11 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc
 	}()
 }
 
+func (b *testBackend) setPending(block *types.Block, receipts types.Receipts) {
+	b.pendingBlock = block
+	b.pendingReceipts = receipts
+}
+
 func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) {
 	backend := &testBackend{db: db}
 	sys := NewFilterSystem(backend, cfg)
@@ -188,13 +181,13 @@ func TestBlockSubscription(t *testing.T) {
 	var (
 		db           = rawdb.NewMemoryDatabase()
 		backend, sys = newTestFilterSystem(t, db, Config{})
-		api          = NewFilterAPI(sys, false)
+		api          = NewFilterAPI(sys)
 		genesis      = (&core.Genesis{
 			Config:  params.TestChainConfig,
 			BaseFee: big.NewInt(params.InitialBaseFee),
 		}).MustCommit(db)
 		chain, _    = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
-		chainEvents = []core.ChainEvent{}
+		chainEvents []core.ChainEvent
 	)
 
 	for _, blk := range chain {
@@ -243,7 +236,7 @@ func TestPendingTxFilter(t *testing.T) {
 	var (
 		db           = rawdb.NewMemoryDatabase()
 		backend, sys = newTestFilterSystem(t, db, Config{})
-		api          = NewFilterAPI(sys, false)
+		api          = NewFilterAPI(sys)
 
 		transactions = []*types.Transaction{
 			types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
@@ -298,7 +291,7 @@ func TestLogFilterCreation(t *testing.T) {
 	var (
 		db     = rawdb.NewMemoryDatabase()
 		_, sys = newTestFilterSystem(t, db, Config{})
-		api    = NewFilterAPI(sys, false)
+		api    = NewFilterAPI(sys)
 
 		testCases = []struct {
 			crit    FilterCriteria
@@ -310,8 +303,6 @@ func TestLogFilterCreation(t *testing.T) {
 			{FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, true},
 			// "mined" block range to pending
 			{FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, true},
-			// new mined and pending blocks
-			{FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, true},
 			// from block "higher" than to block
 			{FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}, false},
 			// from block "higher" than to block
@@ -344,7 +335,7 @@ func TestInvalidLogFilterCreation(t *testing.T) {
 	var (
 		db     = rawdb.NewMemoryDatabase()
 		_, sys = newTestFilterSystem(t, db, Config{LogQueryLimit: 1000})
-		api    = NewFilterAPI(sys, false)
+		api    = NewFilterAPI(sys)
 	)
 
 	// different situations where log filter creation should fail.
@@ -375,7 +366,7 @@ func TestInvalidGetLogsRequest(t *testing.T) {
 		}
 		db, blocks, _ = core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 10, func(i int, gen *core.BlockGen) {})
 		_, sys        = newTestFilterSystem(t, db, Config{LogQueryLimit: 10})
-		api           = NewFilterAPI(sys, false)
+		api           = NewFilterAPI(sys)
 		blockHash        = blocks[0].Hash()
 		unknownBlockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
 	)
@@ -439,7 +430,7 @@ func TestInvalidGetRangeLogsRequest(t *testing.T) {
 	var (
 		db     = rawdb.NewMemoryDatabase()
 		_, sys = newTestFilterSystem(t, db, Config{})
-		api    = NewFilterAPI(sys, false)
+		api    = NewFilterAPI(sys)
 	)
 
 	if _, err := api.GetLogs(context.Background(), FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}); err != errInvalidBlockRange {
@@ -454,7 +445,7 @@ func TestLogFilter(t *testing.T) {
 	var (
 		db           = rawdb.NewMemoryDatabase()
 		backend, sys = newTestFilterSystem(t, db, Config{})
-		api          = NewFilterAPI(sys, false)
+		api          = NewFilterAPI(sys)
 
 		firstAddr  = common.HexToAddress("0x1111111111111111111111111111111111111111")
 		secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@@ -473,9 +464,6 @@ func TestLogFilter(t *testing.T) {
 			{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3},
 		}
 
-		expectedCase7  = []*types.Log{allLogs[3], allLogs[4], allLogs[0], allLogs[1], allLogs[2], allLogs[3], allLogs[4]}
-		expectedCase11 = []*types.Log{allLogs[1], allLogs[2], allLogs[1], allLogs[2]}
-
 		testCases = []struct {
 			crit     FilterCriteria
 			expected []*types.Log
@@ -493,20 +481,14 @@ func TestLogFilter(t *testing.T) {
 			4: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[3:5], ""},
 			// match logs based on multiple addresses and "or" topics
 			5: {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[2:5], ""},
-			// logs in the pending block
-			6: {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, allLogs[:2], ""},
-			// mined logs with block num >= 2 or pending logs
-			7: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, expectedCase7, ""},
 			// all "mined" logs with block num >= 2
-			8: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""},
+			6: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""},
 			// all "mined" logs
-			9: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""},
+			7: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""},
 			// all "mined" logs with 1>= block num <=2 and topic secondTopic
-			10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""},
-			// all "mined" and pending logs with topic firstTopic
-			11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{{firstTopic}}}, expectedCase11, ""},
+			8: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""},
 			// match all logs due to wildcard topic
-			12: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""},
+			9: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""},
 		}
 	)
@@ -520,9 +502,6 @@ func TestLogFilter(t *testing.T) {
 	if nsend := backend.logsFeed.Send(allLogs); nsend == 0 {
 		t.Fatal("Logs event not delivered")
 	}
-	if nsend := backend.pendingLogsFeed.Send(allLogs); nsend == 0 {
-		t.Fatal("Pending logs event not delivered")
-	}
 
 	for i, tt := range testCases {
 		var fetched []*types.Log
@@ -530,7 +509,7 @@ func TestLogFilter(t *testing.T) {
 		for { // fetch all expected logs
 			results, err := api.GetFilterChanges(tt.id)
 			if err != nil {
-				t.Fatalf("Unable to fetch logs: %v", err)
+				t.Fatalf("test %d: unable to fetch logs: %v", i, err)
 			}
 			fetched = append(fetched, results.([]*types.Log)...)
@@ -561,324 +540,6 @@ func TestLogFilter(t *testing.T) {
 	}
 }
 
-// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed.
-func TestPendingLogsSubscription(t *testing.T) {
-	t.Parallel()
-
-	var (
-		db           = rawdb.NewMemoryDatabase()
-		backend, sys = newTestFilterSystem(t, db, Config{})
-		api          = NewFilterAPI(sys, false)
-
-		firstAddr      = common.HexToAddress("0x1111111111111111111111111111111111111111")
-		secondAddr     = common.HexToAddress("0x2222222222222222222222222222222222222222")
-		thirdAddress   = common.HexToAddress("0x3333333333333333333333333333333333333333")
-		notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999")
-		firstTopic     = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
-		secondTopic    = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
-		thirdTopic     = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333")
-		fourthTopic    = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444")
-		notUsedTopic   = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
-
-		allLogs = [][]*types.Log{
-			{{Address: firstAddr, Topics: []common.Hash{}, BlockNumber: 0}},
-			{{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1}},
-			{{Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 2}},
-			{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}},
-			{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 4}},
-			{
-				{Address: thirdAddress, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
-				{Address: thirdAddress, Topics: []common.Hash{thirdTopic}, BlockNumber: 5},
-				{Address: thirdAddress, Topics: []common.Hash{fourthTopic}, BlockNumber: 5},
-				{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
-			},
-		}
-
-		pendingBlockNumber = big.NewInt(rpc.PendingBlockNumber.Int64())
-
-		testCases = []struct {
-			crit     ethereum.FilterQuery
-			expected []*types.Log
-			c        chan []*types.Log
-			sub      *Subscription
-			err      chan error
-		}{
-			// match all
-			{
-				ethereum.FilterQuery{FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
-				flattenLogs(allLogs),
-				nil, nil, nil,
-			},
-			// match none due to no matching addresses
-			{
-				ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
-				nil,
-				nil, nil, nil,
-			},
-			// match logs based on addresses, ignore topics
-			{
-				ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
-				append(flattenLogs(allLogs[:2]), allLogs[5][3]),
-				nil, nil, nil,
-			},
-			// match none due to no matching topics (match with address)
-			{
-				ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
-				nil,
-				nil, nil, nil,
-			},
-			// match logs based on addresses and topics
-			{
-				ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
-				append(flattenLogs(allLogs[3:5]), allLogs[5][0]),
-				nil, nil, nil,
-			},
-			// match logs based on multiple addresses and "or" topics
-			{
-				ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
-				append(flattenLogs(allLogs[2:5]), allLogs[5][0]),
-				nil, nil, nil,
-			},
-			// multiple pending logs, should match only 2 topics from the logs in block 5
-			{
-				ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
-				[]*types.Log{allLogs[5][0], allLogs[5][2]},
-				nil, nil, nil,
-			},
-			// match none due to only matching new mined logs
-			{
-				ethereum.FilterQuery{},
-				nil,
-				nil, nil, nil,
-			},
-			// match none due to only matching mined logs within a specific block range
-			{
-				ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)},
-				nil,
-				nil, nil, nil,
-			},
-			// match all due to matching mined and pending logs
-			{
-				ethereum.FilterQuery{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())},
-				flattenLogs(allLogs),
-				nil, nil, nil,
-			},
-			// match none due to matching logs from a specific block number to new mined blocks
-			{
-				ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())},
-				nil,
-				nil, nil, nil,
-			},
-		}
-	)
-
-	// create all subscriptions, this ensures all subscriptions are created before the events are posted.
-	// on slow machines this could otherwise lead to missing events when the subscription is created after
-	// (some) events are posted.
-	for i := range testCases {
-		testCases[i].c = make(chan []*types.Log)
-		testCases[i].err = make(chan error)
-
-		var err error
-		testCases[i].sub, err = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c)
-		if err != nil {
-			t.Fatalf("SubscribeLogs %d failed: %v\n", i, err)
-		}
-	}
-
-	for n, test := range testCases {
-		i := n
-		tt := test
-		go func() {
-			defer tt.sub.Unsubscribe()
-
-			var fetched []*types.Log
-
-			timeout := time.After(1 * time.Second)
-		fetchLoop:
-			for {
-				select {
-				case logs := <-tt.c:
-					// Do not break early if we've fetched greater, or equal,
-					// to the number of logs expected. This ensures we do not
-					// deadlock the filter system because it will do a blocking
-					// send on this channel if another log arrives.
-					fetched = append(fetched, logs...)
-				case <-timeout:
-					break fetchLoop
-				}
-			}
-
-			if len(fetched) != len(tt.expected) {
-				tt.err <- fmt.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
-				return
-			}
-
-			for l := range fetched {
-				if fetched[l].Removed {
-					tt.err <- fmt.Errorf("expected log not to be removed for log %d in case %d", l, i)
-					return
-				}
-				if !reflect.DeepEqual(fetched[l], tt.expected[l]) {
-					tt.err <- fmt.Errorf("invalid log on index %d for case %d\n", l, i)
-					return
-				}
-			}
-			tt.err <- nil
-		}()
-	}
-
-	// raise events
-	for _, ev := range allLogs {
-		backend.pendingLogsFeed.Send(ev)
-	}
-
-	for i := range testCases {
-		err := <-testCases[i].err
-		if err != nil {
-			t.Fatalf("test %d failed: %v", i, err)
-		}
-		<-testCases[i].sub.Err()
-	}
-}
-
-func TestLightFilterLogs(t *testing.T) {
-	t.Parallel()
-
-	var (
-		db           = rawdb.NewMemoryDatabase()
-		backend, sys = newTestFilterSystem(t, db, Config{})
-		api          = NewFilterAPI(sys, true)
-		signer       = types.HomesteadSigner{}
-
-		firstAddr      = common.HexToAddress("0x1111111111111111111111111111111111111111")
-		secondAddr     = common.HexToAddress("0x2222222222222222222222222222222222222222")
-		thirdAddress   = common.HexToAddress("0x3333333333333333333333333333333333333333")
-		notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999")
-		firstTopic     = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
-		secondTopic    = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
-
-		// posted twice, once as regular logs and once as pending logs.
-		allLogs = []*types.Log{
-			// Block 1
-			{Address: firstAddr, Topics: []common.Hash{}, Data: []byte{}, BlockNumber: 2, Index: 0},
-			// Block 2
-			{Address: firstAddr, Topics: []common.Hash{firstTopic}, Data: []byte{}, BlockNumber: 3, Index: 0},
-			{Address: secondAddr, Topics: []common.Hash{firstTopic}, Data: []byte{}, BlockNumber: 3, Index: 1},
-			{Address: thirdAddress, Topics: []common.Hash{secondTopic}, Data: []byte{}, BlockNumber: 3, Index: 2},
-			// Block 3
-			{Address: thirdAddress, Topics: []common.Hash{secondTopic}, Data: []byte{}, BlockNumber: 4, Index: 0},
-		}
-
-		testCases = []struct {
-			crit     FilterCriteria
-			expected []*types.Log
-			id       rpc.ID
-		}{
-			// match all
-			0: {FilterCriteria{}, allLogs, ""},
-			// match none due to no matching addresses
-			1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, ""},
-			// match logs based on addresses, ignore topics
-			2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""},
-			// match logs based on addresses and topics
-			3: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[3:5], ""},
-			// all logs with block num >= 3
-			4: {FilterCriteria{FromBlock: big.NewInt(3), ToBlock: big.NewInt(5)}, allLogs[1:], ""},
-			// all logs
-			5: {FilterCriteria{FromBlock: big.NewInt(0), ToBlock: big.NewInt(5)}, allLogs, ""},
-			// all logs with 1>= block num <=2 and topic secondTopic
-			6: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(3), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""},
-		}
-
-		key, _  = crypto.GenerateKey()
-		addr    = crypto.PubkeyToAddress(key.PublicKey)
-		genesis = &core.Genesis{Config: params.TestChainConfig,
-			Alloc: types.GenesisAlloc{
-				addr: {Balance: big.NewInt(params.Ether)},
-			},
-		}
-		receipts = []*types.Receipt{{
-			Logs: []*types.Log{allLogs[0]},
-		}, {
-			Logs: []*types.Log{allLogs[1], allLogs[2], allLogs[3]},
-		}, {
-			Logs: []*types.Log{allLogs[4]},
-		}}
-	)
-
-	_, blocks, _ := core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 4, func(i int, b *core.BlockGen) {
-		if i == 0 {
-			return
-		}
-		receipts[i-1].Bloom = types.CreateBloom(types.Receipts{receipts[i-1]})
-		b.AddUncheckedReceipt(receipts[i-1])
-		tx, _ := types.SignTx(types.NewTx(&types.LegacyTx{Nonce: uint64(i - 1), To: &common.Address{}, Value: big.NewInt(1000), Gas: params.TxGas, GasPrice: big.NewInt(2100), Data: nil}), signer, key)
-		b.AddTx(tx)
-	})
-	for i, block := range blocks {
-		rawdb.WriteBlock(db, block)
-		rawdb.WriteCanonicalHash(db, block.Hash(), block.NumberU64())
-		rawdb.WriteHeadBlockHash(db, block.Hash())
-		if i > 0 {
-			rawdb.WriteReceipts(db, block.Hash(), block.NumberU64(), []*types.Receipt{receipts[i-1]})
-		}
-	}
-	// create all filters
-	for i := range testCases {
-		id, err := api.NewFilter(testCases[i].crit)
-		if err != nil {
-			t.Fatal(err)
-		}
-		testCases[i].id = id
-	}
-
-	// raise events
-	time.Sleep(1 * time.Second)
-	for _, block := range blocks {
-		backend.chainFeed.Send(core.ChainEvent{Block: block, Hash: common.Hash{}, Logs: allLogs})
-	}
-
-	for i, tt := range testCases {
-		var fetched []*types.Log
-		timeout := time.Now().Add(1 * time.Second)
-		for { // fetch all expected logs
-			results, err := api.GetFilterChanges(tt.id)
-			if err != nil {
-				t.Fatalf("Unable to fetch logs: %v", err)
-			}
-			fetched = append(fetched, results.([]*types.Log)...)
-			if len(fetched) >= len(tt.expected) {
-				break
-			}
-			// check timeout
-			if time.Now().After(timeout) {
-				break
-			}
-
-			time.Sleep(100 * time.Millisecond)
-		}
-
-		if len(fetched) != len(tt.expected) {
-			t.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
-			return
-		}
-
-		for l := range fetched {
-			if fetched[l].Removed {
-				t.Errorf("expected log not to be removed for log %d in case %d", l, i)
-			}
-			expected := *tt.expected[l]
-			blockNum := expected.BlockNumber - 1
-			expected.BlockHash = blocks[blockNum].Hash()
-			expected.TxHash = blocks[blockNum].Transactions()[0].Hash()
-			if !reflect.DeepEqual(fetched[l], &expected) {
-				t.Errorf("invalid log on index %d for case %d", l, i)
-			}
-		}
-	}
-}
-
 // TestPendingTxFilterDeadlock tests if the event loop hangs when pending
 // txes arrive at the same time that one of multiple filters is timing out.
 // Please refer to #22131 for more details.
@@ -889,7 +550,7 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
 	var (
 		db           = rawdb.NewMemoryDatabase()
 		backend, sys = newTestFilterSystem(t, db, Config{Timeout: timeout})
-		api          = NewFilterAPI(sys, false)
+		api          = NewFilterAPI(sys)
 		done         = make(chan struct{})
 	)
 
@@ -945,11 +606,3 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
 		t.Error("Tx sending loop hangs")
 	}
 }
-
-func flattenLogs(pl [][]*types.Log) []*types.Log {
-	var logs []*types.Log
-	for _, l := range pl {
-		logs = append(logs, l...)
-	}
-	return logs
-}
diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go
index b14c78bc3eb4..6a10b0874d10 100644
--- a/eth/filters/filter_test.go
+++ b/eth/filters/filter_test.go
@@ -99,8 +99,8 @@ func TestFilters(t *testing.T) {
 	config.Eip1559Block = big.NewInt(0)
 
 	var (
-		db     = rawdb.NewMemoryDatabase()
-		_, sys = newTestFilterSystem(t, db, Config{})
+		db           = rawdb.NewMemoryDatabase()
+		backend, sys = newTestFilterSystem(t, db, Config{})
 		// Sender account
 		key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
 		addr    = crypto.PubkeyToAddress(key1.PublicKey)
@@ -265,8 +265,7 @@ func TestFilters(t *testing.T) {
 		}), signer, key1)
 		gen.AddTx(tx)
 	})
-	sys.backend.(*testBackend).pendingBlock = pchain[0]
-	sys.backend.(*testBackend).pendingReceipts = preceipts[0]
+	backend.setPending(pchain[0], preceipts[0])
 
 	for i, tc := range []struct {
 		f    *Filter
@@ -301,22 +300,22 @@ func TestFilters(t *testing.T) {
 		want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696334"],"data":"0x","blockNumber":"0x3e8","transactionHash":"0x21fd39694cbcc8cc5046b3b7d5200101edf9c85218da613a8851eb5e3d195241","transactionIndex":"0x0","blockHash":"0x8a956d79ca6468ff23c97615a4aa24a55bdaff78767ee28d3e2e02ecb407a0de","logIndex":"0x0","removed":false}]`,
 	}, {
 		f:   sys.NewRangeFilter(int64(rpc.FinalizedBlockNumber), int64(rpc.LatestBlockNumber), nil, nil, 0),
-		err: "committed header not found",
+		err: "finalized header not found",
 	}, {
 		f:   sys.NewRangeFilter(int64(rpc.FinalizedBlockNumber), int64(rpc.FinalizedBlockNumber), nil, nil, 0),
-		err: "committed header not found",
+		err: "finalized header not found",
 	}, {
 		f:   sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.FinalizedBlockNumber), nil, nil, 0),
-		err: "committed header not found",
+		err: "finalized header not found",
 	}, {
-		f:    sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.PendingBlockNumber), nil, nil, 0),
-		want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696335"],"data":"0x","blockNumber":"0x3e9","transactionHash":"0x87d02f2dddb1941ff179ae5d5fbb123afb9c5f71220045bc1c48f3872be24d4a","transactionIndex":"0x0","blockHash":"0x5f2b35a350840476a43aa23c6ea031d6db277aef337775ebb8421df64f17723f","logIndex":"0x0","removed":false}]`,
+		f:   sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.PendingBlockNumber), nil, nil, 0),
+		err: errPendingLogsUnsupported.Error(),
 	}, {
-		f:    sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.PendingBlockNumber), nil, nil, 0),
-		want: `[{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696334"],"data":"0x","blockNumber":"0x3e8","transactionHash":"0x21fd39694cbcc8cc5046b3b7d5200101edf9c85218da613a8851eb5e3d195241","transactionIndex":"0x0","blockHash":"0x8a956d79ca6468ff23c97615a4aa24a55bdaff78767ee28d3e2e02ecb407a0de","logIndex":"0x0","removed":false},{"address":"0xfe00000000000000000000000000000000000000","topics":["0x0000000000000000000000000000000000000000000000000000746f70696335"],"data":"0x","blockNumber":"0x3e9","transactionHash":"0x87d02f2dddb1941ff179ae5d5fbb123afb9c5f71220045bc1c48f3872be24d4a","transactionIndex":"0x0","blockHash":"0x5f2b35a350840476a43aa23c6ea031d6db277aef337775ebb8421df64f17723f","logIndex":"0x0","removed":false}]`,
+		f:   sys.NewRangeFilter(int64(rpc.LatestBlockNumber), int64(rpc.PendingBlockNumber), nil, nil, 0),
+		err: errPendingLogsUnsupported.Error(),
 	}, {
 		f:   sys.NewRangeFilter(int64(rpc.PendingBlockNumber), int64(rpc.LatestBlockNumber), nil, nil, 0),
-		err: "invalid block range",
+		err: errPendingLogsUnsupported.Error(),
 	},
 	} {
 		logs, err := tc.f.Logs(context.Background())
@@ -338,7 +337,7 @@ func TestFilters(t *testing.T) {
 	}
 
 	t.Run("timeout", func(t *testing.T) {
-		f := sys.NewRangeFilter(0, -1, nil, nil, 0)
+		f := sys.NewRangeFilter(0, rpc.LatestBlockNumber.Int64(), nil, nil, 0)
 		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-time.Hour))
 		defer cancel()
 		_, err := f.Logs(ctx)