From 752715e55b1eeef846dbda609b33eaac42751161 Mon Sep 17 00:00:00 2001 From: mstar Date: Sun, 23 Mar 2025 08:23:51 +0100 Subject: [PATCH] WIP node request --- rust/src/pipewire/mod.rs | 44 ++++++++++++++---- rust/src/pipewire/stream_node.rs | 80 ++++++++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 9 deletions(-) create mode 100644 rust/src/pipewire/stream_node.rs diff --git a/rust/src/pipewire/mod.rs b/rust/src/pipewire/mod.rs index 336cf86..babdec4 100644 --- a/rust/src/pipewire/mod.rs +++ b/rust/src/pipewire/mod.rs @@ -3,9 +3,14 @@ use std::thread; use godot::{classes::notify::NodeNotification, prelude::*}; use pipewire::{ channel::{Receiver, Sender}, + context::Context, + core::Core, main_loop::MainLoop, + stream::Stream, }; +mod stream_node; + #[derive(GodotClass)] #[class(base=Node)] /// Node for interacting with Pipewire @@ -15,10 +20,19 @@ pub(crate) struct Pipewire { /// Handle for the thread running the pipewire main loop thread_handle: Option>, - shutdown_sender: Sender, + shutdown_sender: Sender, } -struct Terminate; +#[derive(Debug, Clone, PartialEq)] +enum InboundPwSignal { + Terminate, + ConnectToStream(u32), +} + +#[derive(Debug)] +enum ResultPwSignal { + StreamConnection(Result), +} #[godot_api] impl INode for Pipewire { @@ -37,33 +51,37 @@ impl INode for Pipewire { || what == NodeNotification::EXIT_TREE { godot_print!("Sending shutdown signal to pipewire thread"); - let _ = self.shutdown_sender.send(Terminate {}); + let _ = self.shutdown_sender.send(InboundPwSignal::Terminate); let handle = self.thread_handle.take().unwrap(); let _join_result = handle.join(); godot_print!("Pipewire thread completed"); } } + fn process(&mut self, _delta: f64) {} } #[godot_api] impl Pipewire { pub const SINGLETON: &'static str = "Pipewire"; - fn start_pipewire_thread() -> (thread::JoinHandle<()>, Sender) { + fn start_pipewire_thread() -> (thread::JoinHandle<()>, Sender) { godot_print!("Starting pipewire thread"); - let (shutdown_sender, shutdown_receiver) = pipewire::channel::channel::(); + let (shutdown_sender, shutdown_receiver) = pipewire::channel::channel::(); let thread_handle = thread::spawn(|| Self::pipewire_main(shutdown_receiver)); (thread_handle, shutdown_sender) } /// Func that actually runs the main loop in the thread - fn pipewire_main(shutdown_receiver: Receiver) { + fn pipewire_main(shutdown_receiver: Receiver) { let mainloop = MainLoop::new(None).expect("Failed to create pipewire main loop"); + let context = + Context::new(&mainloop).expect("Failed to get context from pipewire main loop"); + let core = context + .connect(None) + .expect("Failed to get core from pipewire context"); let _receiver = shutdown_receiver.attach(mainloop.loop_(), { - godot_print!("Stopping pipewire thread"); - let mainloop = mainloop.clone(); - move |_| mainloop.quit() + pipewire_loop_signal_handler(mainloop.clone(), core.clone()) }); godot_print!("Entering pipewire main loop"); @@ -71,3 +89,11 @@ impl Pipewire { println!("Pipewire main loop exited"); } } + +fn pipewire_loop_signal_handler(mainloop: MainLoop, _core: Core) -> impl Fn(InboundPwSignal) { + godot_print!("Stopping pipewire thread"); + move |signal| match signal { + InboundPwSignal::Terminate => mainloop.quit(), + InboundPwSignal::ConnectToStream(_stream_id) => {} + } +} diff --git a/rust/src/pipewire/stream_node.rs b/rust/src/pipewire/stream_node.rs new file mode 100644 index 0000000..bb9fd9b --- /dev/null +++ b/rust/src/pipewire/stream_node.rs @@ -0,0 +1,80 @@ +use godot::{ + classes::{IVideoStream, IVideoStreamPlayback, Texture2D, VideoStream, VideoStreamPlayback}, + prelude::*, +}; +use pipewire::stream::Stream; + +#[derive(GodotClass)] +#[class(base=VideoStream)] +pub(crate) struct PwVideoStreamNode { + base: Base, + stream: Option, +} + +#[derive(GodotClass)] +#[class(base=VideoStreamPlayback)] +pub(crate) struct PwVideoStreamPlayback { + base: Base, + paused: bool, +} + +#[godot_api] +impl IVideoStream for PwVideoStreamNode { + fn init(base: Base) -> Self { + Self { base, stream: None } + } +} + +#[godot_api] +impl IVideoStreamPlayback for PwVideoStreamPlayback { + fn init(base: Base) -> Self { + Self { base, paused: true } + } + + fn update(&mut self, _delta: f64) { + if self.paused { + return; + } + // TODO: Get latest frame or keep last one if no new one + todo!() + } + + fn play(&mut self) { + self.paused = false; + // TODO: Start copying frames again + todo!() + } + + fn stop(&mut self) { + self.paused = true; + // TODO: Stop updating the last known frame + todo!() + } + + fn is_paused(&self) -> bool { + self.paused + } + + fn set_paused(&mut self, paused: bool) { + if paused { + self.stop(); + } else { + self.play(); + } + } + + fn is_playing(&self) -> bool { + !self.paused + } + + fn get_texture(&self) -> Option> { + // TODO: Transform latest frame into Godot image + todo!() + } +} + +impl PwVideoStreamPlayback { + pub fn setup(&mut self) { + // TODO: Store pipewire stream and fetch first frame + } +}