aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbors-servo <servo-ops@mozilla.com>2021-06-20 19:47:54 -0400
committerGitHub <noreply@github.com>2021-06-20 19:47:54 -0400
commiteab515f22467c3b73ad7e8fb1065072fbb764fa7 (patch)
tree99537e08c963017bed83b35ebc83aa0001d32a0c
parentce4f965587c0701c4cc0e8bdd4966980aaaf1098 (diff)
parent602c02edd29029ab16e988e4bb5e2cddc1c1da5b (diff)
downloadservo-eab515f22467c3b73ad7e8fb1065072fbb764fa7.tar.gz
servo-eab515f22467c3b73ad7e8fb1065072fbb764fa7.zip
Auto merge of #28522 - yvt:fix-bhm-hangup, r=jdm
Fix hang-ups in `background_hang_monitor` tests --- - [x] `./mach build -d` does not report any errors - [ ] `./mach test-tidy` does not report any errors - [x] These changes fix #28270, #27191 and random failures in the CI pipeline --- - [x] There are tests for these changes OR - [ ] These changes do not require tests because ___
-rw-r--r--components/background_hang_monitor/background_hang_monitor.rs175
-rw-r--r--components/background_hang_monitor/tests/hang_monitor_tests.rs93
2 files changed, 221 insertions, 47 deletions
diff --git a/components/background_hang_monitor/background_hang_monitor.rs b/components/background_hang_monitor/background_hang_monitor.rs
index b97ec3bceec..ab1e4e5940c 100644
--- a/components/background_hang_monitor/background_hang_monitor.rs
+++ b/components/background_hang_monitor/background_hang_monitor.rs
@@ -16,12 +16,14 @@ use msg::constellation_msg::{
};
use std::cell::Cell;
use std::collections::{HashMap, VecDeque};
+use std::sync::{Arc, Weak};
use std::thread;
use std::time::{Duration, Instant};
#[derive(Clone)]
pub struct HangMonitorRegister {
- sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>,
+ sender: Weak<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>,
+ tether: Sender<Never>,
monitoring_enabled: bool,
}
@@ -32,13 +34,27 @@ impl HangMonitorRegister {
control_port: IpcReceiver<BackgroundHangMonitorControlMsg>,
monitoring_enabled: bool,
) -> Box<dyn BackgroundHangMonitorRegister> {
+ // Create a channel to pass messages of type `MonitoredComponentMsg`.
+ // See the discussion in `<HangMonitorRegister as
+ // BackgroundHangMonitorRegister>::register_component` for why we wrap
+ // the sender with `Arc` and why `HangMonitorRegister` only maintains
+ // a weak reference to it.
let (sender, port) = unbounded();
+ let sender = Arc::new(sender);
+ let sender_weak = Arc::downgrade(&sender);
+
+ // Create a "tether" channel, whose sole purpose is to keep the worker
+ // thread alive. The worker thread will terminates when all copies of
+ // `tether` are dropped.
+ let (tether, tether_port) = unbounded();
+
let _ = thread::Builder::new()
.spawn(move || {
let mut monitor = BackgroundHangMonitorWorker::new(
constellation_chan,
control_port,
- port,
+ (sender, port),
+ tether_port,
monitoring_enabled,
);
while monitor.run() {
@@ -47,7 +63,8 @@ impl HangMonitorRegister {
})
.expect("Couldn't start BHM worker.");
Box::new(HangMonitorRegister {
- sender,
+ sender: sender_weak,
+ tether,
monitoring_enabled,
})
}
@@ -66,6 +83,7 @@ impl BackgroundHangMonitorRegister for HangMonitorRegister {
) -> Box<dyn BackgroundHangMonitor> {
let bhm_chan = BackgroundHangMonitorChan::new(
self.sender.clone(),
+ self.tether.clone(),
component_id,
self.monitoring_enabled,
);
@@ -85,12 +103,54 @@ impl BackgroundHangMonitorRegister for HangMonitorRegister {
#[cfg(any(target_os = "android", target_arch = "arm", target_arch = "aarch64"))]
let sampler = crate::sampler::DummySampler::new();
+ // When a component is registered, and there's an exit request that
+ // reached BHM, we want an exit signal to be delivered to the
+ // component's exit signal handler eventually. However, there's a race
+ // condition between the reception of `BackgroundHangMonitorControlMsg::
+ // Exit` and `MonitoredComponentMsg::Register` that needs to handled
+ // carefully. When the worker receives an `Exit` message, it stops
+ // processing messages, and any further `Register` messages sent to the
+ // worker thread are ignored. If the submissions of `Exit` and
+ // `Register` messages are far apart enough, the channel is closed by
+ // the time the client attempts to send a `Register` message, and
+ // therefore the client can figure out by `Sender::send`'s return value
+ // that it must deliver an exit signal. However, if these message
+ // submissions are close enough, the `Register` message is still sent,
+ // but the worker thread might exit before it sees the message, leaving
+ // the message unprocessed and the exit signal unsent.
+ //
+ // To fix this, we wrap the exit signal handler in an RAII wrapper of
+ // type `SignalToExitOnDrop` to automatically send a signal when it's
+ // dropped. This way, we can make sure the exit signal is sent even if
+ // the message couldn't reach the worker thread and be processed.
+ //
+ // However, as it turns out, `crossbeam-channel`'s channels don't drop
+ // remaining messages until all associated senders *and* receivers are
+ // dropped. This means the exit signal won't be delivered as long as
+ // there's at least one `HangMonitorRegister` or
+ // `BackgroundHangMonitorChan` maintaining a copy of the sender. To work
+ // around this and guarantee a rapid delivery of the exit signal, the
+ // sender is wrapped in `Arc`, and only the worker thread maintains a
+ // strong reference, thus ensuring both the sender and receiver are
+ // dropped as soon as the worker thread exits.
+ let exit_signal = SignalToExitOnDrop(exit_signal);
+
+ // If the tether is dropped after this call, the worker thread might
+ // exit before processing the `Register` message because there's no
+ // implicit ordering guarantee between two channels. If this happens,
+ // an exit signal will be sent despite we haven't received a
+ // corresponding exit request. To enforce the correct ordering and
+ // prevent a false exit signal from being sent, we include a copy of
+ // `self.tether` in the `Register` message.
+ let tether = self.tether.clone();
+
bhm_chan.send(MonitoredComponentMsg::Register(
sampler,
thread::current().name().map(str::to_owned),
transient_hang_timeout,
permanent_hang_timeout,
exit_signal,
+ tether,
));
Box::new(bhm_chan)
}
@@ -103,14 +163,15 @@ impl BackgroundHangMonitorClone for HangMonitorRegister {
}
/// Messages sent from monitored components to the monitor.
-pub enum MonitoredComponentMsg {
+enum MonitoredComponentMsg {
/// Register component for monitoring,
Register(
Box<dyn Sampler>,
Option<String>,
Duration,
Duration,
- Option<Box<dyn BackgroundHangMonitorExitSignal>>,
+ SignalToExitOnDrop,
+ Sender<Never>,
),
/// Unregister component for monitoring.
Unregister,
@@ -120,35 +181,51 @@ pub enum MonitoredComponentMsg {
NotifyWait,
}
+/// Stable equivalent to the `!` type
+enum Never {}
+
/// A wrapper around a sender to the monitor,
/// which will send the Id of the monitored component along with each message,
/// and keep track of whether the monitor is still listening on the other end.
-pub struct BackgroundHangMonitorChan {
- sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>,
+struct BackgroundHangMonitorChan {
+ sender: Weak<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>,
+ _tether: Sender<Never>,
component_id: MonitoredComponentId,
disconnected: Cell<bool>,
monitoring_enabled: bool,
}
impl BackgroundHangMonitorChan {
- pub fn new(
- sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>,
+ fn new(
+ sender: Weak<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>,
+ tether: Sender<Never>,
component_id: MonitoredComponentId,
monitoring_enabled: bool,
) -> Self {
BackgroundHangMonitorChan {
sender,
+ _tether: tether,
component_id: component_id,
disconnected: Default::default(),
monitoring_enabled,
}
}
- pub fn send(&self, msg: MonitoredComponentMsg) {
+ fn send(&self, msg: MonitoredComponentMsg) {
if self.disconnected.get() {
return;
}
- if let Err(_) = self.sender.send((self.component_id.clone(), msg)) {
+
+ // The worker thread owns both the receiver *and* the only strong
+ // reference to the sender. An `upgrade` failure means the latter is
+ // gone, and a `send` failure means the former is gone. They are dropped
+ // simultaneously, but we might observe an intermediate state.
+ if self
+ .sender
+ .upgrade()
+ .and_then(|sender| sender.send((self.component_id.clone(), msg)).ok())
+ .is_none()
+ {
warn!("BackgroundHangMonitor has gone away");
self.disconnected.set(true);
}
@@ -174,6 +251,33 @@ impl BackgroundHangMonitor for BackgroundHangMonitorChan {
}
}
+/// Wraps [`BackgroundHangMonitorExitSignal`] and calls `signal_to_exit` when
+/// dropped.
+struct SignalToExitOnDrop(Option<Box<dyn BackgroundHangMonitorExitSignal>>);
+
+impl SignalToExitOnDrop {
+ /// Call `BackgroundHangMonitorExitSignal::signal_to_exit` now.
+ fn signal_to_exit(&mut self) {
+ if let Some(signal) = self.0.take() {
+ signal.signal_to_exit();
+ }
+ }
+
+ /// Disassociate `BackgroundHangMonitorExitSignal` from itself, preventing
+ /// `BackgroundHangMonitorExitSignal::signal_to_exit` from being called in
+ /// the future.
+ fn release(&mut self) {
+ self.0 = None;
+ }
+}
+
+impl Drop for SignalToExitOnDrop {
+ #[inline]
+ fn drop(&mut self) {
+ self.signal_to_exit();
+ }
+}
+
struct MonitoredComponent {
sampler: Box<dyn Sampler>,
last_activity: Instant,
@@ -183,16 +287,18 @@ struct MonitoredComponent {
sent_transient_alert: bool,
sent_permanent_alert: bool,
is_waiting: bool,
- exit_signal: Option<Box<dyn BackgroundHangMonitorExitSignal>>,
+ exit_signal: SignalToExitOnDrop,
}
struct Sample(MonitoredComponentId, Instant, NativeStack);
-pub struct BackgroundHangMonitorWorker {
+struct BackgroundHangMonitorWorker {
component_names: HashMap<MonitoredComponentId, String>,
monitored_components: HashMap<MonitoredComponentId, MonitoredComponent>,
constellation_chan: IpcSender<HangMonitorAlert>,
port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>,
+ _port_sender: Arc<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>,
+ tether_port: Receiver<Never>,
control_port: Receiver<BackgroundHangMonitorControlMsg>,
sampling_duration: Option<Duration>,
sampling_max_duration: Option<Duration>,
@@ -204,10 +310,14 @@ pub struct BackgroundHangMonitorWorker {
}
impl BackgroundHangMonitorWorker {
- pub fn new(
+ fn new(
constellation_chan: IpcSender<HangMonitorAlert>,
control_port: IpcReceiver<BackgroundHangMonitorControlMsg>,
- port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>,
+ (port_sender, port): (
+ Arc<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>,
+ Receiver<(MonitoredComponentId, MonitoredComponentMsg)>,
+ ),
+ tether_port: Receiver<Never>,
monitoring_enabled: bool,
) -> Self {
let control_port = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(control_port);
@@ -216,6 +326,8 @@ impl BackgroundHangMonitorWorker {
monitored_components: Default::default(),
constellation_chan,
port,
+ _port_sender: port_sender,
+ tether_port,
control_port,
sampling_duration: None,
sampling_max_duration: None,
@@ -268,7 +380,7 @@ impl BackgroundHangMonitorWorker {
.send(HangMonitorAlert::Profile(bytes));
}
- pub fn run(&mut self) -> bool {
+ fn run(&mut self) -> bool {
let tick = if let Some(duration) = self.sampling_duration {
let duration = duration
.checked_sub(Instant::now() - self.last_sample)
@@ -284,11 +396,24 @@ impl BackgroundHangMonitorWorker {
let received = select! {
recv(self.port) -> event => {
+ // Since we own the `Arc<Sender<_>>`, the channel never
+ // gets disconnected.
+ Some(event.unwrap())
+ },
+ recv(self.tether_port) -> event => {
+ // This arm can only reached by a tether disconnection
match event {
- Ok(msg) => Some(msg),
- // Our sender has been dropped, quit.
- Err(_) => return false,
+ Ok(x) => match x {}
+ Err(_) => {}
+ }
+
+ // All associated `HangMonitorRegister` and
+ // `BackgroundHangMonitorChan` have been dropped. Suppress
+ // `signal_to_exit` and exit the BHM.
+ for component in self.monitored_components.values_mut() {
+ component.exit_signal.release();
}
+ return false;
},
recv(self.control_port) -> event => {
match event {
@@ -306,10 +431,8 @@ impl BackgroundHangMonitorWorker {
return true;
},
Ok(BackgroundHangMonitorControlMsg::Exit(sender)) => {
- for component in self.monitored_components.values() {
- if let Some(signal) = component.exit_signal.as_ref() {
- signal.signal_to_exit();
- }
+ for component in self.monitored_components.values_mut() {
+ component.exit_signal.signal_to_exit();
}
// Confirm exit with to the constellation.
@@ -355,6 +478,7 @@ impl BackgroundHangMonitorWorker {
transient_hang_timeout,
permanent_hang_timeout,
exit_signal,
+ _tether,
),
) => {
let component = MonitoredComponent {
@@ -379,10 +503,13 @@ impl BackgroundHangMonitorWorker {
);
},
(component_id, MonitoredComponentMsg::Unregister) => {
- let _ = self
+ let (_, mut component) = self
.monitored_components
.remove_entry(&component_id)
.expect("Received Unregister for an unknown component");
+
+ // Prevent `signal_to_exit` from being called
+ component.exit_signal.release();
},
(component_id, MonitoredComponentMsg::NotifyActivity(annotation)) => {
let component = self
diff --git a/components/background_hang_monitor/tests/hang_monitor_tests.rs b/components/background_hang_monitor/tests/hang_monitor_tests.rs
index 23b32dd1bbb..b92344039bd 100644
--- a/components/background_hang_monitor/tests/hang_monitor_tests.rs
+++ b/components/background_hang_monitor/tests/hang_monitor_tests.rs
@@ -164,10 +164,45 @@ fn test_hang_monitoring_unregister() {
assert!(background_hang_monitor_receiver.try_recv().is_err());
}
+// Perform two certain steps in `test_hang_monitoring_exit_signal_inner` in
+// different orders to check for the race condition that
+// caused <https://github.com/servo/servo/issues/28270> and
+// <https://github.com/servo/servo/issues/27191>.
#[test]
-// https://github.com/servo/servo/issues/28270
-#[cfg(not(any(target_os = "windows", target_os = "macos")))]
-fn test_hang_monitoring_exit_signal() {
+fn test_hang_monitoring_exit_signal1() {
+ test_hang_monitoring_exit_signal_inner(|e1, e2| {
+ e1();
+ thread::sleep(Duration::from_millis(100));
+ e2();
+ });
+}
+
+#[test]
+fn test_hang_monitoring_exit_signal2() {
+ test_hang_monitoring_exit_signal_inner(|e1, e2| {
+ e1();
+ e2();
+ });
+}
+
+#[test]
+fn test_hang_monitoring_exit_signal3() {
+ test_hang_monitoring_exit_signal_inner(|e1, e2| {
+ e2();
+ e1();
+ });
+}
+
+#[test]
+fn test_hang_monitoring_exit_signal4() {
+ test_hang_monitoring_exit_signal_inner(|e1, e2| {
+ e2();
+ thread::sleep(Duration::from_millis(100));
+ e1();
+ });
+}
+
+fn test_hang_monitoring_exit_signal_inner(op_order: fn(&mut dyn FnMut(), &mut dyn FnMut())) {
let _lock = SERIAL.lock().unwrap();
let (background_hang_monitor_ipc_sender, _background_hang_monitor_receiver) =
@@ -185,9 +220,9 @@ fn test_hang_monitoring_exit_signal() {
}
let closing = Arc::new(AtomicBool::new(false));
- let signal = BHMExitSignal {
+ let mut signal = Some(Box::new(BHMExitSignal {
closing: closing.clone(),
- };
+ }));
// Init a worker, without active monitoring.
let background_hang_monitor_register = HangMonitorRegister::init(
@@ -195,26 +230,38 @@ fn test_hang_monitoring_exit_signal() {
control_receiver,
false,
);
- let _background_hang_monitor = background_hang_monitor_register.register_component(
- MonitoredComponentId(TEST_PIPELINE_ID, MonitoredComponentType::Script),
- Duration::from_millis(10),
- Duration::from_millis(1000),
- Some(Box::new(signal)),
- );
+ let mut background_hang_monitor = None;
let (exit_sender, exit_receiver) = ipc::channel().expect("Failed to create IPC channel!");
+ let mut exit_sender = Some(exit_sender);
+
+ // `op_order` determines the order in which these two closures are
+ // executed.
+ op_order(
+ &mut || {
+ // Register a component.
+ background_hang_monitor = Some(background_hang_monitor_register.register_component(
+ MonitoredComponentId(TEST_PIPELINE_ID, MonitoredComponentType::Script),
+ Duration::from_millis(10),
+ Duration::from_millis(1000),
+ Some(signal.take().unwrap()),
+ ));
+ },
+ &mut || {
+ // Send the exit message.
+ control_sender
+ .send(BackgroundHangMonitorControlMsg::Exit(
+ exit_sender.take().unwrap(),
+ ))
+ .unwrap();
+ },
+ );
- // Send the exit message.
- if control_sender
- .send(BackgroundHangMonitorControlMsg::Exit(exit_sender))
- .is_ok()
- {
- // Assert we receive a confirmation back.
- assert!(exit_receiver.recv().is_ok());
-
- // Assert we get the exit signal.
- while !closing.load(Ordering::SeqCst) {
- thread::sleep(Duration::from_millis(10));
- }
+ // Assert we receive a confirmation back.
+ assert!(exit_receiver.recv().is_ok());
+
+ // Assert we get the exit signal.
+ while !closing.load(Ordering::SeqCst) {
+ thread::sleep(Duration::from_millis(10));
}
}