Conversation

@jmacd (Contributor) commented Dec 16, 2025

Fixes #1570.

Adds dual-format configuration to the batch processor, with separate FormatConfig structs for each payload format. This supports forcing payloads into one format or the other, or allowing both to be preserved.
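The shape of this configuration can be sketched with stub types. The field names (`min_size`, `flush_timeout`, `otap`, `otlp`, `format`) and `FormatConfig::new_items` follow a test snippet quoted later in this thread; the struct bodies, the `Preserve` variant, and everything else here are illustrative assumptions, not the PR's definitions.

```rust
use std::num::NonZeroUsize;
use std::time::Duration;

// Stub enum: variant names assumed from the discussion below.
#[derive(Debug, PartialEq)]
enum BatchingFormat {
    Otap,
    Preserve,
}

// Stub per-format config; the real struct lives in the PR.
#[derive(Debug)]
struct FormatConfig {
    min_items: usize,
    max_items: usize,
}

impl FormatConfig {
    fn new_items(min_items: usize, max_items: usize) -> Self {
        Self { min_items, max_items }
    }
}

#[derive(Debug)]
struct Config {
    min_size: Option<NonZeroUsize>,
    flush_timeout: Duration,
    otap: FormatConfig,
    otlp: FormatConfig,
    format: BatchingFormat,
}

fn main() {
    // Force all payloads into the OTAP format (mirrors the PR's test snippet).
    let force_otap = Config {
        min_size: NonZeroUsize::new(100),
        flush_timeout: Duration::from_millis(100),
        otap: FormatConfig::new_items(100, 0),
        otlp: FormatConfig::new_items(0, 0),
        format: BatchingFormat::Otap,
    };
    // Preserve keeps each payload in its incoming format instead.
    let preserve = Config {
        format: BatchingFormat::Preserve,
        ..force_otap
    };
    assert_eq!(preserve.format, BatchingFormat::Preserve);
    assert_eq!(preserve.otap.min_items, 100);
    println!("{preserve:?}");
}
```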

The new bytes-based batching routines operate by scanning through top-level fields. Unlike the items-based batching mode, this may produce batches smaller than the limit; like that mode, it can also produce outputs larger than the limit.
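The under- and over-limit behavior described above can be illustrated with a minimal greedy sketch (illustrative only, not the PR's routine): a batch closes when the next item would push it past the byte limit, and a single oversized item becomes an over-limit batch of its own.

```rust
/// Greedy byte-based batching sketch. Items are represented by their
/// encoded byte lengths. A batch is closed when adding the next item
/// would exceed `limit`; an item larger than `limit` forms its own
/// over-limit batch, and the final batch may be under the limit.
fn batch_by_bytes(item_sizes: &[usize], limit: usize) -> Vec<Vec<usize>> {
    let mut batches = Vec::new();
    let mut current: Vec<usize> = Vec::new();
    let mut current_bytes = 0usize;
    for &size in item_sizes {
        if !current.is_empty() && current_bytes + size > limit {
            // Close the current batch before it would exceed the limit.
            batches.push(std::mem::take(&mut current));
            current_bytes = 0;
        }
        current.push(size);
        current_bytes += size;
    }
    if !current.is_empty() {
        batches.push(current);
    }
    batches
}

fn main() {
    // The 300-byte item exceeds the 200-byte limit and is emitted alone;
    // the trailing 50-byte batch is well under the limit.
    let batches = batch_by_bytes(&[120, 60, 300, 50], 200);
    assert_eq!(batches, vec![vec![120, 60], vec![300], vec![50]]);
    println!("{batches:?}");
}
```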

@jmacd jmacd requested a review from a team as a code owner December 16, 2025 04:37
@github-actions github-actions bot added the rust Pull requests that update Rust code label Dec 16, 2025
codecov bot commented Dec 16, 2025

Codecov Report

❌ Patch coverage is 93.21149% with 52 lines in your changes missing coverage. Please review.
✅ Project coverage is 84.14%. Comparing base (70e2d49) to head (8596593).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1633      +/-   ##
==========================================
+ Coverage   84.12%   84.14%   +0.02%     
==========================================
  Files         456      458       +2     
  Lines      129593   130178     +585     
==========================================
+ Hits       109017   109538     +521     
- Misses      20042    20106      +64     
  Partials      534      534              
Components Coverage Δ
otap-dataflow 85.82% <93.21%> (+0.02%) ⬆️
query_abstraction 80.61% <ø> (ø)
query_engine 90.00% <ø> (ø)
syslog_cef_receivers ∅ <ø> (∅)
otel-arrow-go 53.50% <ø> (ø)

# At the rate configured above, this will print a batch
# every 10 seconds.
min_size: 1000
otap:
Member commented:

The comment right above this does not make sense; is that to be fixed?

Comment on lines 312 to +316
// Presently we have only the OTAP mode of batching, which supports only Items.
if self.sizer != Sizer::Items {
let (expect_sizer, with_msg) = match format {
SignalFormat::OtapRecords => (Sizer::Items, "OTAP batch sizer: must be items"),
SignalFormat::OtlpBytes => (Sizer::Bytes, "OTLP batch sizer: must be bytes"),
};
⛏️ This comment looks outdated; there are multiple modes now.

Comment on lines +335 to +336
// Zero-timeout is a valid split-only configuration, but must have
if no_timeout {
⛏️ Did part of this comment get lost? It is unclear to me what the remainder may be.

Comment on lines +1507 to +1512
// Only batch size: OTLP OK (OTLP invalid OK)
let cfg = Config {
min_size: NonZeroUsize::new(100),
max_size: None,
timeout: Duration::from_millis(100),
otap: FormatConfig::new_items(100, 0),
otlp: FormatConfig::new_items(0, 0),
flush_timeout: Duration::from_millis(100),
format: BatchingFormat::Otap,
⛏️ I don't understand the comment on this test. The otlp field appears incorrect (because items), but this situation is okay?


// Run a single equivalence test
let test_config = |limit: Option<NonZeroU64>, label: &str| {
let outputs = make_bytes_batches(signal_type, limit, inputs_bytes.clone()).expect("ok");
⛏️ I don't see an assertion that this list contains multiple values. Is it possible to pass the expected number of batches to this closure?

  • no limit → 1 batch
  • limit == size → 1 batch
  • 10% → 10 batches?
  • 50% → 2 batches?
  • limit == 1 → size batches?
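The bullets above suggest passing the expected batch count to the closure. A self-contained sketch of that shape, where `split_bytes` is a toy stand-in for the real `make_bytes_batches` helper (an assumption, not the PR's implementation) and the expected counts mirror the reviewer's list:

```rust
use std::num::NonZeroU64;

// Toy stand-in for `make_bytes_batches`: greedily packs byte items into
// batches no larger than `limit`.
fn split_bytes(inputs: &[Vec<u8>], limit: Option<NonZeroU64>) -> Vec<Vec<u8>> {
    let limit = limit.map_or(usize::MAX, |l| l.get() as usize);
    let mut out: Vec<Vec<u8>> = Vec::new();
    for item in inputs {
        match out.last_mut() {
            // Extend the open batch while it stays within the limit.
            Some(last) if last.len() + item.len() <= limit => {
                last.extend_from_slice(item)
            }
            // Otherwise start a new batch.
            _ => out.push(item.clone()),
        }
    }
    out
}

fn main() {
    // 10 items of 10 bytes each: 100 bytes total.
    let inputs: Vec<Vec<u8>> = vec![vec![0u8; 10]; 10];
    // The closure now receives the expected number of batches and asserts it.
    let test_config = |limit: Option<NonZeroU64>, expected: usize, label: &str| {
        let outputs = split_bytes(&inputs, limit);
        assert_eq!(outputs.len(), expected, "batch count mismatch: {label}");
    };
    test_config(None, 1, "no limit -> 1 batch");
    test_config(NonZeroU64::new(100), 1, "limit == size -> 1 batch");
    test_config(NonZeroU64::new(50), 2, "50% -> 2 batches");
    test_config(NonZeroU64::new(10), 10, "10% -> 10 batches");
}
```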

}
}

/// Transcode a protocol message object to OTLP bytes.
⛏️ copy pasta, maybe.

Suggested change
/// Transcode a protocol message object to OTLP bytes.
/// Transcode OTLP bytes to a protocol message object.

}

const fn default_batching_format() -> BatchingFormat {
BatchingFormat::Otap
Member commented:
I wonder if this should be Preserve?

If batching OTLP Bytes is more efficient than converting to OTAP and batching arrow records, then having this as Preserve would mean users doing OTLP Receiver -> Batch Processor -> OTLP Exporter would not have to opt-in to the most efficient pipeline

Member commented:
After reading through the code, I now realize that this behaviour could cause confusion because incoming telemetry would get grouped separately based on its format, which is somewhat unexpected if users are used to how the go collector works. Feel free to ignore this suggestion if it doesn't make sense.

}

/// Combines OTLP content by concatenation of bytes. Because we have a
/// top-level repeated field, this is precisely correct.
Member commented:
Makes sense. In the future, would we support stepping into the nested Resource -> Scope -> Signal structure to get batch sizes that are more closely aligned to the max size?

I realize that makes this significantly more complicated, but once we had that logic in place, we could also use it to batch OTLP bytes for the items sizer (at least for logs & traces).
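The concatenation claim being reviewed here can be checked directly: in protobuf encoding, concatenating two serializations of a message whose only field is a top-level repeated message field yields a valid serialization whose repeated field is the concatenation of the two. A minimal sketch with hand-encoded bytes (tag `0x0A` = field 1, wire type 2 = length-delimited; payload lengths kept under 128 so each length is a single varint byte). This is illustrative only and uses no OTLP library:

```rust
/// Count top-level length-delimited records for field 1 in a protobuf
/// buffer. Assumes every record's length fits in one varint byte (< 128),
/// which holds for the hand-encoded samples below.
fn count_top_level_records(mut buf: &[u8]) -> usize {
    let mut count = 0;
    while let [tag, len, rest @ ..] = buf {
        assert_eq!(*tag, 0x0A, "field 1, wire type 2 expected");
        count += 1;
        buf = &rest[*len as usize..]; // skip this record's payload
    }
    count
}

fn main() {
    // Two "requests", each holding repeated field-1 records.
    let a = [0x0A, 2, 1, 2, 0x0A, 1, 3]; // two records
    let b = [0x0A, 3, 4, 5, 6]; // one record
    assert_eq!(count_top_level_records(&a), 2);
    assert_eq!(count_top_level_records(&b), 1);

    // Concatenating the byte buffers merges the repeated field: 2 + 1 records.
    let combined: Vec<u8> = a.iter().chain(b.iter()).copied().collect();
    assert_eq!(count_top_level_records(&combined), 3);
}
```

This works because protobuf decoders merge repeated fields across the whole buffer; the same property would not hold if the top-level message had any non-repeated field.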


Successfully merging this pull request may close these issues: Batch processor: payload format support.

4 participants