1#![cfg_attr(all(coverage_nightly, test), feature(coverage_attribute))]
69#![deny(missing_docs)]
70#![deny(unsafe_code)]
71#![deny(unreachable_pub)]
72
73pub mod chunk;
74pub mod command_messages;
75pub mod error;
76pub mod handshake;
77pub mod messages;
78pub mod protocol_control_messages;
79pub mod session;
80pub mod user_control_messages;
81
82pub use session::server::ServerSession;
83
84#[cfg(test)]
85#[cfg_attr(all(test, coverage_nightly), coverage(off))]
86mod tests {
87 use std::path::PathBuf;
88 use std::time::Duration;
89
90 use scuffle_future_ext::FutureExt;
91 use tokio::process::Command;
92 use tokio::sync::{mpsc, oneshot};
93
94 use crate::session::server::{ServerSession, ServerSessionError, SessionData, SessionHandler};
95
96 enum Event {
97 Publish {
98 stream_id: u32,
99 app_name: String,
100 stream_name: String,
101 response: oneshot::Sender<Result<(), ServerSessionError>>,
102 },
103 Unpublish {
104 stream_id: u32,
105 response: oneshot::Sender<Result<(), ServerSessionError>>,
106 },
107 Data {
108 stream_id: u32,
109 data: SessionData,
110 response: oneshot::Sender<Result<(), ServerSessionError>>,
111 },
112 }
113
114 struct Handler(mpsc::Sender<Event>);
115
116 impl SessionHandler for Handler {
117 async fn on_publish(&mut self, stream_id: u32, app_name: &str, stream_name: &str) -> Result<(), ServerSessionError> {
118 let (response, reciever) = oneshot::channel();
119
120 self.0
121 .send(Event::Publish {
122 stream_id,
123 app_name: app_name.to_string(),
124 stream_name: stream_name.to_string(),
125 response,
126 })
127 .await
128 .unwrap();
129
130 reciever.await.unwrap()
131 }
132
133 async fn on_unpublish(&mut self, stream_id: u32) -> Result<(), ServerSessionError> {
134 let (response, reciever) = oneshot::channel();
135
136 self.0.send(Event::Unpublish { stream_id, response }).await.unwrap();
137
138 reciever.await.unwrap()
139 }
140
141 async fn on_data(&mut self, stream_id: u32, data: SessionData) -> Result<(), ServerSessionError> {
142 let (response, reciever) = oneshot::channel();
143 self.0
144 .send(Event::Data {
145 stream_id,
146 data,
147 response,
148 })
149 .await
150 .unwrap();
151
152 reciever.await.unwrap()
153 }
154 }
155
156 #[cfg(not(valgrind))] #[tokio::test]
158 async fn test_basic_rtmp_clean() {
159 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
160 let addr = listener.local_addr().unwrap();
161
162 let dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../assets");
163
164 let _ffmpeg = Command::new("ffmpeg")
165 .args([
166 "-loglevel",
167 "debug",
168 "-re",
169 "-i",
170 dir.join("avc_aac.mp4").to_str().expect("failed to get path"),
171 "-r",
172 "30",
173 "-t",
174 "1", "-c",
176 "copy",
177 "-f",
178 "flv",
179 &format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port()),
180 ])
181 .stdout(std::process::Stdio::inherit())
182 .stderr(std::process::Stdio::inherit())
183 .spawn()
184 .expect("failed to execute ffmpeg");
185
186 let (ffmpeg_stream, _) = listener
187 .accept()
188 .with_timeout(Duration::from_millis(1000))
189 .await
190 .expect("timed out")
191 .expect("failed to accept");
192
193 let (ffmpeg_handle, mut ffmpeg_event_reciever) = {
194 let (ffmpeg_event_producer, ffmpeg_event_reciever) = mpsc::channel(1);
195 let session = ServerSession::new(ffmpeg_stream, Handler(ffmpeg_event_producer));
196
197 (
198 tokio::spawn(async move {
199 let r = session.run().await;
200 println!("ffmpeg session ended: {r:?}");
201 r
202 }),
203 ffmpeg_event_reciever,
204 )
205 };
206
207 let event = ffmpeg_event_reciever
208 .recv()
209 .with_timeout(Duration::from_millis(1000))
210 .await
211 .expect("timed out")
212 .expect("failed to recv event");
213
214 match event {
215 Event::Publish {
216 stream_id,
217 app_name,
218 stream_name,
219 response,
220 } => {
221 assert_eq!(stream_id, 1);
222 assert_eq!(app_name, "live");
223 assert_eq!(stream_name, "stream-key");
224 response.send(Ok(())).expect("failed to send response");
225 }
226 _ => panic!("unexpected event"),
227 }
228
229 let mut got_video = false;
230 let mut got_audio = false;
231 let mut got_metadata = false;
232
233 while let Some(data) = ffmpeg_event_reciever
234 .recv()
235 .with_timeout(Duration::from_millis(1000))
236 .await
237 .expect("timed out")
238 {
239 match data {
240 Event::Data {
241 stream_id,
242 response,
243 data,
244 ..
245 } => {
246 match data {
247 SessionData::Video { .. } => got_video = true,
248 SessionData::Audio { .. } => got_audio = true,
249 SessionData::Amf0 { .. } => got_metadata = true,
250 }
251 response.send(Ok(())).expect("failed to send response");
252 assert_eq!(stream_id, 1);
253 }
254 Event::Unpublish { stream_id, response } => {
255 assert_eq!(stream_id, 1);
256 response.send(Ok(())).expect("failed to send response");
257 break;
258 }
259 _ => panic!("unexpected event"),
260 }
261 }
262
263 assert!(got_video);
264 assert!(got_audio);
265 assert!(got_metadata);
266
267 if ffmpeg_event_reciever
268 .recv()
269 .with_timeout(Duration::from_millis(1000))
270 .await
271 .expect("timed out")
272 .is_some()
273 {
274 panic!("unexpected event");
275 }
276
277 assert!(
278 ffmpeg_handle
279 .await
280 .expect("failed to join handle")
281 .expect("failed to handle ffmpeg connection")
282 );
283
284 }
287
288 #[cfg(all(not(valgrind), not(windows)))]
291 #[tokio::test]
292 async fn test_basic_rtmp_unclean() {
293 let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.expect("failed to bind");
294 let addr = listener.local_addr().unwrap();
295
296 let dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../../assets");
297
298 let mut ffmpeg = Command::new("ffmpeg")
299 .args([
300 "-loglevel",
301 "debug",
302 "-re",
303 "-i",
304 dir.join("avc_aac.mp4").to_str().expect("failed to get path"),
305 "-r",
306 "30",
307 "-t",
308 "1", "-c",
310 "copy",
311 "-f",
312 "flv",
313 &format!("rtmp://{}:{}/live/stream-key", addr.ip(), addr.port()),
314 ])
315 .stdout(std::process::Stdio::inherit())
316 .stderr(std::process::Stdio::inherit())
317 .spawn()
318 .expect("failed to execute ffmpeg");
319
320 let (ffmpeg_stream, _) = listener
321 .accept()
322 .with_timeout(Duration::from_millis(1000))
323 .await
324 .expect("timed out")
325 .expect("failed to accept");
326
327 let (ffmpeg_handle, mut ffmpeg_event_reciever) = {
328 let (ffmpeg_event_producer, ffmpeg_event_reciever) = mpsc::channel(1);
329 let session = ServerSession::new(ffmpeg_stream, Handler(ffmpeg_event_producer));
330
331 (
332 tokio::spawn(async move {
333 let r = session.run().await;
334 println!("ffmpeg session ended: {r:?}");
335 r
336 }),
337 ffmpeg_event_reciever,
338 )
339 };
340
341 let event = ffmpeg_event_reciever
342 .recv()
343 .with_timeout(Duration::from_millis(1000))
344 .await
345 .expect("timed out")
346 .expect("failed to recv event");
347
348 match event {
349 Event::Publish {
350 stream_id,
351 app_name,
352 stream_name,
353 response,
354 } => {
355 assert_eq!(stream_id, 1);
356 assert_eq!(app_name, "live");
357 assert_eq!(stream_name, "stream-key");
358 response.send(Ok(())).expect("failed to send response");
359 }
360 _ => panic!("unexpected event"),
361 }
362
363 let mut got_video = false;
364 let mut got_audio = false;
365 let mut got_metadata = false;
366
367 while let Some(data) = ffmpeg_event_reciever
368 .recv()
369 .with_timeout(Duration::from_millis(1000))
370 .await
371 .expect("timed out")
372 {
373 match data {
374 Event::Data {
375 stream_id,
376 response,
377 data,
378 ..
379 } => {
380 assert_eq!(stream_id, 1);
381 match data {
382 SessionData::Video { .. } => got_video = true,
383 SessionData::Audio { .. } => got_audio = true,
384 SessionData::Amf0 { .. } => got_metadata = true,
385 }
386 response.send(Ok(())).expect("failed to send response");
387 }
388 _ => panic!("unexpected event"),
389 }
390
391 if got_video && got_audio && got_metadata {
392 break;
393 }
394 }
395
396 assert!(got_video);
397 assert!(got_audio);
398 assert!(got_metadata);
399
400 ffmpeg.kill().await.expect("failed to kill ffmpeg");
401
402 while let Some(data) = ffmpeg_event_reciever
403 .recv()
404 .with_timeout(Duration::from_millis(1000))
405 .await
406 .expect("timed out")
407 {
408 match data {
409 Event::Data { response, .. } => {
410 response.send(Ok(())).expect("failed to send response");
411 }
412 _ => panic!("unexpected event"),
413 }
414 }
415
416 assert!(
418 !ffmpeg_handle
419 .await
420 .expect("failed to join handle")
421 .expect("failed to handle ffmpeg connection")
422 );
423 }
424}