scuffle_bootstrap_telemetry/
lib.rs1#![cfg_attr(docsrs, feature(doc_cfg))]
110#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
111#![deny(missing_docs)]
112#![deny(unsafe_code)]
113#![deny(unreachable_pub)]
114
115use anyhow::Context;
116use bytes::Bytes;
117#[cfg(feature = "opentelemetry-logs")]
118#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry-logs")))]
119pub use opentelemetry_appender_tracing;
120#[cfg(feature = "opentelemetry")]
121#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
122pub use opentelemetry_sdk;
123#[cfg(feature = "prometheus")]
124#[cfg_attr(docsrs, doc(cfg(feature = "prometheus")))]
125pub use prometheus_client;
126use scuffle_bootstrap::global::Global;
127use scuffle_bootstrap::service::Service;
128#[cfg(feature = "opentelemetry-traces")]
129pub use tracing_opentelemetry;
130
131#[cfg(feature = "opentelemetry")]
132#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
133pub mod opentelemetry;
134
/// A [`Service`] that exposes process telemetry over HTTP.
///
/// When [`TelemetryConfig::bind_address`] returns an address, an HTTP server is started on it
/// with the following routes:
///
/// - `/health`: runs [`TelemetryConfig::health_check`]; `200 ok` on success, `500` with the
///   error chain on failure.
/// - `/metrics` (feature `prometheus`): the registry from
///   `TelemetryConfig::prometheus_metrics_registry` in text format, or `404` if none is set.
/// - `/pprof/cpu` (feature `pprof`, unix only): captures a CPU profile. Accepts the query
///   parameters `freq` (sampling frequency, default `100`), `duration` (whole seconds,
///   default `5`) and `ignore` (may be repeated).
/// - `/opentelemetry/flush` (feature `opentelemetry`): flushes the configured OpenTelemetry
///   instance, or `404` if none is set.
///
/// Any other path responds `404 Not Found`. Without a bind address no server is started and the
/// service simply waits for its context to finish. On shutdown, the configured OpenTelemetry
/// instance (if present and enabled) is shut down as well.
pub struct TelemetrySvc;
183
184pub trait TelemetryConfig: Global {
186 fn enabled(&self) -> bool {
188 true
189 }
190
191 fn bind_address(&self) -> Option<std::net::SocketAddr> {
193 None
194 }
195
196 fn http_server_name(&self) -> &str {
198 "scuffle-bootstrap-telemetry"
199 }
200
201 fn health_check(&self) -> impl std::future::Future<Output = Result<(), anyhow::Error>> + Send {
205 std::future::ready(Ok(()))
206 }
207
208 #[cfg(feature = "prometheus")]
215 #[cfg_attr(docsrs, doc(cfg(feature = "prometheus")))]
216 fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
217 None
218 }
219
220 #[cfg(feature = "opentelemetry")]
227 #[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
228 fn opentelemetry(&self) -> Option<&opentelemetry::OpenTelemetry> {
229 None
230 }
231}
232
233impl<Global: TelemetryConfig> Service<Global> for TelemetrySvc {
234 async fn enabled(&self, global: &std::sync::Arc<Global>) -> anyhow::Result<bool> {
235 Ok(global.enabled())
236 }
237
238 async fn run(self, global: std::sync::Arc<Global>, ctx: scuffle_context::Context) -> anyhow::Result<()> {
239 if let Some(bind_addr) = global.bind_address() {
240 let global = global.clone();
241
242 let service = scuffle_http::service::fn_http_service(move |req| {
243 let global = global.clone();
244 async move {
245 match req.uri().path() {
246 "/health" => health_check(&global, req).await,
247 #[cfg(feature = "prometheus")]
248 "/metrics" => metrics(&global, req).await,
249 #[cfg(all(feature = "pprof", unix))]
250 "/pprof/cpu" => pprof(&global, req).await,
251 #[cfg(feature = "opentelemetry")]
252 "/opentelemetry/flush" => opentelemetry_flush(&global).await,
253 _ => Ok(http::Response::builder()
254 .status(http::StatusCode::NOT_FOUND)
255 .body(http_body_util::Full::new(Bytes::from_static(b"not found")))?),
256 }
257 }
258 });
259
260 scuffle_http::HttpServer::builder()
261 .bind(bind_addr)
262 .ctx(ctx)
263 .service_factory(scuffle_http::service::service_clone_factory(service))
264 .build()
265 .run()
266 .await
267 .context("server run")?;
268 } else {
269 ctx.done().await;
270 }
271
272 #[cfg(feature = "opentelemetry")]
273 if let Some(opentelemetry) = global.opentelemetry().cloned() {
274 if opentelemetry.is_enabled() {
275 tokio::task::spawn_blocking(move || opentelemetry.shutdown())
276 .await
277 .context("opentelemetry shutdown spawn")?
278 .context("opentelemetry shutdown")?;
279 }
280 }
281
282 Ok(())
283 }
284}
285
286async fn health_check<G: TelemetryConfig>(
287 global: &std::sync::Arc<G>,
288 _: http::Request<scuffle_http::body::IncomingBody>,
289) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
290 if let Err(err) = global.health_check().await {
291 tracing::error!("health check failed: {err}");
292 Ok(http::Response::builder()
293 .status(http::StatusCode::INTERNAL_SERVER_ERROR)
294 .body(http_body_util::Full::new(format!("{err:#}").into()))?)
295 } else {
296 Ok(http::Response::builder()
297 .status(http::StatusCode::OK)
298 .body(http_body_util::Full::new(Bytes::from_static(b"ok")))?)
299 }
300}
301
/// Handler for the `/metrics` route.
///
/// Encodes the registry returned by `TelemetryConfig::prometheus_metrics_registry` with
/// `prometheus_client`'s text encoder. Responds `404 Not Found` when no registry is configured
/// and `500 Internal Server Error` if encoding fails.
#[cfg(feature = "prometheus")]
async fn metrics<G: TelemetryConfig>(
    global: &std::sync::Arc<G>,
    _: http::Request<scuffle_http::body::IncomingBody>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    let Some(registry) = global.prometheus_metrics_registry() else {
        return http::Response::builder()
            .status(http::StatusCode::NOT_FOUND)
            .body(http_body_util::Full::new(Bytes::from_static(b"not found")));
    };

    let mut text = String::new();
    let encoded = prometheus_client::encoding::text::encode(&mut text, registry);

    match encoded {
        Ok(_) => http::Response::builder()
            .status(http::StatusCode::OK)
            .body(http_body_util::Full::new(Bytes::from(text))),
        Err(_) => {
            tracing::error!("metrics encode failed");
            http::Response::builder()
                .status(http::StatusCode::INTERNAL_SERVER_ERROR)
                .body(http_body_util::Full::new(Bytes::from_static(b"metrics encode failed")))
        }
    }
}
325
/// Handler for the `/pprof/cpu` route.
///
/// Query parameters:
/// - `freq`: sampling frequency passed to `scuffle_pprof::Cpu::new` (default `100`).
/// - `duration`: capture length in whole seconds (default `5`).
/// - `ignore`: may be repeated; every value is forwarded to the profiler's ignore list.
///
/// Responds `400 Bad Request` when `freq` or `duration` cannot be parsed, `200 OK` with the
/// captured profile bytes on success, and `500 Internal Server Error` when the capture fails or
/// its blocking task cannot be joined.
#[cfg(unix)]
#[cfg(feature = "pprof")]
async fn pprof<G: TelemetryConfig>(
    _: &std::sync::Arc<G>,
    req: http::Request<scuffle_http::body::IncomingBody>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    let mut freq = 100;
    let mut duration = std::time::Duration::from_secs(5);
    let mut ignore_list = Vec::new();

    // A missing query string simply yields no parameters, leaving the defaults above in place.
    for (key, value) in req.uri().query().into_iter().flat_map(querystring::querify) {
        match key {
            "freq" => match value.parse() {
                Ok(parsed) => freq = parsed,
                Err(err) => {
                    return http::Response::builder()
                        .status(http::StatusCode::BAD_REQUEST)
                        .body(http_body_util::Full::new(format!("invalid freq: {err:#}").into()));
                }
            },
            "duration" => match value.parse() {
                Ok(secs) => duration = std::time::Duration::from_secs(secs),
                Err(err) => {
                    return http::Response::builder()
                        .status(http::StatusCode::BAD_REQUEST)
                        .body(http_body_util::Full::new(format!("invalid duration: {err:#}").into()));
                }
            },
            "ignore" => ignore_list.push(value),
            _ => {}
        }
    }

    let profiler = scuffle_pprof::Cpu::new(freq, &ignore_list);

    // The capture blocks for `duration`, so it runs on the blocking thread pool. Both the capture
    // error and the join error are rendered the same way, so normalize them to one message.
    let outcome = match tokio::task::spawn_blocking(move || profiler.capture(duration)).await {
        Ok(Ok(data)) => Ok(data),
        Ok(Err(err)) => Err(format!("{err:#}")),
        Err(err) => Err(format!("{err:#}")),
    };

    match outcome {
        Ok(data) => http::Response::builder()
            .status(http::StatusCode::OK)
            .body(http_body_util::Full::new(Bytes::from(data))),
        Err(message) => {
            tracing::error!("cpu capture failed: {message}");
            http::Response::builder()
                .status(http::StatusCode::INTERNAL_SERVER_ERROR)
                .body(http_body_util::Full::new(message.into()))
        }
    }
}
383
/// Handler for the `/opentelemetry/flush` route.
///
/// Responds `404 Not Found` when [`TelemetryConfig::opentelemetry`] returns `None`. Otherwise, if
/// the instance is enabled it is flushed on the blocking thread pool. The response is `200 ok` when
/// the instance is disabled or the flush succeeds, and `500 Internal Server Error` with the error
/// text when the flush fails or its blocking task cannot be joined.
#[cfg(feature = "opentelemetry")]
async fn opentelemetry_flush<G: TelemetryConfig>(
    global: &std::sync::Arc<G>,
) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
    let Some(otel) = global.opentelemetry().cloned() else {
        return http::Response::builder()
            .status(http::StatusCode::NOT_FOUND)
            .body(http_body_util::Full::new(Bytes::from_static(b"not found")));
    };

    // `None` means success; a disabled instance has nothing to flush and counts as success.
    let failure = if otel.is_enabled() {
        match tokio::task::spawn_blocking(move || otel.flush()).await {
            Ok(Ok(())) => None,
            Ok(Err(err)) => {
                tracing::error!("opentelemetry flush failed: {err:#}");
                Some(format!("{err:#}"))
            }
            Err(err) => {
                tracing::error!("opentelemetry flush spawn failed: {err:#}");
                Some(format!("{err:#}"))
            }
        }
    } else {
        None
    };

    match failure {
        None => http::Response::builder()
            .status(http::StatusCode::OK)
            .body(http_body_util::Full::new(Bytes::from_static(b"ok"))),
        Some(message) => http::Response::builder()
            .status(http::StatusCode::INTERNAL_SERVER_ERROR)
            .body(http_body_util::Full::new(message.into())),
    }
}
418
#[cfg(test)]
#[cfg_attr(all(test, coverage_nightly), coverage(off))]
// The tests register a Prometheus registry unconditionally, so they need the `prometheus` feature
// in addition to the OpenTelemetry ones.
#[cfg(all(
    feature = "prometheus",
    feature = "opentelemetry-metrics",
    feature = "opentelemetry-traces",
    feature = "opentelemetry-logs"
))]
mod tests {
    use std::net::SocketAddr;
    use std::sync::Arc;

