/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use ipc_channel::ipc::{self, IpcSender};
use script_traits::{TimerEvent, TimerEventRequest, TimerSchedulerMsg};
use std::cmp::{self, Ord};
use std::collections::BinaryHeap;
use std::sync::mpsc;
use std::sync::mpsc::TryRecvError::{Disconnected, Empty};
use std::thread;
use std::time::{Duration, Instant};
/// Entry point for the timer scheduler.
///
/// The type itself carries no data; in this file it only exposes the
/// associated function `start`, which spawns the scheduler threads.
pub struct TimerScheduler;
/// A timer request waiting in the scheduler's priority queue, paired with the
/// instant at which it becomes due.
struct ScheduledEvent {
/// The request as received; its sender, source and id are used to emit the
/// `TimerEvent` once the event is due.
request: TimerEventRequest,
/// The instant at which the event is due (arrival time plus requested delay).
for_time: Instant,
}
impl Ord for ScheduledEvent {
    /// Compares two events by due time, with the result inverted: the event
    /// that is due *earlier* is the *greater* one. `BinaryHeap` is a max-heap,
    /// so this puts the next event to fire at the top of the queue.
    fn cmp(&self, other: &ScheduledEvent) -> cmp::Ordering {
        // Comparing `other` against `self` flips the natural `Instant` order.
        other.for_time.cmp(&self.for_time)
    }
}
impl PartialOrd for ScheduledEvent {
    /// Always yields `Some`: the ordering is total and is taken directly from
    /// the `Ord` implementation.
    fn partial_cmp(&self, other: &ScheduledEvent) -> Option<cmp::Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}
// Equality on the due time is a full equivalence relation, so `Eq` holds.
impl Eq for ScheduledEvent {}

impl PartialEq for ScheduledEvent {
    /// Two events are equal when they are due at the same instant.
    ///
    /// This deliberately mirrors the `Ord` implementation, which compares only
    /// `for_time`: the standard library requires `a == b` exactly when
    /// `a.cmp(&b)` is `Ordering::Equal`. Comparing addresses instead would make
    /// two distinct events with the same due time "equal" for ordering but
    /// "different" for `==`.
    fn eq(&self, other: &ScheduledEvent) -> bool {
        self.for_time == other.for_time
    }
}
impl TimerScheduler {
    /// Starts the timer scheduler and returns the IPC sender used to talk to it.
    ///
    /// Two threads are spawned:
    ///
    /// * `TimerScheduler` keeps a priority queue of pending requests ordered by
    ///   due time, sends a `TimerEvent` to each request's sender once its delay
    ///   has elapsed, and parks while it has nothing to do.
    /// * `TimerProxy` forwards every message received on the IPC channel to the
    ///   scheduler thread over an in-process channel and unparks it.
    ///
    /// Both threads stop after a `TimerSchedulerMsg::Exit` message. They also
    /// stop when receiving from the IPC channel fails; the proxy then closes
    /// the in-process channel and wakes the scheduler so it can observe the
    /// disconnect and exit too.
    ///
    /// # Panics
    ///
    /// Panics if the IPC channel or either thread cannot be created.
    pub fn start() -> IpcSender<TimerSchedulerMsg> {
        let (req_ipc_sender, req_ipc_receiver) = ipc::channel().expect("Channel creation failed.");
        let (req_sender, req_receiver) = mpsc::sync_channel(1);

        // We could do this much more directly with recv_timeout
        // (https://github.com/rust-lang/rfcs/issues/962).

        // util::thread doesn't give us access to the JoinHandle, which we need for park/unpark,
        // so we use the builder directly.
        let timeout_thread = thread::Builder::new()
            .name(String::from("TimerScheduler"))
            .spawn(move || {
                // We maintain a priority queue of future events, sorted by due time.
                let mut scheduled_events = BinaryHeap::<ScheduledEvent>::new();
                loop {
                    let now = Instant::now();
                    // Dispatch any events whose due time is past
                    loop {
                        match scheduled_events.peek() {
                            // Dispatch the event if its due time is past
                            Some(event) if event.for_time <= now => {
                                let TimerEventRequest(ref sender, source, id, _) = event.request;
                                // A failed send is ignored: there is nobody left to notify.
                                let _ = sender.send(TimerEvent(source, id));
                            },
                            // Otherwise, we're done dispatching events
                            _ => break,
                        }
                        // Remove the event from the priority queue
                        // (Note this only executes when the first event has been dispatched)
                        scheduled_events.pop();
                    }
                    // Look to see if there are any incoming events
                    match req_receiver.try_recv() {
                        // If there is an event, add it to the priority queue
                        Ok(TimerSchedulerMsg::Request(req)) => {
                            let TimerEventRequest(_, _, _, delay) = req;
                            let schedule = Instant::now() + Duration::from_millis(delay.get());
                            let event = ScheduledEvent {
                                request: req,
                                for_time: schedule,
                            };
                            scheduled_events.push(event);
                        },
                        // If there is no incoming event, park the thread,
                        // it will either be unparked when a new event arrives,
                        // or by a timeout.
                        Err(Empty) => match scheduled_events.peek() {
                            None => thread::park(),
                            Some(event) => {
                                // Measure the remaining wait from a fresh timestamp: `now`
                                // predates the dispatching above, so using it would make the
                                // thread sleep past the deadline by however long that took.
                                let current = Instant::now();
                                if event.for_time > current {
                                    thread::park_timeout(event.for_time - current);
                                }
                                // Otherwise the event is already due; loop around and
                                // dispatch it without parking.
                            },
                        },
                        // If the channel is closed or we are shutting down, we are done.
                        Ok(TimerSchedulerMsg::Exit) | Err(Disconnected) => break,
                    }
                }
                // This thread can terminate if the req_ipc_sender is dropped.
                warn!("TimerScheduler thread terminated.");
            })
            .expect("Thread creation failed.")
            .thread()
            .clone();

        // A proxy that just routes incoming IPC requests over the MPSC channel to the timeout thread,
        // and unparks the timeout thread each time. Note that if unpark is called while the timeout
        // thread isn't parked, this causes the next call to thread::park by the timeout thread
        // not to block. This means that the timeout thread won't park when there is a request
        // waiting in the MPSC channel buffer.
        thread::Builder::new()
            .name(String::from("TimerProxy"))
            .spawn(move || {
                while let Ok(req) = req_ipc_receiver.recv() {
                    let shutting_down = match req {
                        TimerSchedulerMsg::Exit => true,
                        _ => false,
                    };
                    // Forwarding is best-effort; a failure means the timeout thread is gone.
                    let _ = req_sender.send(req);
                    timeout_thread.unpark();
                    if shutting_down {
                        break;
                    }
                }
                // Close the MPSC channel and wake the timeout thread one last time. Without
                // this, a timeout thread parked with an empty queue would never notice that
                // the channel was disconnected and would stay parked forever.
                drop(req_sender);
                timeout_thread.unpark();
                // This thread can terminate if the req_ipc_sender is dropped.
                warn!("TimerProxy thread terminated.");
            })
            .expect("Thread creation failed.");

        // Return the IPC sender
        req_ipc_sender
    }
}
|