Skip to content

Conversation

@lquerel
Copy link
Contributor

@lquerel lquerel commented Dec 27, 2025

This PR introduces channel sender/receiver metric sets (send/recv counts and error counts, plus capacity) and a consolidated ChannelAttributeSet including channel kind/mode/type/impl and node URN. A new TelemetrySettings.channel_metrics flag gates registration/reporting to avoid overhead when disabled.

I also added few additional otel_debug! to help diagnose pipeline initialization and creation.

I didn't observe any performance regression.

Channel attributes:

/// Channel endpoint attributes (sender or receiver).
#[attribute_set(name = "channel.attrs")]
#[derive(Debug, Clone, Default, Hash)]
pub struct ChannelAttributeSet {
    /// Node attributes.
    #[compose]
    pub node_attrs: NodeAttributeSet,

    /// Unique channel identifier (in scope of the pipeline).
    #[attribute(key = "channel.id")]
    pub channel_id: Cow<'static, str>,
    /// Channel payload kind ("control" or "pdata").
    #[attribute(key = "channel.kind")]
    pub channel_kind: Cow<'static, str>,
    /// Concurrency mode of the channel ("local" or "shared").
    #[attribute(key = "channel.mode")]
    pub channel_mode: Cow<'static, str>,
    /// Channel type ("mpsc", "mpmc", "spsc", "spmc").
    #[attribute(key = "channel.type")]
    pub channel_type: Cow<'static, str>,
    /// Channel implementation ("tokio", "flume", "internal").
    #[attribute(key = "channel.impl")]
    pub channel_impl: Cow<'static, str>,
}

Channel metrics:

#[metric_set(name = "channel.sender")]
#[derive(Debug, Default, Clone)]
pub struct ChannelSenderMetrics {
    /// Count of messages successfully sent to the channel.
    #[metric(name = "send.count", unit = "{message}")]
    pub send_count: Counter<u64>,
    /// Count of send failures due to a full channel.
    #[metric(name = "send.error_full", unit = "{1}")]
    pub send_error_full: Counter<u64>,
    /// Count of send failures due to a closed channel.
    #[metric(name = "send.error_closed", unit = "{1}")]
    pub send_error_closed: Counter<u64>,
    // Total bytes successfully sent (when message size is known).
    // TODO: Populate in a future PR when message sizes are tracked.
    // #[metric(name = "send.bytes", unit = "{By}")]
    // pub send_bytes: Counter<u64>,
}

#[metric_set(name = "channel.receiver")]
#[derive(Debug, Default, Clone)]
pub struct ChannelReceiverMetrics {
    /// Count of messages successfully received from the channel.
    #[metric(name = "recv.count", unit = "{message}")]
    pub recv_count: Counter<u64>,
    /// Count of receive attempts when the channel was empty.
    #[metric(name = "recv.error_empty", unit = "{1}")]
    pub recv_error_empty: Counter<u64>,
    /// Count of receive attempts after the channel was closed.
    #[metric(name = "recv.error_closed", unit = "{1}")]
    pub recv_error_closed: Counter<u64>,
    // Total bytes successfully received (when message size is known).
    // TODO: Populate in a future PR when message sizes are tracked.
    // #[metric(name = "recv.bytes", unit = "{By}")]
    // pub recv_bytes: Counter<u64>,
    // Current number of buffered messages.
    // TODO: Populate in a future PR when queue depth is tracked.
    // #[metric(name = "queue.depth", unit = "{message}")]
    // pub queue_depth: Gauge<u64>,
    /// Maximum channel capacity (buffer size).
    #[metric(name = "capacity", unit = "{message}")]
    pub capacity: Gauge<u64>,
}

PS: I will introduce latency metrics once we have a support for histograms.

@lquerel lquerel requested a review from a team as a code owner December 27, 2025 06:34
@github-actions github-actions bot added the rust Pull requests that update Rust code label Dec 27, 2025
# Conflicts:
#	rust/otap-dataflow/crates/otap/src/otap_exporter.rs
#	rust/otap-dataflow/crates/otap/src/otap_grpc/client_settings.rs
@codecov
Copy link

codecov bot commented Dec 28, 2025

Codecov Report

❌ Patch coverage is 31.91981% with 883 lines in your changes missing coverage. Please review.
✅ Project coverage is 84.00%. Comparing base (8e81cc8) to head (280e1c2).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1697      +/-   ##
==========================================
- Coverage   84.43%   84.00%   -0.44%     
==========================================
  Files         465      466       +1     
  Lines      135552   136570    +1018     
==========================================
+ Hits       114460   114731     +271     
- Misses      20558    21305     +747     
  Partials      534      534              
Components Coverage Δ
otap-dataflow 85.24% <31.91%> (-0.71%) ⬇️
query_abstraction 80.61% <ø> (ø)
query_engine 90.39% <ø> (ø)
syslog_cef_receivers ∅ <ø> (∅)
otel-arrow-go 53.50% <ø> (ø)
🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

#[serde(default)]
format: Option<OutputFormat>,
/// When true, metric set which have all zero values are kept in the output. Default: false.
#[serde(default = "default_false")]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I introduced this flag to monitor unused channels. This is useful for analyzing DAG creation and detecting inactive subgraphs in the DAG.

Comment on lines -275 to -276
// Create NodeUserConfig and wrap as local processor
let user_config = Arc::new(NodeUserConfig::new_processor_config(SIGNAL_TYPE_ROUTER_URN));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This statement was a default node configuration which is incorrect.


let retry = RetryProcessor::with_pipeline_ctx(pipeline_ctx, config)?;

let user_config = Arc::new(NodeUserConfig::new_processor_config(RETRY_PROCESSOR_URN));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This statement was a default node configuration which is incorrect.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

rust Pull requests that update Rust code

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

1 participant