    #[cfg(all(unix, feature = "pprof"))]
    use bytes::Bytes;
    use opentelemetry_sdk::logs::SdkLoggerProvider;
    use opentelemetry_sdk::metrics::SdkMeterProvider;
    use opentelemetry_sdk::trace::SdkTracerProvider;
    use scuffle_bootstrap::{GlobalWithoutConfig, Service};

    use crate::{TelemetryConfig, TelemetrySvc};

    async fn request_metrics(addr: SocketAddr) -> reqwest::Result<String> {
        reqwest::get(format!("http://{addr}/metrics"))
            .await
            .unwrap()
            .error_for_status()?
            .text()
            .await
    }

    async fn request_health(addr: SocketAddr) -> String {
        reqwest::get(format!("http://{addr}/health"))
            .await
            .unwrap()
            .error_for_status()
            .expect("health check failed")
            .text()
            .await
            .expect("health check text")
    }

    // The `/pprof/cpu` route only exists on unix with the `pprof` feature.
    #[cfg(all(unix, feature = "pprof"))]
    async fn request_pprof(addr: SocketAddr, freq: &str, duration: &str) -> reqwest::Result<Bytes> {
        reqwest::get(format!("http://{addr}/pprof/cpu?freq={freq}&duration={duration}"))
            .await
            .unwrap()
            .error_for_status()?
            .bytes()
            .await
    }

