diff options
-rw-r--r-- | components/constellation/timer_scheduler.rs | 264 |
1 files changed, 81 insertions, 183 deletions
diff --git a/components/constellation/timer_scheduler.rs b/components/constellation/timer_scheduler.rs index d83c6170d23..1578da7a951 100644 --- a/components/constellation/timer_scheduler.rs +++ b/components/constellation/timer_scheduler.rs @@ -2,86 +2,20 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use euclid::length::Length; use ipc_channel::ipc::{self, IpcSender}; -use ipc_channel::router::ROUTER; -use script_traits::{MsDuration, NsDuration, precise_time_ms, precise_time_ns}; use script_traits::{TimerEvent, TimerEventRequest}; -use std::cell::RefCell; use std::cmp::{self, Ord}; use std::collections::BinaryHeap; -use std::sync::Arc; -use std::sync::atomic::{self, AtomicBool}; -use std::sync::mpsc::{channel, Receiver, Select}; -use std::thread::{self, spawn, Thread}; -use std::time::Duration; -use util::thread::spawn_named; +use std::sync::mpsc; +use std::sync::mpsc::TryRecvError::{Disconnected, Empty}; +use std::thread; +use std::time::{Duration, Instant}; -/// A quick hack to work around the removal of [`std::old_io::timer::Timer`]( -/// http://doc.rust-lang.org/1.0.0-beta/std/old_io/timer/struct.Timer.html ) -struct CancelableOneshotTimer { - thread: Thread, - canceled: Arc<AtomicBool>, - port: Receiver<()>, -} - -impl CancelableOneshotTimer { - fn new(duration: MsDuration) -> CancelableOneshotTimer { - let (tx, rx) = channel(); - let canceled = Arc::new(AtomicBool::new(false)); - let canceled_clone = canceled.clone(); - - let thread = spawn(move || { - let due_time = precise_time_ms() + duration; - - let mut park_time = duration; - - loop { - thread::park_timeout(Duration::from_millis(park_time.get())); - - if canceled_clone.load(atomic::Ordering::Relaxed) { - return; - } - - // park_timeout_ms does not guarantee parking for the - // given amout. We might have woken up early. - let current_time = precise_time_ms(); - if current_time >= due_time { - let _ = tx.send(()); - return; - } - park_time = due_time - current_time; - } - }).thread().clone(); - - CancelableOneshotTimer { - thread: thread, - canceled: canceled, - port: rx, - } - } - - fn port(&self) -> &Receiver<()> { - &self.port - } - - fn cancel(&self) { - self.canceled.store(true, atomic::Ordering::Relaxed); - self.thread.unpark(); - } -} - -pub struct TimerScheduler { - port: Receiver<TimerEventRequest>, - - scheduled_events: RefCell<BinaryHeap<ScheduledEvent>>, - - timer: RefCell<Option<CancelableOneshotTimer>>, -} +pub struct TimerScheduler; struct ScheduledEvent { request: TimerEventRequest, - for_time: NsDuration, + for_time: Instant, } impl Ord for ScheduledEvent { @@ -103,119 +37,83 @@ impl PartialEq for ScheduledEvent { } } -enum Task { - HandleRequest(TimerEventRequest), - DispatchDueEvents, -} - impl TimerScheduler { pub fn start() -> IpcSender<TimerEventRequest> { - let (chan, port) = ipc::channel().unwrap(); - - let timer_scheduler = TimerScheduler { - port: ROUTER.route_ipc_receiver_to_new_mpsc_receiver(port), - - scheduled_events: RefCell::new(BinaryHeap::new()), - - timer: RefCell::new(None), - }; - - spawn_named("TimerScheduler".to_owned(), move || { - timer_scheduler.run_event_loop(); - }); - - chan - } - - fn run_event_loop(&self) { - while let Some(thread) = self.receive_next_task() { - match thread { - Task::HandleRequest(request) => self.handle_request(request), - Task::DispatchDueEvents => self.dispatch_due_events(), - } - } - } - - #[allow(unsafe_code)] - fn receive_next_task(&self) -> Option<Task> { - let port = &self.port; - let timer = self.timer.borrow(); - let timer_port = timer.as_ref().map(|timer| timer.port()); - - if let Some(ref timer_port) = timer_port { - let sel = Select::new(); - let mut scheduler_handle = sel.handle(port); - let mut timer_handle = sel.handle(timer_port); - - unsafe { - scheduler_handle.add(); - timer_handle.add(); - } - - let ret = sel.wait(); - if ret == scheduler_handle.id() { - port.recv().ok().map(Task::HandleRequest) - } else if ret == timer_handle.id() { - timer_port.recv().ok().map(|_| Task::DispatchDueEvents) - } else { - panic!("unexpected select result!") - } - } else { - port.recv().ok().map(Task::HandleRequest) - } - } - - fn handle_request(&self, request: TimerEventRequest) { - let TimerEventRequest(_, _, _, duration_ms) = request; - let duration_ns = Length::new(duration_ms.get() * 1000 * 1000); - let schedule_for = precise_time_ns() + duration_ns; - - let previously_earliest = self.scheduled_events.borrow().peek() - .map_or(Length::new(u64::max_value()), |scheduled| scheduled.for_time); - - self.scheduled_events.borrow_mut().push(ScheduledEvent { - request: request, - for_time: schedule_for, - }); - - if schedule_for < previously_earliest { - self.start_timer_for_next_event(); - } - } - - fn dispatch_due_events(&self) { - let now = precise_time_ns(); - - { - let mut events = self.scheduled_events.borrow_mut(); - - while !events.is_empty() && events.peek().as_ref().unwrap().for_time <= now { - let event = events.pop().unwrap(); - let TimerEventRequest(chan, source, id, _) = event.request; - - let _ = chan.send(TimerEvent(source, id)); - } - } - - self.start_timer_for_next_event(); - } - - fn start_timer_for_next_event(&self) { - let events = self.scheduled_events.borrow(); - let next_event = events.peek(); - - let mut timer = self.timer.borrow_mut(); - - if let Some(ref mut timer) = *timer { - timer.cancel(); - } - - *timer = next_event.map(|next_event| { - let delay_ns = next_event.for_time.get().saturating_sub(precise_time_ns().get()); - // Round up, we'd rather be late than early… - let delay_ms = Length::new(delay_ns.saturating_add(999999) / (1000 * 1000)); + let (req_ipc_sender, req_ipc_receiver) = ipc::channel().unwrap(); + let (req_sender, req_receiver) = mpsc::sync_channel(1); + + // We could do this much more directly with recv_timeout + // (https://github.com/rust-lang/rfcs/issues/962). + + // util::thread doesn't give us access to the JoinHandle, which we need for park/unpark, + // so we use the builder directly. + let timeout_thread = thread::Builder::new() + .name(String::from("TimerScheduler")) + .spawn(move || { + // We maintain a priority queue of future events, sorted by due time. + let mut scheduled_events = BinaryHeap::<ScheduledEvent>::new(); + loop { + let now = Instant::now(); + // Dispatch any events whose due time is past + loop { + match scheduled_events.peek() { + // Dispatch the event if its due time is past + Some(event) if event.for_time <= now => { + let TimerEventRequest(ref sender, source, id, _) = event.request; + let _ = sender.send(TimerEvent(source, id)); + }, + // Otherwise, we're done dispatching events + _ => break, + } + // Remove the event from the priority queue + // (Note this only executes when the first event has been dispatched + scheduled_events.pop(); + } + // Look to see if there are any incoming events + match req_receiver.try_recv() { + // If there is an event, add it to the priority queue + Ok(req) => { + let TimerEventRequest(_, _, _, delay) = req; + let schedule = Instant::now() + Duration::from_millis(delay.get()); + let event = ScheduledEvent { request: req, for_time: schedule }; + scheduled_events.push(event); + }, + // If there is no incoming event, park the thread, + // it will either be unparked when a new event arrives, + // or by a timeout. + Err(Empty) => match scheduled_events.peek() { + None => thread::park(), + Some(event) => thread::park_timeout(event.for_time - now), + }, + // If the channel is closed, we are done. + Err(Disconnected) => break, + } + } + // This thread can terminate if the req_ipc_sender is dropped. + warn!("TimerScheduler thread terminated."); + }) + .unwrap() + .thread() + .clone(); + + // A proxy that just routes incoming IPC requests over the MPSC channel to the timeout thread, + // and unparks the timeout thread each time. Note that if unpark is called while the timeout + // thread isn't parked, this causes the next call to thread::park by the timeout thread + // not to block. This means that the timeout thread won't park when there is a request + // waiting in the MPSC channel buffer. + thread::Builder::new() + .name(String::from("TimerProxy")) + .spawn(move || { + while let Ok(req) = req_ipc_receiver.recv() { + req_sender.send(req).unwrap(); + timeout_thread.unpark(); + } + // This thread can terminate if the req_ipc_sender is dropped. + warn!("TimerProxy thread terminated."); + }) + .unwrap(); - CancelableOneshotTimer::new(delay_ms) - }); + // Return the IPC sender + req_ipc_sender } } |