scuffle_rtmp/
lib.rs

1//! A crate for handling RTMP server connections.
2//!
3//! ## Specifications
4//!
5//! | Name | Version | Link | Comments |
6//! | --- | --- | --- | --- |
7//! | Adobe’s Real Time Messaging Protocol | `1.0` | <https://github.com/veovera/enhanced-rtmp/blob/main/docs/legacy/rtmp-v1-0-spec.pdf> | Refered to as 'Legacy RTMP spec' in this documentation |
8//! | Enhancing RTMP, FLV | `v1-2024-02-29-r1` | <https://github.com/veovera/enhanced-rtmp/blob/main/docs/enhanced/enhanced-rtmp-v1.pdf> | |
9//! | Enhanced RTMP | `v2-2024-10-22-b1` | <https://github.com/veovera/enhanced-rtmp/blob/main/docs/enhanced/enhanced-rtmp-v2.pdf> | Refered to as 'Enhanced RTMP spec' in this documentation |
10//!
11//! ## Example
12//!
13//! ```no_run
14//! # use std::io::Cursor;
15//! #
16//! # use scuffle_rtmp::ServerSession;
17//! # use scuffle_rtmp::session::server::{ServerSessionError, SessionData, SessionHandler};
18//! # use tokio::net::TcpListener;
19//! #
20//! struct Handler;
21//!
22//! impl SessionHandler for Handler {
23//!     async fn on_data(&mut self, stream_id: u32, data: SessionData) -> Result<(), ServerSessionError> {
24//!         // Handle incoming video/audio/meta data
25//!         Ok(())
26//!     }
27//!
28//!     async fn on_publish(&mut self, stream_id: u32, app_name: &str, stream_name: &str) -> Result<(), ServerSessionError> {
29//!         // Handle the publish event
30//!         Ok(())
31//!     }
32//!
33//!     async fn on_unpublish(&mut self, stream_id: u32) -> Result<(), ServerSessionError> {
34//!         // Handle the unpublish event
35//!         Ok(())
36//!     }
37//! }
38//!
39//! #[tokio::main]
40//! async fn main() {
41//!     let listener = TcpListener::bind("[::]:1935").await.unwrap();
42//!     // listening on [::]:1935
43//!
44//!     while let Ok((stream, addr)) = listener.accept().await {
45//!         let session = ServerSession::new(stream, Handler);
46//!
47//!         tokio::spawn(async move {
48//!             if let Err(err) = session.run().await {
49//!                 // Handle the session error
50//!             }
51//!         });
52//!     }
53//! }
54//! ```
55//!
56//! ## Status
57//!
58//! This crate is currently under development and is not yet stable.
59//!
60//! Unit tests are not yet fully implemented. Use at your own risk.
61//!
62//! ## License
63//!
64//! This project is licensed under the [MIT](./LICENSE.MIT) or [Apache-2.0](./LICENSE.Apache-2.0) license.
65//! You can choose between one of them if you use this work.
66//!
67//! `SPDX-License-Identifier: MIT OR Apache-2.0`
68#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
69#![deny(missing_docs)]
70#![deny(unsafe_code)]
71#![deny(unreachable_pub)]
72
73pub mod chunk;
74pub mod command_messages;
75pub mod error;
76pub mod handshake;
77pub mod messages;
78pub mod protocol_control_messages;
79pub mod session;
80pub mod user_control_messages;
81
82pub use session::server::ServerSession;
83
84#[cfg(test)]
85#[cfg_attr(all(test, coverage_nightly), coverage(off))]
86mod tests {
87    use std::path::PathBuf;
88    use std::time::Duration;
89
90    use scuffle_future_ext::FutureExt;
91    use tokio::process::Command;
92    use tokio::sync::{mpsc, oneshot};
93
94    use crate::session::server::{ServerSession, ServerSessionError, SessionData, SessionHandler};
95
96    enum Event {
97        Publish {
98            stream_id: u32,
99            app_name: String,
100            stream_name: String,
101            response: oneshot::Sender<Result<(), ServerSessionError>>,
102        },
103        Unpublish {
104            stream_id: u32,
105            response: oneshot::Sender<Result<(), ServerSessionError>>,
106        },
107        Data {
108            stream_id: u32,
109            data: SessionData,
110            response: oneshot::Sender<Result<(), ServerSessionError>>,
111        },
112    }
113
114    struct Handler(mpsc::Sender<Event>);
115
116    impl SessionHandler for Handler {
117        async fn on_publish(&mut self, stream_id: u32, app_name: &str, stream_name: &str) -> Result<(), ServerSessionError> {
118            let (response, reciever) = oneshot::channel();
119
120            self.0
121                .send(Event::Publish {
122                    stream_id,
123                    app_name: app_name.to_string(),
124                    stream_name: stream_name.to_string(),
125                    response,
126                })
127                .await
128                .unwrap();
129
130            reciever.await.unwrap()
131        }
132
133        async fn on_unpublish(&mut self, stream_id: u32) -> Result<(), ServerSessionError> {
134            let (response, reciever) = oneshot::channel();
135
136            self.0.send(Event::Unpublish { stream_id, response }).await.unwrap();
137
138            reciever.await.unwrap()
139        }
140
141        async fn on_data(&mut self, stream_id: u32, data: SessionData) -> Result<(), ServerSessionError> {
142            let (response, reciever) = oneshot::channel();
143            self.0
144                .send(Event::Data {
145                    stream_id,
146                    data,
147                    response,
148                })
149                .await
150                .unwrap();
151
152            reciever.await.unwrap()
153        }
154    }
155
156    #[cfg(not(valgrind))] // test is time-sensitive, consider refactoring?
157    #[tokio::test]
158    async fn test_basic_rtmp_clean() {
159        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
160        let addr = listener.local_addr().unwrap();
161
162        let dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../assets");
163
164        let _ffmpeg = Command::new("ffmpeg")
165            .args([
166                "-loglevel",
167                "debug",
168                "-re",
169                "-i",
170                dir.join("avc_aac.mp4").to_str().expect("failed to get path"),
171                "-r",
172                "30",
173                "-t",
174                "1", // just for the test so it doesn't take too long
175                "-c",
176                "copy",
177                "-f",
178                "flv",
179                &format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port()),
180            ])
181            .stdout(std::process::Stdio::inherit())
182            .stderr(std::process::Stdio::inherit())
183            .spawn()
184            .expect("failed to execute ffmpeg");
185
186        let (ffmpeg_stream, _) = listener
187            .accept()
188            .with_timeout(Duration::from_millis(1000))
189            .await
190            .expect("timed out")
191            .expect("failed to accept");
192
193        let (ffmpeg_handle, mut ffmpeg_event_reciever) = {
194            let (ffmpeg_event_producer, ffmpeg_event_reciever) = mpsc::channel(1);
195            let session = ServerSession::new(ffmpeg_stream, Handler(ffmpeg_event_producer));
196
197            (
198                tokio::spawn(async move {
199                    let r = session.run().await;
200                    println!("ffmpeg session ended: {r:?}");
201                    r
202                }),
203                ffmpeg_event_reciever,
204            )
205        };
206
207        let event = ffmpeg_event_reciever
208            .recv()
209            .with_timeout(Duration::from_millis(1000))
210            .await
211            .expect("timed out")
212            .expect("failed to recv event");
213
214        match event {
215            Event::Publish {
216                stream_id,
217                app_name,
218                stream_name,
219                response,
220            } => {
221                assert_eq!(stream_id, 1);
222                assert_eq!(app_name, "live");
223                assert_eq!(stream_name, "stream-key");
224                response.send(Ok(())).expect("failed to send response");
225            }
226            _ => panic!("unexpected event"),
227        }
228
229        let mut got_video = false;
230        let mut got_audio = false;
231        let mut got_metadata = false;
232
233        while let Some(data) = ffmpeg_event_reciever
234            .recv()
235            .with_timeout(Duration::from_millis(1000))
236            .await
237            .expect("timed out")
238        {
239            match data {
240                Event::Data {
241                    stream_id,
242                    response,
243                    data,
244                    ..
245                } => {
246                    match data {
247                        SessionData::Video { .. } => got_video = true,
248                        SessionData::Audio { .. } => got_audio = true,
249                        SessionData::Amf0 { .. } => got_metadata = true,
250                    }
251                    response.send(Ok(())).expect("failed to send response");
252                    assert_eq!(stream_id, 1);
253                }
254                Event::Unpublish { stream_id, response } => {
255                    assert_eq!(stream_id, 1);
256                    response.send(Ok(())).expect("failed to send response");
257                    break;
258                }
259                _ => panic!("unexpected event"),
260            }
261        }
262
263        assert!(got_video);
264        assert!(got_audio);
265        assert!(got_metadata);
266
267        if ffmpeg_event_reciever
268            .recv()
269            .with_timeout(Duration::from_millis(1000))
270            .await
271            .expect("timed out")
272            .is_some()
273        {
274            panic!("unexpected event");
275        }
276
277        assert!(
278            ffmpeg_handle
279                .await
280                .expect("failed to join handle")
281                .expect("failed to handle ffmpeg connection")
282        );
283
284        // TODO: Fix this assertion
285        // assert!(ffmpeg.try_wait().expect("failed to wait for ffmpeg").is_none());
286    }
287
288    // test is time-sensitive, consider refactoring?
289    // windows seems to not let us kill ffmpeg without it cleaning up the stream.
290    #[cfg(all(not(valgrind), not(windows)))]
291    #[tokio::test]
292    async fn test_basic_rtmp_unclean() {
293        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
294        let addr = listener.local_addr().unwrap();
295
296        let dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../assets");
297
298        let mut ffmpeg = Command::new("ffmpeg")
299            .args([
300                "-loglevel",
301                "debug",
302                "-re",
303                "-i",
304                dir.join("avc_aac.mp4").to_str().expect("failed to get path"),
305                "-r",
306                "30",
307                "-t",
308                "1", // just for the test so it doesn't take too long
309                "-c",
310                "copy",
311                "-f",
312                "flv",
313                &format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port()),
314            ])
315            .stdout(std::process::Stdio::inherit())
316            .stderr(std::process::Stdio::inherit())
317            .spawn()
318            .expect("failed to execute ffmpeg");
319
320        let (ffmpeg_stream, _) = listener
321            .accept()
322            .with_timeout(Duration::from_millis(1000))
323            .await
324            .expect("timed out")
325            .expect("failed to accept");
326
327        let (ffmpeg_handle, mut ffmpeg_event_reciever) = {
328            let (ffmpeg_event_producer, ffmpeg_event_reciever) = mpsc::channel(1);
329            let session = ServerSession::new(ffmpeg_stream, Handler(ffmpeg_event_producer));
330
331            (
332                tokio::spawn(async move {
333                    let r = session.run().await;
334                    println!("ffmpeg session ended: {r:?}");
335                    r
336                }),
337                ffmpeg_event_reciever,
338            )
339        };
340
341        let event = ffmpeg_event_reciever
342            .recv()
343            .with_timeout(Duration::from_millis(1000))
344            .await
345            .expect("timed out")
346            .expect("failed to recv event");
347
348        match event {
349            Event::Publish {
350                stream_id,
351                app_name,
352                stream_name,
353                response,
354            } => {
355                assert_eq!(stream_id, 1);
356                assert_eq!(app_name, "live");
357                assert_eq!(stream_name, "stream-key");
358                response.send(Ok(())).expect("failed to send response");
359            }
360            _ => panic!("unexpected event"),
361        }
362
363        let mut got_video = false;
364        let mut got_audio = false;
365        let mut got_metadata = false;
366
367        while let Some(data) = ffmpeg_event_reciever
368            .recv()
369            .with_timeout(Duration::from_millis(1000))
370            .await
371            .expect("timed out")
372        {
373            match data {
374                Event::Data {
375                    stream_id,
376                    response,
377                    data,
378                    ..
379                } => {
380                    assert_eq!(stream_id, 1);
381                    match data {
382                        SessionData::Video { .. } => got_video = true,
383                        SessionData::Audio { .. } => got_audio = true,
384                        SessionData::Amf0 { .. } => got_metadata = true,
385                    }
386                    response.send(Ok(())).expect("failed to send response");
387                }
388                _ => panic!("unexpected event"),
389            }
390
391            if got_video && got_audio && got_metadata {
392                break;
393            }
394        }
395
396        assert!(got_video);
397        assert!(got_audio);
398        assert!(got_metadata);
399
400        ffmpeg.kill().await.expect("failed to kill ffmpeg");
401
402        while let Some(data) = ffmpeg_event_reciever
403            .recv()
404            .with_timeout(Duration::from_millis(1000))
405            .await
406            .expect("timed out")
407        {
408            match data {
409                Event::Data { response, .. } => {
410                    response.send(Ok(())).expect("failed to send response");
411                }
412                _ => panic!("unexpected event"),
413            }
414        }
415
416        // the server should have detected the ffmpeg process has died uncleanly
417        assert!(
418            !ffmpeg_handle
419                .await
420                .expect("failed to join handle")
421                .expect("failed to handle ffmpeg connection")
422        );
423    }
424}