Skip to content

feat: span processor api refactor #2962

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/tracing-http-propagator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ hyper = { workspace = true, features = ["full"] }
hyper-util = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full"] }
opentelemetry = { path = "../../opentelemetry" }
opentelemetry_sdk = { path = "../../opentelemetry-sdk" }
opentelemetry_sdk = { path = "../../opentelemetry-sdk", features = ["experimental_span_processor_on_ending"]}
opentelemetry-http = { path = "../../opentelemetry-http" }
opentelemetry-stdout = { workspace = true, features = ["trace", "logs"] }
opentelemetry-semantic-conventions = { path = "../../opentelemetry-semantic-conventions" }
Expand Down
109 changes: 106 additions & 3 deletions examples/tracing-http-propagator/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@ use opentelemetry_sdk::{
error::OTelSdkResult,
logs::{LogProcessor, SdkLogRecord, SdkLoggerProvider},
propagation::{BaggagePropagator, TraceContextPropagator},
trace::{SdkTracerProvider, SpanProcessor},
trace::{FinishedSpan, ReadableSpan, SdkTracerProvider, SpanProcessor},
};
use opentelemetry_semantic_conventions::trace;
use opentelemetry_stdout::{LogExporter, SpanExporter};
use std::{convert::Infallible, net::SocketAddr, sync::OnceLock};
use std::{
collections::HashMap,
convert::Infallible,
net::SocketAddr,
sync::{Mutex, OnceLock},
};
use tokio::net::TcpListener;
use tracing::info;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
Expand Down Expand Up @@ -83,6 +88,7 @@ async fn router(
let span = tracer
.span_builder("router")
.with_kind(SpanKind::Server)
.with_attributes([KeyValue::new("http.route", req.uri().path().to_string())])
.start_with_context(tracer, &parent_cx);

info!(name = "router", message = "Dispatching request");
Expand All @@ -104,6 +110,101 @@ async fn router(
response
}

#[derive(Debug, Default)]
/// A custom span processor that counts concurrent requests for each route (indentified by the http.route
/// attribute) and adds that information to the span attributes.
struct RouteConcurrencyCounterSpanProcessor(Mutex<HashMap<opentelemetry::Key, usize>>);

impl SpanProcessor for RouteConcurrencyCounterSpanProcessor {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

fn on_start(&self, span: &mut opentelemetry_sdk::trace::Span, _cx: &Context) {
if !matches!(span.span_kind(), SpanKind::Server) {
return;
}
let Some(route) = span
.attributes()
.iter()
.find(|kv| kv.key.as_str() == "http.route")
else {
return;
};
let mut counts = self.0.lock().unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.unwrap()
This could panic, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this panics if the mutex has been poisoned, which means a panic has happened in another thread that was holding the mutex.

Usually the best course of action is to propagate the panic because the state of the program is now uncertain (invariants that were protected by the mutex might not be true anymore)

let count = counts.entry(route.key.clone()).or_default();
*count += 1;
span.set_attribute(KeyValue::new(
"http.route.concurrent_requests",
*count as i64,
));
}

fn on_end(&self, span: &mut FinishedSpan) {
if !matches!(span.span_kind(), SpanKind::Server) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice motivating example for the on_end changes!

return;
}
let Some(route) = span
.attributes()
.iter()
.find(|kv| kv.key.as_str() == "http.route")
else {
return;
};
let mut counts = self.0.lock().unwrap();
let Some(count) = counts.get_mut(&route.key) else {
return;
};
*count -= 1;
if *count == 0 {
counts.remove(&route.key);
}
}
}

fn obfuscate_http_auth_url(s: &str) -> Option<String> {
#[allow(clippy::unnecessary_to_owned)]
let uri = hyper::http::Uri::from_maybe_shared(s.to_string()).ok()?;
let authority = uri.authority()?;
let (_, url) = authority.as_str().split_once('@')?;
let new_auth = format!("REDACTED_USERNAME:REDACTED_PASSWORD@{url}");
let mut parts = uri.into_parts();
parts.authority = Some(hyper::http::uri::Authority::from_maybe_shared(new_auth).ok()?);
Some(hyper::Uri::from_parts(parts).ok()?.to_string())
}

#[derive(Debug)]
/// A custom span processor that uses on_ending to obfuscate sensitive information in span attributes.
///
/// Currently this only overrides http auth information in the URI.
struct SpanObfuscationProcessor;

