scuffle_bootstrap_telemetry/
lib.rs

1//! A crate used to add telemetry to applications built with the
2//! [`scuffle-bootstrap`](../scuffle_bootstrap) crate.
3//!
4//! Emit metrics using the [`scuffle-metrics`](../scuffle_metrics)
5//! crate.
6//!
7//! ## Feature Flags
8//!
9//! - `prometheus`: Enable Prometheus support.
10//! - `pprof`: Enable pprof support. (Unix only)
11//! - `opentelemetry-metrics`: Enable OpenTelemetry metrics support.
12//! - `opentelemetry-traces`: Enable OpenTelemetry traces support.
13//! - `opentelemetry-logs`: Enable OpenTelemetry logs support.
14//!
15//! All features are enabled by default.
16//!
17//! See [`TelemetrySvc`] for more details.
18//!
19//! ## Example
20//!
21//! ```rust
22//! use std::net::SocketAddr;
23//! use std::sync::Arc;
24//!
25//! use scuffle_bootstrap::global::GlobalWithoutConfig;
26//! use scuffle_bootstrap_telemetry::{prometheus_client, opentelemetry, opentelemetry_sdk, TelemetryConfig, TelemetrySvc};
27//!
28//! struct Global {
29//!     prometheus: prometheus_client::registry::Registry,
30//!     open_telemetry: opentelemetry::OpenTelemetry,
31//! }
32//!
33//! impl GlobalWithoutConfig for Global {
34//!     async fn init() -> anyhow::Result<Arc<Self>> {
35//!         // Initialize the Prometheus metrics registry.
36//!         let mut prometheus = prometheus_client::registry::Registry::default();
37//!         // The exporter converts opentelemetry metrics into the Prometheus format.
38//!         let exporter = scuffle_metrics::prometheus::exporter().build();
39//!         // Register the exporter as a data source for the Prometheus registry.
40//!         prometheus.register_collector(exporter.collector());
41//!
42//!         // Initialize the OpenTelemetry metrics provider and add the Prometheus exporter as a reader.
43//!         let metrics = opentelemetry_sdk::metrics::SdkMeterProvider::builder().with_reader(exporter).build();
44//!         opentelemetry::global::set_meter_provider(metrics.clone());
45//!
46//!         // Initialize the OpenTelemetry configuration instance.
47//!         let open_telemetry = opentelemetry::OpenTelemetry::new().with_metrics(metrics);
48//!
49//!         Ok(Arc::new(Self {
50//!             prometheus,
51//!             open_telemetry,
52//!         }))
53//!     }
54//! }
55//!
56//! impl TelemetryConfig for Global {
57//!     fn bind_address(&self) -> Option<SocketAddr> {
58//!         // Tells the http server to bind to port 8080 on localhost.
59//!         Some(SocketAddr::from(([127, 0, 0, 1], 8080)))
60//!     }
61//!
62//!     fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
63//!         Some(&self.prometheus)
64//!     }
65//!
66//!     fn opentelemetry(&self) -> Option<&opentelemetry::OpenTelemetry> {
67//!         Some(&self.open_telemetry)
68//!     }
69//! }
70//!
71//! #[scuffle_metrics::metrics]
72//! mod example {
73//!     use scuffle_metrics::{CounterU64, MetricEnum};
74//!
75//!     #[derive(MetricEnum)]
76//!     pub enum Kind {
77//!         Http,
78//!         Grpc,
79//!     }
80//!
81//!     #[metrics(unit = "requests")]
82//!     pub fn request(kind: Kind) -> CounterU64;
83//! }
84//!
85//! // Now emit metrics from anywhere in your code using the `example` module.
86//! example::request(example::Kind::Http).incr();
87//!
88//! scuffle_bootstrap::main! {
89//!     Global {
90//!         TelemetrySvc,
91//!     }
92//! };
93//! ```
94//!
95//! ## Status
96//!
97//! This crate is currently under development and is not yet stable, unit tests
98//! are not yet fully implemented.
99//!
100//! Unit tests are not yet fully implemented. Use at your own risk.
101//!
102//! ## License
103//!
104//! This project is licensed under the [MIT](./LICENSE.MIT) or
105//! [Apache-2.0](./LICENSE.Apache-2.0) license. You can choose between one of
106//! them if you use this work.
107//!
108//! `SPDX-License-Identifier: MIT OR Apache-2.0`
109#![cfg_attr(docsrs, feature(doc_cfg))]
110#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
111#![deny(missing_docs)]
112#![deny(unsafe_code)]
113#![deny(unreachable_pub)]
114
115use anyhow::Context;
116use bytes::Bytes;
117#[cfg(feature = "opentelemetry-logs")]
118#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry-logs")))]
119pub use opentelemetry_appender_tracing;
120#[cfg(feature = "opentelemetry")]
121#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
122pub use opentelemetry_sdk;
123#[cfg(feature = "prometheus")]
124#[cfg_attr(docsrs, doc(cfg(feature = "prometheus")))]
125pub use prometheus_client;
126use scuffle_bootstrap::global::Global;
127use scuffle_bootstrap::service::Service;
128#[cfg(feature = "opentelemetry-traces")]
129pub use tracing_opentelemetry;
130
131#[cfg(feature = "opentelemetry")]
132#[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
133pub mod opentelemetry;
134
135/// The telemetry service.
136///
137/// This is supposed to be used with the `scuffle-bootstrap` crate.
138///
139/// # HTTP Server
140///
141/// This service provides an http server which will bind to the address provided
142/// by the config. (See [`TelemetryConfig`])
143///
144/// ## Endpoints
145///
146/// The server provides the following endpoints:
147///
148/// ### `/health`
149///
150/// Health check endpoint.
151///
152/// This endpoint calls the health check function provided by the config and
153/// responds with `200 OK` if the health check returns `Ok(())`. If the health
154/// check returns an error, the endpoint returns `500 Internal Server Error`
155/// along with the error message.
156///
157/// ### `/metrics`
158///
159/// Metrics endpoint which can be used by Prometheus to scrape metrics.
160///
161/// This endpoint is only enabled if the `prometheus` feature flag is enabled
162/// and a metrics registry is provided through the config.
163///
164/// ### `/pprof/cpu` (Unix only)
165///
166/// pprof cpu endpoint to capture a cpu profile.
167///
168/// #### Query Parameters
169///
170/// - `freq`: Sampling frequency in Hz.
171/// - `duration`: Duration the profile should be captured for in s.
172/// - `ignore`: List of functions to exclude from the profile.
173///
174/// This endpoint is only enabled if the `pprof` feature flag is enabled.
175///
176/// ### `/opentelemetry/flush`
177///
178/// OpenTelemetry flush endpoint.
179///
180/// This endpoint is only enabled if one of the `opentelemetry` feature flags is
181/// enabled and an OpenTelemetry config is provided through the config.
182pub struct TelemetrySvc;
183
184/// Implement this trait to configure the telemetry service.
185pub trait TelemetryConfig: Global {
186    /// Return true if the service is enabled.
187    fn enabled(&self) -> bool {
188        true
189    }
190
191    /// Return the bind address for the http server.
192    fn bind_address(&self) -> Option<std::net::SocketAddr> {
193        None
194    }
195
196    /// Return the http server name.
197    fn http_server_name(&self) -> &str {
198        "scuffle-bootstrap-telemetry"
199    }
200
201    /// Return a health check to determine if the service is healthy.
202    ///
203    /// Always healthy by default.
204    fn health_check(&self) -> impl std::future::Future<Output = Result<(), anyhow::Error>> + Send {
205        std::future::ready(Ok(()))
206    }
207
208    /// Return a Prometheus metrics registry to scrape metrics from.
209    ///
210    /// Returning `Some` will enable the `/metrics` http endpoint which can be
211    /// used by Prometheus to scrape metrics.
212    ///
213    /// Disabled (`None`) by default.
214    #[cfg(feature = "prometheus")]
215    #[cfg_attr(docsrs, doc(cfg(feature = "prometheus")))]
216    fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
217        None
218    }
219
220    /// Pass an OpenTelemetry instance to the service.
221    ///
222    /// If provided the service will flush and shutdown the OpenTelemetry
223    /// instance when it shuts down.
224    /// Additionally, the service provides the `/opentelemetry/flush` http
225    /// endpoint to manually flush the data.
226    #[cfg(feature = "opentelemetry")]
227    #[cfg_attr(docsrs, doc(cfg(feature = "opentelemetry")))]
228    fn opentelemetry(&self) -> Option<&opentelemetry::OpenTelemetry> {
229        None
230    }
231}
232
233impl<Global: TelemetryConfig> Service<Global> for TelemetrySvc {
234    async fn enabled(&self, global: &std::sync::Arc<Global>) -> anyhow::Result<bool> {
235        Ok(global.enabled())
236    }
237
238    async fn run(self, global: std::sync::Arc<Global>, ctx: scuffle_context::Context) -> anyhow::Result<()> {
239        if let Some(bind_addr) = global.bind_address() {
240            let global = global.clone();
241
242            let service = scuffle_http::service::fn_http_service(move |req| {
243                let global = global.clone();
244                async move {
245                    match req.uri().path() {
246                        "/health" => health_check(&global, req).await,
247                        #[cfg(feature = "prometheus")]
248                        "/metrics" => metrics(&global, req).await,
249                        #[cfg(all(feature = "pprof", unix))]
250                        "/pprof/cpu" => pprof(&global, req).await,
251                        #[cfg(feature = "opentelemetry")]
252                        "/opentelemetry/flush" => opentelemetry_flush(&global).await,
253                        _ => Ok(http::Response::builder()
254                            .status(http::StatusCode::NOT_FOUND)
255                            .body(http_body_util::Full::new(Bytes::from_static(b"not found")))?),
256                    }
257                }
258            });
259
260            scuffle_http::HttpServer::builder()
261                .bind(bind_addr)
262                .ctx(ctx)
263                .service_factory(scuffle_http::service::service_clone_factory(service))
264                .build()
265                .run()
266                .await
267                .context("server run")?;
268        } else {
269            ctx.done().await;
270        }
271
272        #[cfg(feature = "opentelemetry")]
273        if let Some(opentelemetry) = global.opentelemetry().cloned() {
274            if opentelemetry.is_enabled() {
275                tokio::task::spawn_blocking(move || opentelemetry.shutdown())
276                    .await
277                    .context("opentelemetry shutdown spawn")?
278                    .context("opentelemetry shutdown")?;
279            }
280        }
281
282        Ok(())
283    }
284}
285
286async fn health_check<G: TelemetryConfig>(
287    global: &std::sync::Arc<G>,
288    _: http::Request<scuffle_http::body::IncomingBody>,
289) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
290    if let Err(err) = global.health_check().await {
291        tracing::error!("health check failed: {err}");
292        Ok(http::Response::builder()
293            .status(http::StatusCode::INTERNAL_SERVER_ERROR)
294            .body(http_body_util::Full::new(format!("{err:#}").into()))?)
295    } else {
296        Ok(http::Response::builder()
297            .status(http::StatusCode::OK)
298            .body(http_body_util::Full::new(Bytes::from_static(b"ok")))?)
299    }
300}
301
302#[cfg(feature = "prometheus")]
303async fn metrics<G: TelemetryConfig>(
304    global: &std::sync::Arc<G>,
305    _: http::Request<scuffle_http::body::IncomingBody>,
306) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
307    if let Some(metrics) = global.prometheus_metrics_registry() {
308        let mut buf = String::new();
309        if prometheus_client::encoding::text::encode(&mut buf, metrics).is_err() {
310            tracing::error!("metrics encode failed");
311            return http::Response::builder()
312                .status(http::StatusCode::INTERNAL_SERVER_ERROR)
313                .body(http_body_util::Full::new("metrics encode failed".to_string().into()));
314        }
315
316        Ok(http::Response::builder()
317            .status(http::StatusCode::OK)
318            .body(http_body_util::Full::new(Bytes::from(buf)))?)
319    } else {
320        Ok(http::Response::builder()
321            .status(http::StatusCode::NOT_FOUND)
322            .body(http_body_util::Full::new(Bytes::from_static(b"not found")))?)
323    }
324}
325
326#[cfg(unix)]
327#[cfg(feature = "pprof")]
328async fn pprof<G: TelemetryConfig>(
329    _: &std::sync::Arc<G>,
330    req: http::Request<scuffle_http::body::IncomingBody>,
331) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
332    let query = req.uri().query();
333    let query = query.map(querystring::querify).into_iter().flatten();
334
335    let mut freq = 100;
336    let mut duration = std::time::Duration::from_secs(5);
337    let mut ignore_list = Vec::new();
338
339    for (key, value) in query {
340        if key == "freq" {
341            freq = match value.parse() {
342                Ok(v) => v,
343                Err(err) => {
344                    return http::Response::builder()
345                        .status(http::StatusCode::BAD_REQUEST)
346                        .body(http_body_util::Full::new(format!("invalid freq: {err:#}").into()));
347                }
348            };
349        } else if key == "duration" {
350            duration = match value.parse() {
351                Ok(v) => std::time::Duration::from_secs(v),
352                Err(err) => {
353                    return http::Response::builder()
354                        .status(http::StatusCode::BAD_REQUEST)
355                        .body(http_body_util::Full::new(format!("invalid duration: {err:#}").into()));
356                }
357            };
358        } else if key == "ignore" {
359            ignore_list.push(value);
360        }
361    }
362
363    let cpu = scuffle_pprof::Cpu::new(freq, &ignore_list);
364
365    match tokio::task::spawn_blocking(move || cpu.capture(duration)).await {
366        Ok(Ok(data)) => Ok(http::Response::builder()
367            .status(http::StatusCode::OK)
368            .body(http_body_util::Full::new(Bytes::from(data)))?),
369        Ok(Err(err)) => {
370            tracing::error!("cpu capture failed: {err:#}");
371            Ok(http::Response::builder()
372                .status(http::StatusCode::INTERNAL_SERVER_ERROR)
373                .body(http_body_util::Full::new(format!("{err:#}").into()))?)
374        }
375        Err(err) => {
376            tracing::error!("cpu capture failed: {err:#}");
377            Ok(http::Response::builder()
378                .status(http::StatusCode::INTERNAL_SERVER_ERROR)
379                .body(http_body_util::Full::new(format!("{err:#}").into()))?)
380        }
381    }
382}
383
384#[cfg(feature = "opentelemetry")]
385async fn opentelemetry_flush<G: TelemetryConfig>(
386    global: &std::sync::Arc<G>,
387) -> Result<http::Response<http_body_util::Full<Bytes>>, http::Error> {
388    if let Some(opentelemetry) = global.opentelemetry().cloned() {
389        if opentelemetry.is_enabled() {
390            match tokio::task::spawn_blocking(move || opentelemetry.flush()).await {
391                Ok(Ok(())) => Ok(http::Response::builder()
392                    .status(http::StatusCode::OK)
393                    .body(http_body_util::Full::new(Bytes::from_static(b"ok")))?),
394                Ok(Err(err)) => {
395                    tracing::error!("opentelemetry flush failed: {err:#}");
396                    Ok(http::Response::builder()
397                        .status(http::StatusCode::INTERNAL_SERVER_ERROR)
398                        .body(http_body_util::Full::new(format!("{err:#}").into()))?)
399                }
400                Err(err) => {
401                    tracing::error!("opentelemetry flush spawn failed: {err:#}");
402                    Ok(http::Response::builder()
403                        .status(http::StatusCode::INTERNAL_SERVER_ERROR)
404                        .body(http_body_util::Full::new(format!("{err:#}").into()))?)
405                }
406            }
407        } else {
408            Ok(http::Response::builder()
409                .status(http::StatusCode::OK)
410                .body(http_body_util::Full::new(Bytes::from_static(b"ok")))?)
411        }
412    } else {
413        Ok(http::Response::builder()
414            .status(http::StatusCode::NOT_FOUND)
415            .body(http_body_util::Full::new(Bytes::from_static(b"not found")))?)
416    }
417}
418
419#[cfg(test)]
420#[cfg_attr(all(test, coverage_nightly), coverage(off))]
421#[cfg(all(
422    feature = "opentelemetry-metrics",
423    feature = "opentelemetry-traces",
424    feature = "opentelemetry-logs"
425))]
426mod tests {
427    use std::net::SocketAddr;
428    use std::sync::Arc;
429
430    #[cfg(unix)]
431    use bytes::Bytes;
432    #[cfg(feature = "opentelemetry-logs")]
433    use opentelemetry_sdk::logs::SdkLoggerProvider;
434    #[cfg(feature = "opentelemetry-metrics")]
435    use opentelemetry_sdk::metrics::SdkMeterProvider;
436    #[cfg(feature = "opentelemetry-traces")]
437    use opentelemetry_sdk::trace::SdkTracerProvider;
438    use scuffle_bootstrap::{GlobalWithoutConfig, Service};
439
440    use crate::{TelemetryConfig, TelemetrySvc};
441
442    async fn request_metrics(addr: SocketAddr) -> reqwest::Result<String> {
443        reqwest::get(format!("http://{addr}/metrics"))
444            .await
445            .unwrap()
446            .error_for_status()?
447            .text()
448            .await
449    }
450
451    async fn request_health(addr: SocketAddr) -> String {
452        reqwest::get(format!("http://{addr}/health"))
453            .await
454            .unwrap()
455            .error_for_status()
456            .expect("health check failed")
457            .text()
458            .await
459            .expect("health check text")
460    }
461
462    #[cfg(unix)]
463    async fn request_pprof(addr: SocketAddr, freq: &str, duration: &str) -> reqwest::Result<Bytes> {
464        reqwest::get(format!("http://{addr}/pprof/cpu?freq={freq}&duration={duration}"))
465            .await
466            .unwrap()
467            .error_for_status()?
468            .bytes()
469            .await
470    }
471
472    async fn flush_opentelemetry(addr: SocketAddr) -> reqwest::Result<reqwest::Response> {
473        reqwest::get(format!("http://{addr}/opentelemetry/flush"))
474            .await
475            .unwrap()
476            .error_for_status()
477    }
478
479    #[cfg(not(valgrind))] // test is time-sensitive
480    #[tokio::test]
481    async fn telemetry_http_server() {
482        struct TestGlobal {
483            bind_addr: SocketAddr,
484            #[cfg(feature = "prometheus")]
485            prometheus: prometheus_client::registry::Registry,
486            open_telemetry: crate::opentelemetry::OpenTelemetry,
487        }
488
489        impl GlobalWithoutConfig for TestGlobal {
490            async fn init() -> anyhow::Result<Arc<Self>> {
491                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
492                let bind_addr = listener.local_addr()?;
493
494                let mut prometheus = prometheus_client::registry::Registry::default();
495
496                let exporter = scuffle_metrics::prometheus::exporter().build();
497                prometheus.register_collector(exporter.collector());
498
499                let metrics = SdkMeterProvider::builder().with_reader(exporter).build();
500                opentelemetry::global::set_meter_provider(metrics.clone());
501
502                let tracer = SdkTracerProvider::default();
503                opentelemetry::global::set_tracer_provider(tracer.clone());
504
505                let logger = SdkLoggerProvider::builder().build();
506
507                let open_telemetry = crate::opentelemetry::OpenTelemetry::new()
508                    .with_metrics(metrics)
509                    .with_traces(tracer)
510                    .with_logs(logger);
511
512                Ok(Arc::new(TestGlobal {
513                    bind_addr,
514                    prometheus,
515                    open_telemetry,
516                }))
517            }
518        }
519
520        impl TelemetryConfig for TestGlobal {
521            fn bind_address(&self) -> Option<std::net::SocketAddr> {
522                Some(self.bind_addr)
523            }
524
525            fn prometheus_metrics_registry(&self) -> Option<&prometheus_client::registry::Registry> {
526                Some(&self.prometheus)
527            }
528
529            fn opentelemetry(&self) -> Option<&crate::opentelemetry::OpenTelemetry> {
530                Some(&self.open_telemetry)
531            }
532        }
533
534        #[scuffle_metrics::metrics]
535        mod example {
536            use scuffle_metrics::{CounterU64, MetricEnum};
537
538            #[derive(MetricEnum)]
539            pub enum Kind {
540                Http,
541                Grpc,
542            }
543
544            #[metrics(unit = "requests")]
545            pub fn request(kind: Kind) -> CounterU64;
546        }
547
548        let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();
549
550        let bind_addr = global.bind_addr;
551
552        assert!(TelemetrySvc.enabled(&global).await.unwrap());
553
554        let task_handle = tokio::spawn(TelemetrySvc.run(global, scuffle_context::Context::global()));
555
556        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
557
558        let health = request_health(bind_addr).await;
559        assert_eq!(health, "ok");
560
561        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
562        assert!(metrics.starts_with("# HELP target Information about the target\n"));
563        assert!(metrics.contains("# TYPE target info\n"));
564        assert!(metrics.contains("service_name=\"unknown_service\""));
565        assert!(metrics.contains("telemetry_sdk_language=\"rust\""));
566        assert!(metrics.contains("telemetry_sdk_name=\"opentelemetry\""));
567        assert!(metrics.ends_with("# EOF\n"));
568
569        example::request(example::Kind::Http).incr();
570
571        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
572
573        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
574        assert!(metrics.contains("# UNIT example_request_requests requests\n"));
575        assert!(metrics.contains("example_request_requests_total{"));
576        assert!(metrics.contains(format!("otel_scope_name=\"{}\"", env!("CARGO_PKG_NAME")).as_str()));
577        assert!(metrics.contains(format!("otel_scope_version=\"{}\"", env!("CARGO_PKG_VERSION")).as_str()));
578        assert!(metrics.contains("kind=\"Http\""));
579        assert!(metrics.contains("} 1\n"));
580        assert!(metrics.ends_with("# EOF\n"));
581
582        example::request(example::Kind::Http).incr();
583
584        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
585
586        let metrics = request_metrics(bind_addr).await.expect("metrics failed");
587        assert!(metrics.contains("# UNIT example_request_requests requests\n"));
588        assert!(metrics.contains("example_request_requests_total{"));
589        assert!(metrics.contains(format!("otel_scope_name=\"{}\"", env!("CARGO_PKG_NAME")).as_str()));
590        assert!(metrics.contains(format!("otel_scope_version=\"{}\"", env!("CARGO_PKG_VERSION")).as_str()));
591        assert!(metrics.contains("kind=\"Http\""));
592        assert!(metrics.contains("} 2\n"));
593        assert!(metrics.ends_with("# EOF\n"));
594
595        #[cfg(unix)]
596        {
597            let timer = std::time::Instant::now();
598            assert!(!request_pprof(bind_addr, "100", "2").await.expect("pprof failed").is_empty());
599            assert!(timer.elapsed() > std::time::Duration::from_secs(2));
600
601            let res = request_pprof(bind_addr, "invalid", "2").await.expect_err("error expected");
602            assert!(res.is_status());
603            assert_eq!(res.status(), Some(reqwest::StatusCode::BAD_REQUEST));
604
605            let res = request_pprof(bind_addr, "100", "invalid").await.expect_err("error expected");
606            assert!(res.is_status());
607            assert_eq!(res.status(), Some(reqwest::StatusCode::BAD_REQUEST));
608        }
609
610        assert!(flush_opentelemetry(bind_addr).await.is_ok());
611
612        // Not found
613        let res = reqwest::get(format!("http://{bind_addr}/not_found")).await.unwrap();
614        assert_eq!(res.status(), reqwest::StatusCode::NOT_FOUND);
615
616        scuffle_context::Handler::global().shutdown().await;
617
618        task_handle.await.unwrap().unwrap();
619    }
620
621    #[cfg(not(valgrind))] // test is time-sensitive
622    #[tokio::test]
623    async fn empty_telemetry_http_server() {
624        struct TestGlobal {
625            bind_addr: SocketAddr,
626        }
627
628        impl GlobalWithoutConfig for TestGlobal {
629            async fn init() -> anyhow::Result<Arc<Self>> {
630                let listener = std::net::TcpListener::bind("127.0.0.1:0")?;
631                let bind_addr = listener.local_addr()?;
632
633                Ok(Arc::new(TestGlobal { bind_addr }))
634            }
635        }
636
637        impl TelemetryConfig for TestGlobal {
638            fn bind_address(&self) -> Option<std::net::SocketAddr> {
639                Some(self.bind_addr)
640            }
641        }
642
643        let global = <TestGlobal as GlobalWithoutConfig>::init().await.unwrap();
644
645        let bind_addr = global.bind_addr;
646
647        assert!(TelemetrySvc.enabled(&global).await.unwrap());
648
649        let task_handle = tokio::spawn(TelemetrySvc.run(global, scuffle_context::Context::global()));
650        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
651
652        let health = request_health(bind_addr).await;
653        assert_eq!(health, "ok");
654
655        let res = request_metrics(bind_addr).await.expect_err("error expected");
656        assert!(res.is_status());
657        assert_eq!(res.status(), Some(reqwest::StatusCode::NOT_FOUND));
658
659        #[cfg(unix)]
660        {
661            let timer = std::time::Instant::now();
662            assert!(!request_pprof(bind_addr, "100", "2").await.expect("pprof failed").is_empty());
663            assert!(timer.elapsed() > std::time::Duration::from_secs(2));
664        }
665
666        let err = flush_opentelemetry(bind_addr).await.expect_err("error expected");
667        assert!(err.is_status());
668        assert_eq!(err.status(), Some(reqwest::StatusCode::NOT_FOUND));
669
670        scuffle_context::Handler::global().shutdown().await;
671
672        task_handle.await.unwrap().unwrap();
673    }
674}