diff options
author | Gregory Terzian <gterzian@users.noreply.github.com> | 2018-08-25 02:31:15 +0800 |
---|---|---|
committer | Gregory Terzian <gterzian@users.noreply.github.com> | 2018-09-12 11:25:45 +0800 |
commit | b977b4994c678ce1d9bca69be72d095522c25f71 (patch) | |
tree | 2a9d209d87c33cc28f012968b075beb7236b4011 /components/channel/lib.rs | |
parent | 704f7a06b1dbec15abab31ebb2f1893c8d99def0 (diff) | |
download | servo-b977b4994c678ce1d9bca69be72d095522c25f71.tar.gz servo-b977b4994c678ce1d9bca69be72d095522c25f71.zip |
add servo_channel crate
Diffstat (limited to 'components/channel/lib.rs')
-rw-r--r-- | components/channel/lib.rs | 163 |
1 files changed, 163 insertions, 0 deletions
diff --git a/components/channel/lib.rs b/components/channel/lib.rs new file mode 100644 index 00000000000..87950fbe22a --- /dev/null +++ b/components/channel/lib.rs @@ -0,0 +1,163 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +extern crate crossbeam_channel; +extern crate ipc_channel; +extern crate serde; + +pub mod base_channel { + pub use crossbeam_channel::*; +} +// Needed to re-export the select macro. +pub use crossbeam_channel::*; + +use ipc_channel::ipc::IpcReceiver; +use ipc_channel::router::ROUTER; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; + + +pub fn route_ipc_receiver_to_new_servo_receiver<T>(ipc_receiver: IpcReceiver<T>) -> Receiver<T> +where + T: for<'de> Deserialize<'de> + Serialize + Send + 'static +{ + let (servo_sender, servo_receiver) = channel(); + ROUTER.add_route( + ipc_receiver.to_opaque(), + Box::new(move |message| { + drop(servo_sender.send(message.to::<T>().unwrap())) + }), + ); + servo_receiver +} + +pub fn route_ipc_receiver_to_new_servo_sender<T>(ipc_receiver: IpcReceiver<T>, servo_sender: Sender<T>) +where + T: for<'de> Deserialize<'de> + Serialize + Send + 'static +{ + ROUTER.add_route( + ipc_receiver.to_opaque(), + Box::new(move |message| { + drop(servo_sender.send(message.to::<T>().unwrap())) + }), + ) +} + +pub fn channel<T>() -> (Sender<T>, Receiver<T>) { + let (base_sender, base_receiver) = crossbeam_channel::unbounded::<T>(); + let is_disconnected = Arc::new(AtomicBool::new(false)); + (Sender::new(base_sender, is_disconnected.clone()), + Receiver::new(base_receiver, is_disconnected)) +} + +#[derive(Debug, PartialEq)] +pub enum ChannelError { + ChannelClosedError +} + +pub struct Receiver<T> { + receiver: crossbeam_channel::Receiver<T>, + is_disconnected: Arc<AtomicBool>, +} + +impl<T> Drop for Receiver<T> { + fn drop(&mut self) { + self.is_disconnected.store(true, Ordering::SeqCst); + } +} + +impl<T> Clone for Receiver<T> { + fn clone(&self) -> Self { + Receiver { + receiver: self.receiver.clone(), + is_disconnected: self.is_disconnected.clone(), + } + } +} + +impl<T> Receiver<T> { + pub fn new(receiver: crossbeam_channel::Receiver<T>, is_disconnected: Arc<AtomicBool>) -> Receiver<T> { + Receiver { + receiver, + is_disconnected, + } + } + + pub fn recv(&self) -> Option<T> { + self.receiver.recv() + } + + pub fn try_recv(&self) -> Option<T> { + self.receiver.try_recv() + } + + pub fn len(&self) -> usize { + self.receiver.len() + } + + pub fn select(&self) -> &crossbeam_channel::Receiver<T> { + &self.receiver + } +} + +impl<T> Iterator for Receiver<T> { + type Item = T; + + fn next(&mut self) -> Option<Self::Item> { + self.receiver.recv() + } +} + +impl<'a, T> IntoIterator for &'a Receiver<T> { + type Item = T; + type IntoIter = crossbeam_channel::Receiver<T>; + + fn into_iter(self) -> Self::IntoIter { + self.receiver.clone() + } +} + +pub struct Sender<T> { + sender: crossbeam_channel::Sender<T>, + is_disconnected: Arc<AtomicBool>, +} + +impl<T> Clone for Sender<T> { + fn clone(&self) -> Self { + Sender { + sender: self.sender.clone(), + is_disconnected: self.is_disconnected.clone(), + } + } +} + +impl<T> Sender<T> { + pub fn new(sender: crossbeam_channel::Sender<T>, is_disconnected: Arc<AtomicBool>) -> Sender<T> { + Sender { + sender, + is_disconnected, + } + } + + pub fn send(&self, msg: T) -> Result<(), ChannelError> { + if self.is_disconnected.load(Ordering::SeqCst) { + Err(ChannelError::ChannelClosedError) + } else { + Ok(self.sender.send(msg)) + } + } + + pub fn len(&self) -> usize { + self.sender.len() + } + + pub fn select(&self) -> Option<&crossbeam_channel::Sender<T>> { + if self.is_disconnected.load(Ordering::SeqCst) { + None + } else { + Some(&self.sender) + } + } +} |