diff --git a/.gitignore b/.gitignore index 08d1d2bfd..011218ddf 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ types/pb/tendermint .vscode/launch.json .vscode/settings.json vendor +*.test */**.html *.idea *.env diff --git a/block/internal/da/tracing.go b/block/internal/da/tracing.go new file mode 100644 index 000000000..45fae2e86 --- /dev/null +++ b/block/internal/da/tracing.go @@ -0,0 +1,137 @@ +package da + +import ( + "context" + "encoding/hex" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + + datypes "github.com/evstack/ev-node/pkg/da/types" +) + +// tracedClient decorates a FullClient with OpenTelemetry spans. +type tracedClient struct { + inner FullClient + tracer trace.Tracer +} + +// WithTracingClient decorates the provided client with tracing spans. +func WithTracingClient(inner FullClient) FullClient { + return &tracedClient{inner: inner, tracer: otel.Tracer("ev-node/da")} +} + +func (t *tracedClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit { + total := 0 + for _, b := range data { + total += len(b) + } + ctx, span := t.tracer.Start(ctx, "DA.Submit", + trace.WithAttributes( + attribute.Int("blob.count", len(data)), + attribute.Int("blob.total_size_bytes", total), + attribute.String("da.namespace", hex.EncodeToString(namespace)), + ), + ) + defer span.End() + + res := t.inner.Submit(ctx, data, gasPrice, namespace, options) + if res.Code != datypes.StatusSuccess { + span.RecordError(&submitError{msg: res.Message}) + span.SetStatus(codes.Error, res.Message) + } else { + span.SetAttributes(attribute.Int64("da.height", int64(res.Height))) + } + return res +} + +func (t *tracedClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + ctx, span := t.tracer.Start(ctx, "DA.Retrieve", + trace.WithAttributes( + attribute.Int("ns.length", len(namespace)), + attribute.String("da.namespace", hex.EncodeToString(namespace)), + ), + ) + defer span.End() + + res := t.inner.Retrieve(ctx, height, namespace) + + if res.Code != datypes.StatusSuccess && res.Code != datypes.StatusNotFound { + span.RecordError(&submitError{msg: res.Message}) + span.SetStatus(codes.Error, res.Message) + } else { + span.SetAttributes(attribute.Int("blob.count", len(res.Data))) + } + return res +} + +func (t *tracedClient) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) { + ctx, span := t.tracer.Start(ctx, "DA.Get", + trace.WithAttributes( + attribute.Int("id.count", len(ids)), + attribute.String("da.namespace", hex.EncodeToString(namespace)), + ), + ) + defer span.End() + + blobs, err := t.inner.Get(ctx, ids, namespace) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + span.SetAttributes(attribute.Int("blob.count", len(blobs))) + return blobs, nil +} + +func (t *tracedClient) GetProofs(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) { + ctx, span := t.tracer.Start(ctx, "DA.GetProofs", + trace.WithAttributes( + attribute.Int("id.count", len(ids)), + attribute.String("da.namespace", hex.EncodeToString(namespace)), + ), + ) + defer span.End() + + proofs, err := t.inner.GetProofs(ctx, ids, namespace) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + span.SetAttributes(attribute.Int("proof.count", len(proofs))) + return proofs, nil +} + +func (t *tracedClient) Validate(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) { + ctx, span := t.tracer.Start(ctx, "DA.Validate", + trace.WithAttributes( + attribute.Int("id.count", len(ids)), + attribute.String("da.namespace", hex.EncodeToString(namespace)), + ), + ) + defer span.End() + res, err := t.inner.Validate(ctx, ids, proofs, namespace) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + span.SetAttributes(attribute.Int("result.count", len(res))) + return res, nil +} + +func (t *tracedClient) GetHeaderNamespace() []byte { return t.inner.GetHeaderNamespace() } +func (t *tracedClient) GetDataNamespace() []byte { return t.inner.GetDataNamespace() } +func (t *tracedClient) GetForcedInclusionNamespace() []byte { + return t.inner.GetForcedInclusionNamespace() +} +func (t *tracedClient) HasForcedInclusionNamespace() bool { + return t.inner.HasForcedInclusionNamespace() +} + +type submitError struct{ msg string } + +func (e *submitError) Error() string { return e.msg } diff --git a/block/internal/da/tracing_test.go b/block/internal/da/tracing_test.go new file mode 100644 index 000000000..ca288770c --- /dev/null +++ b/block/internal/da/tracing_test.go @@ -0,0 +1,222 @@ +package da + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + + datypes "github.com/evstack/ev-node/pkg/da/types" +) + +// mockFullClient provides function hooks for testing the tracing decorator. +type mockFullClient struct { + submitFn func(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit + retrieveFn func(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve + getFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) + getProofsFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) + validateFn func(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) +} + +func (m *mockFullClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit { + if m.submitFn != nil { + return m.submitFn(ctx, data, gasPrice, namespace, options) + } + return datypes.ResultSubmit{} +} +func (m *mockFullClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + if m.retrieveFn != nil { + return m.retrieveFn(ctx, height, namespace) + } + return datypes.ResultRetrieve{} +} +func (m *mockFullClient) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) { + if m.getFn != nil { + return m.getFn(ctx, ids, namespace) + } + return nil, nil +} +func (m *mockFullClient) GetProofs(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) { + if m.getProofsFn != nil { + return m.getProofsFn(ctx, ids, namespace) + } + return nil, nil +} +func (m *mockFullClient) Validate(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) { + if m.validateFn != nil { + return m.validateFn(ctx, ids, proofs, namespace) + } + return nil, nil +} +func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x01} } +func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} } +func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} } +func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true } + +// setup a tracer provider + span recorder +func setupDATrace(t *testing.T, inner FullClient) (FullClient, *tracetest.SpanRecorder) { + t.Helper() + sr := tracetest.NewSpanRecorder() + tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr)) + t.Cleanup(func() { _ = tp.Shutdown(context.Background()) }) + otel.SetTracerProvider(tp) + return WithTracingClient(inner), sr +} + +func TestTracedDA_Submit_Success(t *testing.T) { + mock := &mockFullClient{ + submitFn: func(ctx context.Context, data [][]byte, _ float64, _ []byte, _ []byte) datypes.ResultSubmit { + return datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Height: 123}} + }, + } + client, sr := setupDATrace(t, mock) + ctx := context.Background() + + _ = client.Submit(ctx, [][]byte{[]byte("a"), []byte("bc")}, -1.0, []byte{0xaa, 0xbb}, nil) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DA.Submit", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + attrs := span.Attributes() + requireAttribute(t, attrs, "blob.count", 2) + requireAttribute(t, attrs, "blob.total_size_bytes", 3) + // namespace hex string length assertion + // 2 bytes = 4 hex characters + foundNS := false + for _, a := range attrs { + if string(a.Key) == "da.namespace" { + foundNS = true + require.Equal(t, 4, len(a.Value.AsString())) + } + } + require.True(t, foundNS, "attribute da.namespace not found") +} + +func TestTracedDA_Submit_Error(t *testing.T) { + mock := &mockFullClient{ + submitFn: func(ctx context.Context, data [][]byte, _ float64, _ []byte, _ []byte) datypes.ResultSubmit { + return datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: "boom"}} + }, + } + client, sr := setupDATrace(t, mock) + ctx := context.Background() + + _ = client.Submit(ctx, [][]byte{[]byte("a")}, -1.0, []byte{0xaa}, nil) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, "boom", span.Status().Description) +} + +func TestTracedDA_Retrieve_Success(t *testing.T) { + mock := &mockFullClient{ + retrieveFn: func(ctx context.Context, height uint64, _ []byte) datypes.ResultRetrieve { + return datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Height: height}, Data: []datypes.Blob{{}, {}}} + }, + } + client, sr := setupDATrace(t, mock) + ctx := context.Background() + + _ = client.Retrieve(ctx, 42, []byte{0x01}) + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DA.Retrieve", span.Name()) + attrs := span.Attributes() + requireAttribute(t, attrs, "ns.length", 1) + requireAttribute(t, attrs, "blob.count", 2) +} + +func TestTracedDA_Retrieve_Error(t *testing.T) { + mock := &mockFullClient{ + retrieveFn: func(ctx context.Context, height uint64, _ []byte) datypes.ResultRetrieve { + return datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: "oops"}} + }, + } + client, sr := setupDATrace(t, mock) + ctx := context.Background() + + _ = client.Retrieve(ctx, 7, []byte{0x02}) + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, "oops", span.Status().Description) +} + +func TestTracedDA_Get_Success(t *testing.T) { + mock := &mockFullClient{ + getFn: func(ctx context.Context, ids []datypes.ID, _ []byte) ([]datypes.Blob, error) { + return []datypes.Blob{{}, {}}, nil + }, + } + client, sr := setupDATrace(t, mock) + ctx := context.Background() + ids := []datypes.ID{[]byte{0x01}, []byte{0x02}} + + blobs, err := client.Get(ctx, ids, []byte{0x01}) + require.NoError(t, err) + require.Len(t, blobs, 2) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DA.Get", span.Name()) + attrs := span.Attributes() + requireAttribute(t, attrs, "id.count", 2) + requireAttribute(t, attrs, "blob.count", 2) +} + +func TestTracedDA_Get_Error(t *testing.T) { + mock := &mockFullClient{ + getFn: func(ctx context.Context, ids []datypes.ID, _ []byte) ([]datypes.Blob, error) { + return nil, errors.New("get failed") + }, + } + client, sr := setupDATrace(t, mock) + ctx := context.Background() + ids := []datypes.ID{[]byte{0x01}} + + _, err := client.Get(ctx, ids, []byte{0x01}) + require.Error(t, err) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, "get failed", span.Status().Description) +} + +// helper copied from eth tracing tests +func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { + t.Helper() + found := false + for _, attr := range attrs { + if string(attr.Key) == key { + found = true + switch v := expected.(type) { + case string: + require.Equal(t, v, attr.Value.AsString()) + case int64: + require.Equal(t, v, attr.Value.AsInt64()) + case int: + require.Equal(t, int64(v), attr.Value.AsInt64()) + default: + t.Fatalf("unsupported attribute type: %T", expected) + } + break + } + } + require.True(t, found, "attribute %s not found", key) +} diff --git a/block/public.go b/block/public.go index 1e3e0de07..54bba68c7 100644 --- a/block/public.go +++ b/block/public.go @@ -48,7 +48,7 @@ func NewDAClient( config config.Config, logger zerolog.Logger, ) FullDAClient { - return da.NewClient(da.Config{ + base := da.NewClient(da.Config{ DA: blobRPC, Logger: logger, Namespace: config.DA.GetNamespace(), @@ -56,6 +56,10 @@ func NewDAClient( DataNamespace: config.DA.GetDataNamespace(), ForcedInclusionNamespace: config.DA.GetForcedInclusionNamespace(), }) + if config.Instrumentation.IsTracingEnabled() { + return da.WithTracingClient(base) + } + return base } // ErrForceInclusionNotConfigured is returned when force inclusion is not configured. diff --git a/da.test b/da.test new file mode 100755 index 000000000..b8a69e2ef Binary files /dev/null and b/da.test differ diff --git a/execution/evm/eth_rpc_client.go b/execution/evm/eth_rpc_client.go new file mode 100644 index 000000000..8799a8177 --- /dev/null +++ b/execution/evm/eth_rpc_client.go @@ -0,0 +1,30 @@ +package evm + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" +) + +type ethRPCClient struct { + client *ethclient.Client +} + +func NewEthRPCClient(client *ethclient.Client) EthRPCClient { + return ðRPCClient{client: client} +} + +func (e *ethRPCClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + return e.client.HeaderByNumber(ctx, number) +} + +func (e *ethRPCClient) GetTxs(ctx context.Context) ([]string, error) { + var result []string + err := e.client.Client().CallContext(ctx, &result, "txpoolExt_getTxs") + if err != nil { + return nil, err + } + return result, nil +} diff --git a/execution/evm/eth_rpc_tracing.go b/execution/evm/eth_rpc_tracing.go new file mode 100644 index 000000000..a2842d7f2 --- /dev/null +++ b/execution/evm/eth_rpc_tracing.go @@ -0,0 +1,82 @@ +package evm + +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +// tracedEthRPCClient wraps an EthRPCClient and records spans for observability. +type tracedEthRPCClient struct { + inner EthRPCClient + tracer trace.Tracer +} + +// withTracingEthRPCClient decorates an EthRPCClient with OpenTelemetry tracing. +func withTracingEthRPCClient(inner EthRPCClient) EthRPCClient { + return &tracedEthRPCClient{ + inner: inner, + tracer: otel.Tracer("ev-node/execution/eth-rpc"), + } +} + +func (t *tracedEthRPCClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + var blockNumber string + if number == nil { + blockNumber = "latest" + } else { + blockNumber = number.String() + } + + ctx, span := t.tracer.Start(ctx, "Eth.GetBlockByNumber", + trace.WithAttributes( + attribute.String("method", "eth_getBlockByNumber"), + attribute.String("block_number", blockNumber), + ), + ) + defer span.End() + + result, err := t.inner.HeaderByNumber(ctx, number) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.String("block_hash", result.Hash().Hex()), + attribute.String("state_root", result.Root.Hex()), + attribute.Int64("gas_limit", int64(result.GasLimit)), + attribute.Int64("gas_used", int64(result.GasUsed)), + attribute.Int64("timestamp", int64(result.Time)), + ) + + return result, nil +} + +func (t *tracedEthRPCClient) GetTxs(ctx context.Context) ([]string, error) { + ctx, span := t.tracer.Start(ctx, "TxPool.GetTxs", + trace.WithAttributes( + attribute.String("method", "txpoolExt_getTxs"), + ), + ) + defer span.End() + + result, err := t.inner.GetTxs(ctx) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return nil, err + } + + span.SetAttributes( + attribute.Int("tx_count", len(result)), + ) + + return result, nil +} diff --git a/execution/evm/eth_rpc_tracing_test.go b/execution/evm/eth_rpc_tracing_test.go new file mode 100644 index 000000000..832a03fef --- /dev/null +++ b/execution/evm/eth_rpc_tracing_test.go @@ -0,0 +1,301 @@ +package evm + +import ( + "context" + "errors" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" +) + +// setupTestEthRPCTracing creates a traced eth RPC client with an in-memory span recorder +func setupTestEthRPCTracing(t *testing.T, mockClient EthRPCClient) (EthRPCClient, *tracetest.SpanRecorder) { + t.Helper() + + // create in-memory span recorder + sr := tracetest.NewSpanRecorder() + tp := trace.NewTracerProvider( + trace.WithSpanProcessor(sr), + ) + t.Cleanup(func() { + _ = tp.Shutdown(context.Background()) + }) + + // set as global provider for the test + otel.SetTracerProvider(tp) + + // create traced client + traced := withTracingEthRPCClient(mockClient) + + return traced, sr +} + +// mockEthRPCClient is a simple mock for testing +type mockEthRPCClient struct { + headerByNumberFn func(ctx context.Context, number *big.Int) (*types.Header, error) + getTxsFn func(ctx context.Context) ([]string, error) +} + +func (m *mockEthRPCClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + if m.headerByNumberFn != nil { + return m.headerByNumberFn(ctx, number) + } + return nil, nil +} + +func (m *mockEthRPCClient) GetTxs(ctx context.Context) ([]string, error) { + if m.getTxsFn != nil { + return m.getTxsFn(ctx) + } + return nil, nil +} + +func TestTracedEthRPCClient_HeaderByNumber_Success(t *testing.T) { + expectedHeader := &types.Header{ + GasLimit: 30000000, + GasUsed: 15000000, + Time: 1234567890, + } + + mockClient := &mockEthRPCClient{ + headerByNumberFn: func(ctx context.Context, number *big.Int) (*types.Header, error) { + return expectedHeader, nil + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + blockNumber := big.NewInt(100) + + header, err := traced.HeaderByNumber(ctx, blockNumber) + + require.NoError(t, err) + require.Equal(t, expectedHeader, header) + + // verify span was created + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "Eth.GetBlockByNumber", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + // verify attributes + attrs := span.Attributes() + requireAttribute(t, attrs, "method", "eth_getBlockByNumber") + requireAttribute(t, attrs, "block_number", "100") + requireAttribute(t, attrs, "block_hash", expectedHeader.Hash().Hex()) + requireAttribute(t, attrs, "state_root", expectedHeader.Root.Hex()) + requireAttribute(t, attrs, "gas_limit", int64(expectedHeader.GasLimit)) + requireAttribute(t, attrs, "gas_used", int64(expectedHeader.GasUsed)) + requireAttribute(t, attrs, "timestamp", int64(expectedHeader.Time)) +} + +func TestTracedEthRPCClient_HeaderByNumber_Latest(t *testing.T) { + expectedHeader := &types.Header{ + GasLimit: 30000000, + GasUsed: 15000000, + Time: 1234567890, + } + + mockClient := &mockEthRPCClient{ + headerByNumberFn: func(ctx context.Context, number *big.Int) (*types.Header, error) { + require.Nil(t, number, "number should be nil for latest block") + return expectedHeader, nil + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + + header, err := traced.HeaderByNumber(ctx, nil) + + require.NoError(t, err) + require.Equal(t, expectedHeader, header) + + // verify span + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "Eth.GetBlockByNumber", span.Name()) + + // verify block_number is "latest" when nil + attrs := span.Attributes() + requireAttribute(t, attrs, "block_number", "latest") +} + +func TestTracedEthRPCClient_HeaderByNumber_Error(t *testing.T) { + expectedErr := errors.New("failed to get block header") + + mockClient := &mockEthRPCClient{ + headerByNumberFn: func(ctx context.Context, number *big.Int) (*types.Header, error) { + return nil, expectedErr + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + blockNumber := big.NewInt(100) + + _, err := traced.HeaderByNumber(ctx, blockNumber) + + require.Error(t, err) + require.Equal(t, expectedErr, err) + + // verify span recorded the error + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "Eth.GetBlockByNumber", span.Name()) + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) + + // verify error event was recorded + events := span.Events() + require.Len(t, events, 1) + require.Equal(t, "exception", events[0].Name) + + // verify block header attributes NOT set on error + attrs := span.Attributes() + for _, attr := range attrs { + key := string(attr.Key) + require.NotEqual(t, "block_hash", key) + require.NotEqual(t, "state_root", key) + require.NotEqual(t, "gas_limit", key) + require.NotEqual(t, "gas_used", key) + } +} + +func TestTracedEthRPCClient_GetTxs_Success(t *testing.T) { + expectedTxs := []string{"0xabcd", "0xef01", "0x2345"} + + mockClient := &mockEthRPCClient{ + getTxsFn: func(ctx context.Context) ([]string, error) { + return expectedTxs, nil + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + + txs, err := traced.GetTxs(ctx) + + require.NoError(t, err) + require.Equal(t, expectedTxs, txs) + + // verify span was created + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "TxPool.GetTxs", span.Name()) + require.Equal(t, codes.Unset, span.Status().Code) + + // verify attributes + attrs := span.Attributes() + requireAttribute(t, attrs, "method", "txpoolExt_getTxs") + requireAttribute(t, attrs, "tx_count", len(expectedTxs)) +} + +func TestTracedEthRPCClient_GetTxs_EmptyPool(t *testing.T) { + mockClient := &mockEthRPCClient{ + getTxsFn: func(ctx context.Context) ([]string, error) { + return []string{}, nil + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + + txs, err := traced.GetTxs(ctx) + + require.NoError(t, err) + require.Empty(t, txs) + + // verify span + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "TxPool.GetTxs", span.Name()) + + // verify tx_count is 0 + attrs := span.Attributes() + requireAttribute(t, attrs, "tx_count", 0) +} + +func TestTracedEthRPCClient_GetTxs_Error(t *testing.T) { + expectedErr := errors.New("failed to get transactions") + + mockClient := &mockEthRPCClient{ + getTxsFn: func(ctx context.Context) ([]string, error) { + return nil, expectedErr + }, + } + + traced, sr := setupTestEthRPCTracing(t, mockClient) + + ctx := context.Background() + + _, err := traced.GetTxs(ctx) + + require.Error(t, err) + require.Equal(t, expectedErr, err) + + // verify span recorded the error + spans := sr.Ended() + require.Len(t, spans, 1) + + span := spans[0] + require.Equal(t, "TxPool.GetTxs", span.Name()) + require.Equal(t, codes.Error, span.Status().Code) + require.Equal(t, expectedErr.Error(), span.Status().Description) + + // verify error event was recorded + events := span.Events() + require.Len(t, events, 1) + require.Equal(t, "exception", events[0].Name) + + // verify tx_count NOT set on error + attrs := span.Attributes() + for _, attr := range attrs { + require.NotEqual(t, "tx_count", string(attr.Key)) + } +} + +// requireAttribute is a helper to check span attributes +func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) { + t.Helper() + found := false + for _, attr := range attrs { + if string(attr.Key) == key { + found = true + switch v := expected.(type) { + case string: + require.Equal(t, v, attr.Value.AsString()) + case int64: + require.Equal(t, v, attr.Value.AsInt64()) + case int: + require.Equal(t, int64(v), attr.Value.AsInt64()) + default: + t.Fatalf("unsupported attribute type: %T", expected) + } + break + } + } + require.True(t, found, "attribute %s not found", key) +} diff --git a/execution/evm/execution.go b/execution/evm/execution.go index e360867b5..c310af06d 100644 --- a/execution/evm/execution.go +++ b/execution/evm/execution.go @@ -142,13 +142,22 @@ type EngineRPCClient interface { NewPayload(ctx context.Context, payload *engine.ExecutableData, blobHashes []string, parentBeaconBlockRoot string, executionRequests [][]byte) (*engine.PayloadStatusV1, error) } +// EthRPCClient abstracts Ethereum JSON-RPC calls for tracing and testing. +type EthRPCClient interface { + // HeaderByNumber retrieves a block header by number (nil = latest). + HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) + + // GetTxs retrieves pending transactions from the transaction pool. + GetTxs(ctx context.Context) ([]string, error) +} + // EngineClient represents a client that interacts with an Ethereum execution engine // through the Engine API. It manages connections to both the engine and standard Ethereum // APIs, and maintains state related to block processing. type EngineClient struct { - engineClient EngineRPCClient // Client for Engine API calls - ethClient *ethclient.Client // Client for standard Ethereum API calls - genesisHash common.Hash // Hash of the genesis block + engineClient EngineRPCClient // Client for Engine API calls + ethClient EthRPCClient // Client for standard Ethereum API calls + genesisHash common.Hash // Hash of the genesis block initialHeight uint64 feeRecipient common.Address // Address to receive transaction fees @@ -171,7 +180,7 @@ type EngineClient struct { // execution and crash recovery. The db is wrapped with a prefix to isolate // EVM execution data from other ev-node data. // When tracingEnabled is true, the client will inject W3C trace context headers -// and wrap Engine API calls with OpenTelemetry spans. +// and wrap Engine API and Eth API calls with OpenTelemetry spans. func NewEngineExecutionClient( ethURL, engineURL string, @@ -197,7 +206,7 @@ func NewEngineExecutionClient( if err != nil { return nil, err } - ethClient := ethclient.NewClient(ethRPC) + rawEthClient := ethclient.NewClient(ethRPC) secret, err := decodeSecret(jwtSecret) if err != nil { @@ -223,12 +232,14 @@ func NewEngineExecutionClient( return nil, err } - // raw engine client + // wrap raw clients with interfaces engineClient := NewEngineRPCClient(rawEngineClient) + ethClient := NewEthRPCClient(rawEthClient) - // if tracing enabled, wrap with traced decorator + // if tracing enabled, wrap with traced decorators if tracingEnabled { engineClient = withTracingEngineRPCClient(engineClient) + ethClient = withTracingEthRPCClient(ethClient) } return &EngineClient{ @@ -298,8 +309,7 @@ func (c *EngineClient) InitChain(ctx context.Context, genesisTime time.Time, ini // GetTxs retrieves transactions from the current execution payload func (c *EngineClient) GetTxs(ctx context.Context) ([][]byte, error) { - var result []string - err := c.ethClient.Client().CallContext(ctx, &result, "txpoolExt_getTxs") + result, err := c.ethClient.GetTxs(ctx) if err != nil { return nil, fmt.Errorf("failed to get tx pool content: %w", err) } diff --git a/execution/evm/go.mod b/execution/evm/go.mod index d77618007..602ebf25f 100644 --- a/execution/evm/go.mod +++ b/execution/evm/go.mod @@ -11,6 +11,7 @@ require ( github.com/rs/zerolog v1.34.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.39.0 + go.opentelemetry.io/otel/sdk v1.38.0 go.opentelemetry.io/otel/trace v1.39.0 google.golang.org/protobuf v1.36.10 ) @@ -20,7 +21,6 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 // indirect go.opentelemetry.io/otel/metric v1.39.0 // indirect - go.opentelemetry.io/otel/sdk v1.38.0 // indirect go.opentelemetry.io/proto/otlp v1.7.1 // indirect )