[batch_processor] Support bytes-based batching via new format = [otap|otlp|preserve]
#1633
base: main
Conversation
…d/batch_otlp_direct
Codecov Report (❌ patch coverage)

Impacted files:

    @@           Coverage Diff            @@
    ##             main    #1633    +/-  ##
    ==========================================
    + Coverage   84.12%   84.14%   +0.02%
    ==========================================
      Files         456      458       +2
      Lines      129593   130178     +585
    ==========================================
    + Hits       109017   109538     +521
    - Misses      20042    20106      +64
      Partials      534      534
    # At the rate configured above, this will print a batch
    # every 10 seconds.
    min_size: 1000
    otap:
The comment right above this does not make sense. Is that to be fixed?
    // Presently we have only the OTAP mode of batching, which supports only Items.
    if self.sizer != Sizer::Items {
        let (expect_sizer, with_msg) = match format {
            SignalFormat::OtapRecords => (Sizer::Items, "OTAP batch sizer: must be items"),
            SignalFormat::OtlpBytes => (Sizer::Bytes, "OTLP batch sizer: must be bytes"),
        };
⛏️ This comment looks outdated; there are multiple modes now.
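For reference, here is a minimal, self-contained sketch of the sizer/format consistency check quoted above. The enum and message names mirror the diff fragment; the surrounding `validate_sizer` function and its signature are hypothetical, added only to make the fragment runnable:

```rust
// Hypothetical sketch of the sizer/format check in the diff above.
// Sizer and SignalFormat names come from the fragment; validate_sizer
// itself is an assumed wrapper, not the PR's actual API.
#[derive(Debug, PartialEq, Clone, Copy)]
enum Sizer {
    Items,
    Bytes,
}

#[derive(Debug, Clone, Copy)]
enum SignalFormat {
    OtapRecords,
    OtlpBytes,
}

/// Each payload format pairs with exactly one sizer; reject mismatches.
fn validate_sizer(sizer: Sizer, format: SignalFormat) -> Result<(), String> {
    let (expect_sizer, with_msg) = match format {
        SignalFormat::OtapRecords => (Sizer::Items, "OTAP batch sizer: must be items"),
        SignalFormat::OtlpBytes => (Sizer::Bytes, "OTLP batch sizer: must be bytes"),
    };
    if sizer != expect_sizer {
        return Err(with_msg.to_string());
    }
    Ok(())
}

fn main() {
    assert!(validate_sizer(Sizer::Items, SignalFormat::OtapRecords).is_ok());
    assert!(validate_sizer(Sizer::Items, SignalFormat::OtlpBytes).is_err());
    println!("ok");
}
```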
    // Zero-timeout is a valid split-only configuration, but must have
    if no_timeout {
⛏️ Did part of this comment get lost? It is unclear to me what the remainder may be.
    // Only batch size: OTLP OK (OTLP invalid OK)
    let cfg = Config {
        min_size: NonZeroUsize::new(100),
        max_size: None,
        timeout: Duration::from_millis(100),
        otap: FormatConfig::new_items(100, 0),
        otlp: FormatConfig::new_items(0, 0),
        flush_timeout: Duration::from_millis(100),
        format: BatchingFormat::Otap,
⛏️ I don't understand the comment on this test. The otlp field appears incorrect (because items), but this situation is okay?
    // Run a single equivalence test
    let test_config = |limit: Option<NonZeroU64>, label: &str| {
        let outputs = make_bytes_batches(signal_type, limit, inputs_bytes.clone()).expect("ok");
⛏️ I don't see an assertion that this list contains multiple values. Is it possible to pass the expected number of batches to this closure?
- no limit → 1 batch
- limit == size → 1 batch
- 10% → 10 batches?
- 50% → 2 batches?
- limit == 1 → size batches?
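The reviewer's expected batch counts can be sanity-checked with a small greedy byte-budget splitter. This is a sketch under assumed semantics (items are never split, and a batch closes when adding the next item would exceed the limit), not the PR's actual `make_bytes_batches`:

```rust
// Hypothetical greedy byte-budget splitter, used only to sanity-check the
// batch counts listed above. Assumption: items are indivisible, so a batch
// may exceed the limit when a single item is larger than the limit.
fn split_batches(item_sizes: &[u64], limit: Option<u64>) -> Vec<Vec<u64>> {
    let Some(limit) = limit else {
        return vec![item_sizes.to_vec()]; // no limit -> everything in one batch
    };
    let mut batches: Vec<Vec<u64>> = Vec::new();
    let mut cur: Vec<u64> = Vec::new();
    let mut cur_bytes = 0u64;
    for &sz in item_sizes {
        // Close the current batch if this item would push it over the limit.
        if !cur.is_empty() && cur_bytes + sz > limit {
            batches.push(std::mem::take(&mut cur));
            cur_bytes = 0;
        }
        cur.push(sz);
        cur_bytes += sz;
    }
    if !cur.is_empty() {
        batches.push(cur);
    }
    batches
}

fn main() {
    let inputs = [10u64; 10]; // ten items of 10 bytes each, 100 bytes total
    assert_eq!(split_batches(&inputs, None).len(), 1); // no limit -> 1 batch
    assert_eq!(split_batches(&inputs, Some(100)).len(), 1); // limit == size -> 1 batch
    assert_eq!(split_batches(&inputs, Some(10)).len(), 10); // 10% -> 10 batches
    assert_eq!(split_batches(&inputs, Some(50)).len(), 2); // 50% -> 2 batches
    assert_eq!(split_batches(&inputs, Some(1)).len(), 10); // limit == 1 -> one batch per item
    println!("ok");
}
```

Under these assumptions the `limit == 1` case yields one (over-limit) batch per item, i.e. "size batches" as the reviewer guessed.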
    /// Transcode a protocol message object to OTLP bytes.
⛏️ copy pasta, maybe.
Suggested change:

    - /// Transcode a protocol message object to OTLP bytes.
    + /// Transcode OTLP bytes to a protocol message object.
    const fn default_batching_format() -> BatchingFormat {
        BatchingFormat::Otap
I wonder if this should be Preserve?
If batching OTLP bytes is more efficient than converting to OTAP and batching Arrow records, then having this default to Preserve would mean users running OTLP Receiver -> Batch Processor -> OTLP Exporter would not have to opt in to the most efficient pipeline.
After reading through the code, I now realize that this behaviour could cause confusion because incoming telemetry would get grouped separately based on its format, which is somewhat unexpected if users are used to how the Go collector works. Feel free to ignore this suggestion if it doesn't make sense.
    /// Combines OTLP content by concatenation of bytes. Because we have a
    /// top-level repeated field, this is precisely correct.
Makes sense. In the future, would we support stepping into the nested Resource -> Scope -> Signal structure to get batch sizes that align more closely with the max size?
I realize that makes this significantly more complicated, but once we had that logic in place, we could also use it to batch OTLP bytes for the items sizer (at least for logs and traces).
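The concatenation property the doc comment relies on (concatenating two serialized protobuf messages that consist of a top-level repeated field yields the serialization of the merged message) can be illustrated with hand-rolled wire-format bytes. The helpers below are hypothetical and deliberately minimal; real code would use a protobuf library such as prost:

```rust
// Illustrative only: hand-rolled protobuf wire format for one length-delimited
// repeated field (field number 1, wire type 2). Assumes field_no < 16 and
// payloads under 128 bytes, so both the key and the length fit in one byte.
fn encode_len_delimited(field_no: u32, payload: &[u8]) -> Vec<u8> {
    let mut out = vec![((field_no << 3) | 2) as u8]; // key byte: (field << 3) | LEN
    out.push(payload.len() as u8); // single-byte varint length
    out.extend_from_slice(payload);
    out
}

// Decode all top-level length-delimited entries, ignoring the key byte.
fn decode_entries(mut buf: &[u8]) -> Vec<Vec<u8>> {
    let mut entries = Vec::new();
    while !buf.is_empty() {
        let len = buf[1] as usize;
        entries.push(buf[2..2 + len].to_vec());
        buf = &buf[2 + len..];
    }
    entries
}

fn main() {
    let msg_a = encode_len_delimited(1, b"span-1");
    let msg_b = encode_len_delimited(1, b"span-2");
    // Byte concatenation of two messages == one message with both entries.
    let combined = [msg_a, msg_b].concat();
    let entries = decode_entries(&combined);
    assert_eq!(entries.len(), 2);
    assert_eq!(entries[0], b"span-1");
    assert_eq!(entries[1], b"span-2");
    println!("ok");
}
```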
Fixes #1570.
Adds dual format configuration to the batch processor, with separate FormatConfig structs for each payload format. This supports forcing the payload into one format or the other, or allowing both to be preserved.

The new bytes-based batching routines operate by scanning through top-level fields. Unlike the items-based batching mode, this may produce batches smaller than the limit; like that mode, it can also produce outputs greater than the limit.
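A hypothetical sketch of what the resulting dual-format configuration might look like; field names are inferred from the diff fragments above (format, otap, otlp, min_size, flush_timeout) and are not authoritative:

```yaml
# Hypothetical configuration shape; values are illustrative.
batch:
  format: preserve          # one of: otap | otlp | preserve
  otap:                     # FormatConfig for the OTAP payload format
    min_size: 1000          # sized in items
  otlp:                     # FormatConfig for the OTLP payload format
    min_size: 65536         # sized in bytes
  flush_timeout: 10s
```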