    async fn flush_opentelemetry(addr: SocketAddr) -> reqwest::Result<reqwest::Response> {
        reqwest::get(format!("http://{addr}/opentelemetry/flush"))
            .await
            .unwrap()
            .error_for_status()
    }

    #[cfg(not(valgrind))]
    #[tokio::test]
    async fn telemetry_http_server() {
        struct TestGlobal {
            bind_addr: SocketAddr,
            prometheus: prometheus_client::registry::Registry,
            open_telemetry: crate::opentelemetry::OpenTelemetry,
        }

        impl GlobalWithoutConfig for TestGlobal {
            async fn init() -> anyhow::Result<Arc<Self>> {
                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
                let bind_addr = listener.local_addr()?;

                let mut prometheus = prometheus_client::registry::Registry::default();

                let exporter = scuffle_metrics::prometheus::exporter().build();
                prometheus.register_collector(exporter.collector());

                let metrics = SdkMeterProvider::builder().with_reader(exporter).build();
                opentelemetry::global::set_meter_provider(metrics.clone());

                let tracer = SdkTracerProvider::default();
                opentelemetry::global::set_tracer_provider(tracer.clone());

                let logger = SdkLoggerProvider::builder().build();

                let open_telemetry = crate::opentelemetry::OpenTelemetry::new()
                    .with_metrics(metrics)
                    .with_traces(tracer)
                    .with_logs(logger);

                Ok(Arc::new(TestGlobal {
                    bind_addr,
                    prometheus,
                    open_telemetry,
                }))
            }
        }

        impl TelemetryConfig for TestGlobal {
            fn bind_address(&self) -> Option<std::net::SocketAddr> {
                Some(self.bind_addr)
            }

            fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
                Some(&self.prometheus)
            }

            fn opentelemetry(&self) -> Option<&crate::opentelemetry::OpenTelemetry> {
                Some(&self.open_telemetry)
            }
        }

