aboutsummaryrefslogtreecommitdiffstats
path: root/components/constellation/timer_scheduler.rs
blob: 14ad2588e8520b4720e73cc1a0835d1020addbac (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at https://mozilla.org/MPL/2.0/. */

use crossbeam_channel::{self, TryRecvError};
use ipc_channel::ipc::{self, IpcSender};
use script_traits::{TimerEvent, TimerEventRequest, TimerSchedulerMsg};
use std::cmp::{self, Ord};
use std::collections::BinaryHeap;
use std::thread;
use std::time::{Duration, Instant};

pub struct TimerScheduler;

struct ScheduledEvent {
    request: TimerEventRequest,
    for_time: Instant,
}

impl Ord for ScheduledEvent {
    fn cmp(&self, other: &ScheduledEvent) -> cmp::Ordering {
        self.for_time.cmp(&other.for_time).reverse()
    }
}

impl PartialOrd for ScheduledEvent {
    fn partial_cmp(&self, other: &ScheduledEvent) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for ScheduledEvent {}
impl PartialEq for ScheduledEvent {
    fn eq(&self, other: &ScheduledEvent) -> bool {
        self as *const ScheduledEvent == other as *const ScheduledEvent
    }
}

impl TimerScheduler {
    pub fn start() -> IpcSender<TimerSchedulerMsg> {
        let (req_ipc_sender, req_ipc_receiver) = ipc::channel().expect("Channel creation failed.");
        let (req_sender, req_receiver) = crossbeam_channel::bounded(1);

        // We could do this much more directly with recv_timeout
        // (https://github.com/rust-lang/rfcs/issues/962).

        // util::thread doesn't give us access to the JoinHandle, which we need for park/unpark,
        // so we use the builder directly.
        let timeout_thread = thread::Builder::new()
            .name(String::from("TimerScheduler"))
            .spawn(move || {
                // We maintain a priority queue of future events, sorted by due time.
                let mut scheduled_events = BinaryHeap::<ScheduledEvent>::new();
                loop {
                    let now = Instant::now();
                    // Dispatch any events whose due time is past
                    loop {
                        match scheduled_events.peek() {
                            // Dispatch the event if its due time is past
                            Some(event) if event.for_time <= now => {
                                let TimerEventRequest(ref sender, source, id, _) = event.request;
                                let _ = sender.send(TimerEvent(source, id));
                            },
                            // Otherwise, we're done dispatching events
                            _ => break,
                        }
                        // Remove the event from the priority queue
                        // (Note this only executes when the first event has been dispatched
                        scheduled_events.pop();
                    }
                    // Look to see if there are any incoming events
                    match req_receiver.try_recv() {
                        // If there is an event, add it to the priority queue
                        Ok(TimerSchedulerMsg::Request(req)) => {
                            let TimerEventRequest(_, _, _, delay) = req;
                            let schedule = Instant::now() + Duration::from_millis(delay.get());
                            let event = ScheduledEvent {
                                request: req,
                                for_time: schedule,
                            };
                            scheduled_events.push(event);
                        },
                        // If there is no incoming event, park the thread,
                        // it will either be unparked when a new event arrives,
                        // or by a timeout.
                        Err(TryRecvError::Empty) => match scheduled_events.peek() {
                            None => thread::park(),
                            Some(event) => thread::park_timeout(event.for_time - now),
                        },
                        // If the channel is closed or we are shutting down, we are done.
                        Ok(TimerSchedulerMsg::Exit) | Err(TryRecvError::Disconnected) => break,
                    }
                }
                // This thread can terminate if the req_ipc_sender is dropped.
                warn!("TimerScheduler thread terminated.");
            })
            .expect("Thread creation failed.")
            .thread()
            .clone();

        // A proxy that just routes incoming IPC requests over the MPSC channel to the timeout thread,
        // and unparks the timeout thread each time. Note that if unpark is called while the timeout
        // thread isn't parked, this causes the next call to thread::park by the timeout thread
        // not to block. This means that the timeout thread won't park when there is a request
        // waiting in the MPSC channel buffer.
        thread::Builder::new()
            .name(String::from("TimerProxy"))
            .spawn(move || {
                while let Ok(req) = req_ipc_receiver.recv() {
                    let mut shutting_down = false;
                    match req {
                        TimerSchedulerMsg::Exit => shutting_down = true,
                        _ => {},
                    }
                    let _ = req_sender.send(req);
                    timeout_thread.unpark();
                    if shutting_down {
                        break;
                    }
                }
                // This thread can terminate if the req_ipc_sender is dropped.
                warn!("TimerProxy thread terminated.");
            })
            .expect("Thread creation failed.");

        // Return the IPC sender
        req_ipc_sender
    }
}