impl SpanProcessor for SpanObfuscationProcessor {
fn force_flush(&self) -> OTelSdkResult {
Ok(())
}

fn shutdown(&self) -> OTelSdkResult {
Ok(())
}
fn on_start(&self, _span: &mut opentelemetry_sdk::trace::Span, _cx: &Context) {}

fn on_ending(&self, span: &mut opentelemetry_sdk::trace::Span) {
let mut obfuscated_attributes = Vec::new();
for KeyValue { key, value, .. } in span.attributes() {
if let Some(redacted_uri) = obfuscate_http_auth_url(value.as_str().as_ref()) {
obfuscated_attributes.push((key.clone(), KeyValue::new(key.clone(), redacted_uri)));
}
}
}

fn on_end(&self, _span: &mut FinishedSpan) {}
}

/// A custom log processor that enriches LogRecords with baggage attributes.
/// Baggage information is not added automatically without this processor.
#[derive(Debug)]
Expand Down Expand Up @@ -141,7 +242,7 @@ impl SpanProcessor for EnrichWithBaggageSpanProcessor {
}
}

fn on_end(&self, _span: opentelemetry_sdk::trace::SpanData) {}
fn on_end(&self, _span: &mut opentelemetry_sdk::trace::FinishedSpan) {}
}

fn init_tracer() -> SdkTracerProvider {
Expand All @@ -157,7 +258,9 @@ fn init_tracer() -> SdkTracerProvider {
// Setup tracerprovider with stdout exporter
// that prints the spans to stdout.
let provider = SdkTracerProvider::builder()
.with_span_processor(RouteConcurrencyCounterSpanProcessor::default())
.with_span_processor(EnrichWithBaggageSpanProcessor)
.with_span_processor(SpanObfuscationProcessor)
.with_simple_exporter(SpanExporter::default())
.build();

Expand Down
7 changes: 7 additions & 0 deletions opentelemetry-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ experimental_logs_batch_log_processor_with_async_runtime = ["logs", "experimenta
experimental_logs_concurrent_log_processor = ["logs"]
experimental_trace_batch_span_processor_with_async_runtime = ["trace", "experimental_async_runtime"]
experimental_metrics_disable_name_validation = ["metrics"]
experimental_span_processor_on_ending = ["trace"]

[[bench]]
name = "context"
Expand Down Expand Up @@ -115,6 +116,12 @@ name = "log"
harness = false
required-features = ["logs"]

[[bench]]
name = "span_processor_api"
harness = false
required-features = ["testing"]


[lib]
bench = false

Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/benches/batch_span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use opentelemetry::trace::{
SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you run the benchmark suite to do a performance regression against main ? Would be great to include.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've run the existing BatchSpanProcessor benchmarks.
With experimental_span_processor_on_ending not enabled, the benchmark show no detectable changes in performance.

If the experimental_span_processor_on_ending feature is enabled, there is a slight cost (5%) which I believe comes from the fact we have to clone the tracer field, because we have to iterate over the span processor list through a shared reference, while passing the span mutably. Thus we have to clone the tracer associated with the span so we can split the ownership (it's in an Arc cell so this is not that expensive)

};
use opentelemetry_sdk::testing::trace::NoopSpanExporter;
use opentelemetry_sdk::trace::SpanData;
use opentelemetry_sdk::trace::{
BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor,
};
use opentelemetry_sdk::trace::{FinishedSpan, SpanData};
use std::sync::Arc;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -62,7 +62,8 @@ fn criterion_benchmark(c: &mut Criterion) {
let spans = get_span_data();
handles.push(tokio::spawn(async move {
for span in spans {
span_processor.on_end(span);
let mut span = FinishedSpan::new(span);
span_processor.on_end(&mut span);
tokio::task::yield_now().await;
}
}));
Expand Down
86 changes: 86 additions & 0 deletions opentelemetry-sdk/benches/span_processor_api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use opentelemetry::{
trace::{Span, Tracer, TracerProvider},
Context, KeyValue,
};
use opentelemetry_sdk::trace as sdktrace;

#[cfg(not(target_os = "windows"))]
use pprof::criterion::{Output, PProfProfiler};

/*
Adding results in comments for a quick reference.
Chip: Apple M1 Max
Total Number of Cores: 10 (8 performance and 2 efficiency)

SpanProcessorApi/0_processors
time: [385.15 ns 386.14 ns 387.25 ns]
SpanProcessorApi/1_processors
time: [385.73 ns 387.17 ns 388.85 ns]
SpanProcessorApi/2_processors
time: [384.84 ns 385.66 ns 386.50 ns]
SpanProcessorApi/4_processors
time: [386.78 ns 388.17 ns 389.58 ns]
*/

#[derive(Debug)]
struct NoopSpanProcessor;

impl sdktrace::SpanProcessor for NoopSpanProcessor {
fn on_start(&self, _span: &mut sdktrace::Span, _parent_cx: &Context) {}
fn on_end(&self, _span: &mut sdktrace::FinishedSpan) {}
fn force_flush(&self) -> opentelemetry_sdk::error::OTelSdkResult {
Ok(())
}
fn shutdown(&self) -> opentelemetry_sdk::error::OTelSdkResult {
Ok(())
}
}

fn create_tracer(span_processors_count: usize) -> sdktrace::SdkTracer {
let mut builder = sdktrace::SdkTracerProvider::builder();
for _ in 0..span_processors_count {
builder = builder.with_span_processor(NoopSpanProcessor);
}
builder.build().tracer("tracer")
}

fn create_span(tracer: &sdktrace::Tracer) {
let mut span = tracer.start("foo");
span.set_attribute(KeyValue::new("key1", false));
span.set_attribute(KeyValue::new("key2", "hello"));
span.set_attribute(KeyValue::new("key4", 123.456));
span.add_event("my_event", vec![KeyValue::new("key1", "value1")]);
span.end();
}

fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("SpanProcessorApi");
for i in [0, 1, 2, 4] {
group.bench_function(format!("{}_processors", i), |b| {
let tracer = create_tracer(i);
b.iter(|| {
black_box(create_span(&tracer));
});
});
}
}

#[cfg(not(target_os = "windows"))]
criterion_group! {
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)))
.warm_up_time(std::time::Duration::from_secs(1))
.measurement_time(std::time::Duration::from_secs(2));
targets = criterion_benchmark
}

#[cfg(target_os = "windows")]
criterion_group! {
name = benches;
config = Criterion::default().warm_up_time(std::time::Duration::from_secs(1))
.measurement_time(std::time::Duration::from_secs(2));
targets = criterion_benchmark
}

criterion_main!(benches);
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub use id_generator::{IdGenerator, RandomIdGenerator};
pub use links::SpanLinks;
pub use provider::{SdkTracerProvider, TracerProviderBuilder};
pub use sampler::{Sampler, ShouldSample};
pub use span::Span;
pub use span::{FinishedSpan, ReadableSpan, Span};
pub use span_limit::SpanLimits;
pub use span_processor::{
BatchConfig, BatchConfigBuilder, BatchSpanProcessor, BatchSpanProcessorBuilder,
Expand Down Expand Up @@ -136,7 +136,7 @@ mod tests {
}
}

fn on_end(&self, _span: SpanData) {
fn on_end(&self, _span: &mut FinishedSpan) {
// TODO: Accessing Context::current() will panic today and hence commented out.
// See https://github.com/open-telemetry/opentelemetry-rust/issues/2871
// let _c = Context::current();
Expand Down
8 changes: 4 additions & 4 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,8 @@ mod tests {
SERVICE_NAME, TELEMETRY_SDK_LANGUAGE, TELEMETRY_SDK_NAME, TELEMETRY_SDK_VERSION,
};
use crate::trace::provider::TracerProviderInner;
use crate::trace::{Config, Span, SpanProcessor};
use crate::trace::{SdkTracerProvider, SpanData};
use crate::trace::SdkTracerProvider;
use crate::trace::{Config, FinishedSpan, Span, SpanProcessor};
use crate::Resource;
use opentelemetry::trace::{Tracer, TracerProvider};
use opentelemetry::{Context, Key, KeyValue, Value};
Expand Down Expand Up @@ -516,7 +516,7 @@ mod tests {
.fetch_add(1, Ordering::SeqCst);
}

fn on_end(&self, _span: SpanData) {
fn on_end(&self, _span: &mut FinishedSpan) {
// ignore
}

Expand Down Expand Up @@ -779,7 +779,7 @@ mod tests {
// No operation needed for this processor
}

fn on_end(&self, _span: SpanData) {
fn on_end(&self, _span: &mut FinishedSpan) {
// No operation needed for this processor
}

Expand Down
Loading