Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ types/pb/tendermint
.vscode/launch.json
.vscode/settings.json
vendor
*.test
*/**.html
*.idea
*.env
Expand Down
136 changes: 136 additions & 0 deletions block/internal/da/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package da

import (
"context"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

datypes "github.com/evstack/ev-node/pkg/da/types"
)

// tracedClient decorates a FullClient with OpenTelemetry spans.
type tracedClient struct {
inner FullClient
tracer trace.Tracer
}

// WithTracingClient decorates the provided client with tracing spans.
func WithTracingClient(inner FullClient) FullClient {
return &tracedClient{inner: inner, tracer: otel.Tracer("ev-node/da")}
}

func (t *tracedClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit {
total := 0
for _, b := range data {
total += len(b)
}
ctx, span := t.tracer.Start(ctx, "DA.Submit",
trace.WithAttributes(
attribute.Int("blob.count", len(data)),
attribute.Int("blob.total_size_bytes", total),
attribute.String("da.namespace", string(namespace)),
),
)
defer span.End()

res := t.inner.Submit(ctx, data, gasPrice, namespace, options)
if res.Code != datypes.StatusSuccess {
span.RecordError(&submitError{msg: res.Message})
span.SetStatus(codes.Error, res.Message)
} else {
span.SetAttributes(attribute.Int64("da.height", int64(res.Height)))
}
return res
}

func (t *tracedClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
ctx, span := t.tracer.Start(ctx, "DA.Retrieve",
trace.WithAttributes(
attribute.Int("ns.length", len(namespace)),
attribute.String("da.namespace", string(namespace)),
),
)
defer span.End()

res := t.inner.Retrieve(ctx, height, namespace)

if res.Code != datypes.StatusSuccess && res.Code != datypes.StatusNotFound {
span.RecordError(&submitError{msg: res.Message})
span.SetStatus(codes.Error, res.Message)
} else {
span.SetAttributes(attribute.Int("blob.count", len(res.Data)))
}
return res
}

func (t *tracedClient) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) {
ctx, span := t.tracer.Start(ctx, "DA.Get",
trace.WithAttributes(
attribute.Int("id.count", len(ids)),
attribute.String("da.namespace", string(namespace)),
),
)
defer span.End()

blobs, err := t.inner.Get(ctx, ids, namespace)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
span.SetAttributes(attribute.Int("blob.count", len(blobs)))
return blobs, nil
}

func (t *tracedClient) GetProofs(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) {
ctx, span := t.tracer.Start(ctx, "DA.GetProofs",
trace.WithAttributes(
attribute.Int("id.count", len(ids)),
attribute.String("da.namespace", string(namespace)),
),
)
defer span.End()

proofs, err := t.inner.GetProofs(ctx, ids, namespace)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
span.SetAttributes(attribute.Int("proof.count", len(proofs)))
return proofs, nil
}

func (t *tracedClient) Validate(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) {
ctx, span := t.tracer.Start(ctx, "DA.Validate",
trace.WithAttributes(
attribute.Int("id.count", len(ids)),
attribute.String("da.namespace", string(namespace)),
),
)
defer span.End()
res, err := t.inner.Validate(ctx, ids, proofs, namespace)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
span.SetAttributes(attribute.Int("result.count", len(res)))
return res, nil
}

func (t *tracedClient) GetHeaderNamespace() []byte { return t.inner.GetHeaderNamespace() }
func (t *tracedClient) GetDataNamespace() []byte { return t.inner.GetDataNamespace() }
func (t *tracedClient) GetForcedInclusionNamespace() []byte {
return t.inner.GetForcedInclusionNamespace()
}
func (t *tracedClient) HasForcedInclusionNamespace() bool {
return t.inner.HasForcedInclusionNamespace()
}

type submitError struct{ msg string }

func (e *submitError) Error() string { return e.msg }
222 changes: 222 additions & 0 deletions block/internal/da/tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
package da

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"

datypes "github.com/evstack/ev-node/pkg/da/types"
)

// mockFullClient provides function hooks for testing the tracing decorator.
type mockFullClient struct {
submitFn func(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit
retrieveFn func(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve
getFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error)
getProofsFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error)
validateFn func(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error)
}

func (m *mockFullClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit {
if m.submitFn != nil {
return m.submitFn(ctx, data, gasPrice, namespace, options)
}
return datypes.ResultSubmit{}
}
func (m *mockFullClient) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
if m.retrieveFn != nil {
return m.retrieveFn(ctx, height, namespace)
}
return datypes.ResultRetrieve{}
}
func (m *mockFullClient) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) {
if m.getFn != nil {
return m.getFn(ctx, ids, namespace)
}
return nil, nil
}
func (m *mockFullClient) GetProofs(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) {
if m.getProofsFn != nil {
return m.getProofsFn(ctx, ids, namespace)
}
return nil, nil
}
func (m *mockFullClient) Validate(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) {
if m.validateFn != nil {
return m.validateFn(ctx, ids, proofs, namespace)
}
return nil, nil
}
func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x01} }
func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} }
func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} }
func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true }

// setup a tracer provider + span recorder
func setupDATrace(t *testing.T, inner FullClient) (FullClient, *tracetest.SpanRecorder) {
t.Helper()
sr := tracetest.NewSpanRecorder()
tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
t.Cleanup(func() { _ = tp.Shutdown(context.Background()) })
otel.SetTracerProvider(tp)
return WithTracingClient(inner), sr
}

func TestTracedDA_Submit_Success(t *testing.T) {
mock := &mockFullClient{
submitFn: func(ctx context.Context, data [][]byte, _ float64, _ []byte, _ []byte) datypes.ResultSubmit {
return datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Height: 123}}
},
}
client, sr := setupDATrace(t, mock)
ctx := context.Background()

_ = client.Submit(ctx, [][]byte{[]byte("a"), []byte("bc")}, -1.0, []byte{0xaa, 0xbb}, nil)

spans := sr.Ended()
require.Len(t, spans, 1)
span := spans[0]
require.Equal(t, "DA.Submit", span.Name())

Check failure on line 86 in block/internal/da/tracing_test.go

View workflow job for this annotation

GitHub Actions / lint / golangci-lint

File is not properly formatted (gofmt)
require.Equal(t, codes.Unset, span.Status().Code)

attrs := span.Attributes()
requireAttribute(t, attrs, "blob.count", 2)
requireAttribute(t, attrs, "blob.total_size_bytes", 3)
// namespace length assertion for local behavior
// look for da.namespace and assert byte-length
foundNS := false
for _, a := range attrs {
if string(a.Key) == "da.namespace" {
foundNS = true
require.Equal(t, 2, len(a.Value.AsString()))
}
}
require.True(t, foundNS, "attribute da.namespace not found")
}

func TestTracedDA_Submit_Error(t *testing.T) {
mock := &mockFullClient{
submitFn: func(ctx context.Context, data [][]byte, _ float64, _ []byte, _ []byte) datypes.ResultSubmit {
return datypes.ResultSubmit{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: "boom"}}
},
}
client, sr := setupDATrace(t, mock)
ctx := context.Background()

_ = client.Submit(ctx, [][]byte{[]byte("a")}, -1.0, []byte{0xaa}, nil)

spans := sr.Ended()
require.Len(t, spans, 1)
span := spans[0]
require.Equal(t, codes.Error, span.Status().Code)
require.Equal(t, "boom", span.Status().Description)
}

func TestTracedDA_Retrieve_Success(t *testing.T) {
mock := &mockFullClient{
retrieveFn: func(ctx context.Context, height uint64, _ []byte) datypes.ResultRetrieve {
return datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Height: height}, Data: []datypes.Blob{{}, {}}}
},
}
client, sr := setupDATrace(t, mock)
ctx := context.Background()

_ = client.Retrieve(ctx, 42, []byte{0x01})
spans := sr.Ended()
require.Len(t, spans, 1)
span := spans[0]
require.Equal(t, "DA.Retrieve", span.Name())
attrs := span.Attributes()
requireAttribute(t, attrs, "ns.length", 1)
requireAttribute(t, attrs, "blob.count", 2)
}

func TestTracedDA_Retrieve_Error(t *testing.T) {
mock := &mockFullClient{
retrieveFn: func(ctx context.Context, height uint64, _ []byte) datypes.ResultRetrieve {
return datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: "oops"}}
},
}
client, sr := setupDATrace(t, mock)
ctx := context.Background()

_ = client.Retrieve(ctx, 7, []byte{0x02})
spans := sr.Ended()
require.Len(t, spans, 1)
span := spans[0]
require.Equal(t, codes.Error, span.Status().Code)
require.Equal(t, "oops", span.Status().Description)
}

func TestTracedDA_Get_Success(t *testing.T) {
mock := &mockFullClient{
getFn: func(ctx context.Context, ids []datypes.ID, _ []byte) ([]datypes.Blob, error) {
return []datypes.Blob{{}, {}}, nil
},
}
client, sr := setupDATrace(t, mock)
ctx := context.Background()
ids := []datypes.ID{[]byte{0x01}, []byte{0x02}}

blobs, err := client.Get(ctx, ids, []byte{0x01})
require.NoError(t, err)
require.Len(t, blobs, 2)

spans := sr.Ended()
require.Len(t, spans, 1)
span := spans[0]
require.Equal(t, "DA.Get", span.Name())
attrs := span.Attributes()
requireAttribute(t, attrs, "id.count", 2)
requireAttribute(t, attrs, "blob.count", 2)
}

func TestTracedDA_Get_Error(t *testing.T) {
mock := &mockFullClient{
getFn: func(ctx context.Context, ids []datypes.ID, _ []byte) ([]datypes.Blob, error) {
return nil, errors.New("get failed")
},
}
client, sr := setupDATrace(t, mock)
ctx := context.Background()
ids := []datypes.ID{[]byte{0x01}}

_, err := client.Get(ctx, ids, []byte{0x01})
require.Error(t, err)

spans := sr.Ended()
require.Len(t, spans, 1)
span := spans[0]
require.Equal(t, codes.Error, span.Status().Code)
require.Equal(t, "get failed", span.Status().Description)
}

// helper copied from eth tracing tests
func requireAttribute(t *testing.T, attrs []attribute.KeyValue, key string, expected interface{}) {
t.Helper()
found := false
for _, attr := range attrs {
if string(attr.Key) == key {
found = true
switch v := expected.(type) {
case string:
require.Equal(t, v, attr.Value.AsString())
case int64:
require.Equal(t, v, attr.Value.AsInt64())
case int:
require.Equal(t, int64(v), attr.Value.AsInt64())
default:
t.Fatalf("unsupported attribute type: %T", expected)
}
break
}
}
require.True(t, found, "attribute %s not found", key)
}
6 changes: 5 additions & 1 deletion block/public.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,18 @@ func NewDAClient(
config config.Config,
logger zerolog.Logger,
) FullDAClient {
return da.NewClient(da.Config{
base := da.NewClient(da.Config{
DA: blobRPC,
Logger: logger,
Namespace: config.DA.GetNamespace(),
DefaultTimeout: config.DA.RequestTimeout.Duration,
DataNamespace: config.DA.GetDataNamespace(),
ForcedInclusionNamespace: config.DA.GetForcedInclusionNamespace(),
})
if config.Instrumentation.IsTracingEnabled() {
return da.WithTracingClient(base)
}
return base
}

// ErrForceInclusionNotConfigured is returned when force inclusion is not configured.
Expand Down
Binary file added da.test
Binary file not shown.
Loading