        #[scuffle_metrics::metrics]
        mod example {
            use scuffle_metrics::{CounterU64, MetricEnum};

            #[derive(MetricEnum)]
            pub enum Kind {
                Http,
                Grpc,
            }

            #[metrics(unit = "requests")]
            pub fn request(kind: Kind) -> CounterU64;
        }

        let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();

        let bind_addr = global.bind_addr;

        assert!(TelemetrySvc.enabled(&global).await.unwrap());

        // Use a dedicated handler instead of `Handler::global()`: shutting down the global handler
        // would also cancel the server of any other test running in the same process.
        let handler = scuffle_context::Handler::new();
        let task_handle = tokio::spawn(TelemetrySvc.run(global, handler.context()));

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let health = request_health(bind_addr).await;
        assert_eq!(health, "ok");

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.starts_with("# HELP target Information about the target\n"));
        assert!(metrics.contains("# TYPE target info\n"));
        assert!(metrics.contains("service_name=\"unknown_service\""));
        assert!(metrics.contains("telemetry_sdk_language=\"rust\""));
        assert!(metrics.contains("telemetry_sdk_name=\"opentelemetry\""));
        assert!(metrics.ends_with("# EOF\n"));

        example::request(example::Kind::Http).incr();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.contains("# UNIT example_request_requests requests\n"));
        assert!(metrics.contains("example_request_requests_total{"));
        assert!(metrics.contains(format!("otel_scope_name=\"{}\"", env!("CARGO_PKG_NAME")).as_str()));
        assert!(metrics.contains(format!("otel_scope_version=\"{}\"", env!("CARGO_PKG_VERSION")).as_str()));
        assert!(metrics.contains("kind=\"Http\""));
        assert!(metrics.contains("} 1\n"));
        assert!(metrics.ends_with("# EOF\n"));

        example::request(example::Kind::Http).incr();

        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
        assert!(metrics.contains("# UNIT example_request_requests requests\n"));
        assert!(metrics.contains("example_request_requests_total{"));
        assert!(metrics.contains(format!("otel_scope_name=\"{}\"", env!("CARGO_PKG_NAME")).as_str()));
        assert!(metrics.contains(format!("otel_scope_version=\"{}\"", env!("CARGO_PKG_VERSION")).as_str()));
        assert!(metrics.contains("kind=\"Http\""));
        assert!(metrics.contains("} 2\n"));
        assert!(metrics.ends_with("# EOF\n"));

        #[cfg(all(unix, feature = "pprof"))]
        {
            let timer = std::time::Instant::now();
            assert!(!request_pprof(bind_addr, "100", "2").await.expect("pprof failed").is_empty());
            assert!(timer.elapsed() > std::time::Duration::from_secs(2));

            let res = request_pprof(bind_addr, "invalid", "2").await.expect_err("error expected");
            assert!(res.is_status());
            assert_eq!(res.status(), Some(reqwest::StatusCode::BAD_REQUEST));

            let res = request_pprof(bind_addr, "100", "invalid").await.expect_err("error expected");
            assert!(res.is_status());
            assert_eq!(res.status(), Some(reqwest::StatusCode::BAD_REQUEST));
        }

        assert!(flush_opentelemetry(bind_addr).await.is_ok());

        let res = reqwest::get(format!("http://{bind_addr}/not_found")).await.unwrap();
        assert_eq!(res.status(), reqwest::StatusCode::NOT_FOUND);

        handler.shutdown().await;

        task_handle.await.unwrap().unwrap();
    }

    #[cfg(not(valgrind))]
    #[tokio::test]
    async fn empty_telemetry_http_server() {
        struct TestGlobal {
            bind_addr: SocketAddr,
        }

        impl GlobalWithoutConfig for TestGlobal {
            async fn init() -> anyhow::Result<Arc<Self>> {
                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
                let bind_addr = listener.local_addr()?;

                Ok(Arc::new(TestGlobal { bind_addr }))
            }
        }

        impl TelemetryConfig for TestGlobal {
            fn bind_address(&self) -> Option<std::net::SocketAddr> {
                Some(self.bind_addr)
            }
        }

        let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();

        let bind_addr = global.bind_addr;

        assert!(TelemetrySvc.enabled(&global).await.unwrap());

        // Dedicated handler so this test's shutdown cannot cancel other tests' servers.
        let handler = scuffle_context::Handler::new();
        let task_handle = tokio::spawn(TelemetrySvc.run(global, handler.context()));
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        let health = request_health(bind_addr).await;
        assert_eq!(health, "ok");

        let res = request_metrics(bind_addr).await.expect_err("error expected");
        assert!(res.is_status());
        assert_eq!(res.status(), Some(reqwest::StatusCode::NOT_FOUND));

        #[cfg(all(unix, feature = "pprof"))]
        {
            let timer = std::time::Instant::now();
            assert!(!request_pprof(bind_addr, "100", "2").await.expect("pprof failed").is_empty());
            assert!(timer.elapsed() > std::time::Duration::from_secs(2));
        }

        let err = flush_opentelemetry(bind_addr).await.expect_err("error expected");
        assert!(err.is_status());
        assert_eq!(err.status(), Some(reqwest::StatusCode::NOT_FOUND));

        handler.shutdown().await;

        task_handle.await.unwrap().unwrap();
    }
}