diff options
author | bors-servo <servo-ops@mozilla.com> | 2021-06-20 19:47:54 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-20 19:47:54 -0400 |
commit | eab515f22467c3b73ad7e8fb1065072fbb764fa7 (patch) | |
tree | 99537e08c963017bed83b35ebc83aa0001d32a0c | |
parent | ce4f965587c0701c4cc0e8bdd4966980aaaf1098 (diff) | |
parent | 602c02edd29029ab16e988e4bb5e2cddc1c1da5b (diff) | |
download | servo-eab515f22467c3b73ad7e8fb1065072fbb764fa7.tar.gz servo-eab515f22467c3b73ad7e8fb1065072fbb764fa7.zip |
Auto merge of #28522 - yvt:fix-bhm-hangup, r=jdm
Fix hang-ups in `background_hang_monitor` tests
---
- [x] `./mach build -d` does not report any errors
- [ ] `./mach test-tidy` does not report any errors
- [x] These changes fix #28270, #27191 and random failures in the CI pipeline
---
- [x] There are tests for these changes OR
- [ ] These changes do not require tests because ___
-rw-r--r-- | components/background_hang_monitor/background_hang_monitor.rs | 175 | ||||
-rw-r--r-- | components/background_hang_monitor/tests/hang_monitor_tests.rs | 93 |
2 files changed, 221 insertions, 47 deletions
diff --git a/components/background_hang_monitor/background_hang_monitor.rs b/components/background_hang_monitor/background_hang_monitor.rs index b97ec3bceec..ab1e4e5940c 100644 --- a/components/background_hang_monitor/background_hang_monitor.rs +++ b/components/background_hang_monitor/background_hang_monitor.rs @@ -16,12 +16,14 @@ use msg::constellation_msg::{ }; use std::cell::Cell; use std::collections::{HashMap, VecDeque}; +use std::sync::{Arc, Weak}; use std::thread; use std::time::{Duration, Instant}; #[derive(Clone)] pub struct HangMonitorRegister { - sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>, + sender: Weak<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>, + tether: Sender<Never>, monitoring_enabled: bool, } @@ -32,13 +34,27 @@ impl HangMonitorRegister { control_port: IpcReceiver<BackgroundHangMonitorControlMsg>, monitoring_enabled: bool, ) -> Box<dyn BackgroundHangMonitorRegister> { + // Create a channel to pass messages of type `MonitoredComponentMsg`. + // See the discussion in `<HangMonitorRegister as + // BackgroundHangMonitorRegister>::register_component` for why we wrap + // the sender with `Arc` and why `HangMonitorRegister` only maintains + // a weak reference to it. let (sender, port) = unbounded(); + let sender = Arc::new(sender); + let sender_weak = Arc::downgrade(&sender); + + // Create a "tether" channel, whose sole purpose is to keep the worker + // thread alive. The worker thread will terminates when all copies of + // `tether` are dropped. + let (tether, tether_port) = unbounded(); + let _ = thread::Builder::new() .spawn(move || { let mut monitor = BackgroundHangMonitorWorker::new( constellation_chan, control_port, - port, + (sender, port), + tether_port, monitoring_enabled, ); while monitor.run() { @@ -47,7 +63,8 @@ impl HangMonitorRegister { }) .expect("Couldn't start BHM worker."); Box::new(HangMonitorRegister { - sender, + sender: sender_weak, + tether, monitoring_enabled, }) } @@ -66,6 +83,7 @@ impl BackgroundHangMonitorRegister for HangMonitorRegister { ) -> Box<dyn BackgroundHangMonitor> { let bhm_chan = BackgroundHangMonitorChan::new( self.sender.clone(), + self.tether.clone(), component_id, self.monitoring_enabled, ); @@ -85,12 +103,54 @@ impl BackgroundHangMonitorRegister for HangMonitorRegister { #[cfg(any(target_os = "android", target_arch = "arm", target_arch = "aarch64"))] let sampler = crate::sampler::DummySampler::new(); + // When a component is registered, and there's an exit request that + // reached BHM, we want an exit signal to be delivered to the + // component's exit signal handler eventually. However, there's a race + // condition between the reception of `BackgroundHangMonitorControlMsg:: + // Exit` and `MonitoredComponentMsg::Register` that needs to handled + // carefully. When the worker receives an `Exit` message, it stops + // processing messages, and any further `Register` messages sent to the + // worker thread are ignored. If the submissions of `Exit` and + // `Register` messages are far apart enough, the channel is closed by + // the time the client attempts to send a `Register` message, and + // therefore the client can figure out by `Sender::send`'s return value + // that it must deliver an exit signal. However, if these message + // submissions are close enough, the `Register` message is still sent, + // but the worker thread might exit before it sees the message, leaving + // the message unprocessed and the exit signal unsent. + // + // To fix this, we wrap the exit signal handler in an RAII wrapper of + // type `SignalToExitOnDrop` to automatically send a signal when it's + // dropped. This way, we can make sure the exit signal is sent even if + // the message couldn't reach the worker thread and be processed. + // + // However, as it turns out, `crossbeam-channel`'s channels don't drop + // remaining messages until all associated senders *and* receivers are + // dropped. This means the exit signal won't be delivered as long as + // there's at least one `HangMonitorRegister` or + // `BackgroundHangMonitorChan` maintaining a copy of the sender. To work + // around this and guarantee a rapid delivery of the exit signal, the + // sender is wrapped in `Arc`, and only the worker thread maintains a + // strong reference, thus ensuring both the sender and receiver are + // dropped as soon as the worker thread exits. + let exit_signal = SignalToExitOnDrop(exit_signal); + + // If the tether is dropped after this call, the worker thread might + // exit before processing the `Register` message because there's no + // implicit ordering guarantee between two channels. If this happens, + // an exit signal will be sent despite we haven't received a + // corresponding exit request. To enforce the correct ordering and + // prevent a false exit signal from being sent, we include a copy of + // `self.tether` in the `Register` message. + let tether = self.tether.clone(); + bhm_chan.send(MonitoredComponentMsg::Register( sampler, thread::current().name().map(str::to_owned), transient_hang_timeout, permanent_hang_timeout, exit_signal, + tether, )); Box::new(bhm_chan) } @@ -103,14 +163,15 @@ impl BackgroundHangMonitorClone for HangMonitorRegister { } /// Messages sent from monitored components to the monitor. -pub enum MonitoredComponentMsg { +enum MonitoredComponentMsg { /// Register component for monitoring, Register( Box<dyn Sampler>, Option<String>, Duration, Duration, - Option<Box<dyn BackgroundHangMonitorExitSignal>>, + SignalToExitOnDrop, + Sender<Never>, ), /// Unregister component for monitoring. Unregister, @@ -120,35 +181,51 @@ pub enum MonitoredComponentMsg { NotifyWait, } +/// Stable equivalent to the `!` type +enum Never {} + /// A wrapper around a sender to the monitor, /// which will send the Id of the monitored component along with each message, /// and keep track of whether the monitor is still listening on the other end. -pub struct BackgroundHangMonitorChan { - sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>, +struct BackgroundHangMonitorChan { + sender: Weak<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>, + _tether: Sender<Never>, component_id: MonitoredComponentId, disconnected: Cell<bool>, monitoring_enabled: bool, } impl BackgroundHangMonitorChan { - pub fn new( - sender: Sender<(MonitoredComponentId, MonitoredComponentMsg)>, + fn new( + sender: Weak<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>, + tether: Sender<Never>, component_id: MonitoredComponentId, monitoring_enabled: bool, ) -> Self { BackgroundHangMonitorChan { sender, + _tether: tether, component_id: component_id, disconnected: Default::default(), monitoring_enabled, } } - pub fn send(&self, msg: MonitoredComponentMsg) { + fn send(&self, msg: MonitoredComponentMsg) { if self.disconnected.get() { return; } - if let Err(_) = self.sender.send((self.component_id.clone(), msg)) { + + // The worker thread owns both the receiver *and* the only strong + // reference to the sender. An `upgrade` failure means the latter is + // gone, and a `send` failure means the former is gone. They are dropped + // simultaneously, but we might observe an intermediate state. + if self + .sender + .upgrade() + .and_then(|sender| sender.send((self.component_id.clone(), msg)).ok()) + .is_none() + { warn!("BackgroundHangMonitor has gone away"); self.disconnected.set(true); } @@ -174,6 +251,33 @@ impl BackgroundHangMonitor for BackgroundHangMonitorChan { } } +/// Wraps [`BackgroundHangMonitorExitSignal`] and calls `signal_to_exit` when +/// dropped. +struct SignalToExitOnDrop(Option<Box<dyn BackgroundHangMonitorExitSignal>>); + +impl SignalToExitOnDrop { + /// Call `BackgroundHangMonitorExitSignal::signal_to_exit` now. + fn signal_to_exit(&mut self) { + if let Some(signal) = self.0.take() { + signal.signal_to_exit(); + } + } + + /// Disassociate `BackgroundHangMonitorExitSignal` from itself, preventing + /// `BackgroundHangMonitorExitSignal::signal_to_exit` from being called in + /// the future. + fn release(&mut self) { + self.0 = None; + } +} + +impl Drop for SignalToExitOnDrop { + #[inline] + fn drop(&mut self) { + self.signal_to_exit(); + } +} + struct MonitoredComponent { sampler: Box<dyn Sampler>, last_activity: Instant, @@ -183,16 +287,18 @@ struct MonitoredComponent { sent_transient_alert: bool, sent_permanent_alert: bool, is_waiting: bool, - exit_signal: Option<Box<dyn BackgroundHangMonitorExitSignal>>, + exit_signal: SignalToExitOnDrop, } struct Sample(MonitoredComponentId, Instant, NativeStack); -pub struct BackgroundHangMonitorWorker { +struct BackgroundHangMonitorWorker { component_names: HashMap<MonitoredComponentId, String>, monitored_components: HashMap<MonitoredComponentId, MonitoredComponent>, constellation_chan: IpcSender<HangMonitorAlert>, port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>, + _port_sender: Arc<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>, + tether_port: Receiver<Never>, control_port: Receiver<BackgroundHangMonitorControlMsg>, sampling_duration: Option<Duration>, sampling_max_duration: Option<Duration>, @@ -204,10 +310,14 @@ pub struct BackgroundHangMonitorWorker { } impl BackgroundHangMonitorWorker { - pub fn new( + fn new( constellation_chan: IpcSender<HangMonitorAlert>, control_port: IpcReceiver<BackgroundHangMonitorControlMsg>, - port: Receiver<(MonitoredComponentId, MonitoredComponentMsg)>, + (port_sender, port): ( + Arc<Sender<(MonitoredComponentId, MonitoredComponentMsg)>>, + Receiver<(MonitoredComponentId, MonitoredComponentMsg)>, + ), + tether_port: Receiver<Never>, monitoring_enabled: bool, ) -> Self { let control_port = ROUTER.route_ipc_receiver_to_new_crossbeam_receiver(control_port); @@ -216,6 +326,8 @@ impl BackgroundHangMonitorWorker { monitored_components: Default::default(), constellation_chan, port, + _port_sender: port_sender, + tether_port, control_port, sampling_duration: None, sampling_max_duration: None, @@ -268,7 +380,7 @@ impl BackgroundHangMonitorWorker { .send(HangMonitorAlert::Profile(bytes)); } - pub fn run(&mut self) -> bool { + fn run(&mut self) -> bool { let tick = if let Some(duration) = self.sampling_duration { let duration = duration .checked_sub(Instant::now() - self.last_sample) @@ -284,11 +396,24 @@ impl BackgroundHangMonitorWorker { let received = select! { recv(self.port) -> event => { + // Since we own the `Arc<Sender<_>>`, the channel never + // gets disconnected. + Some(event.unwrap()) + }, + recv(self.tether_port) -> event => { + // This arm can only reached by a tether disconnection match event { - Ok(msg) => Some(msg), - // Our sender has been dropped, quit. - Err(_) => return false, + Ok(x) => match x {} + Err(_) => {} + } + + // All associated `HangMonitorRegister` and + // `BackgroundHangMonitorChan` have been dropped. Suppress + // `signal_to_exit` and exit the BHM. + for component in self.monitored_components.values_mut() { + component.exit_signal.release(); } + return false; }, recv(self.control_port) -> event => { match event { @@ -306,10 +431,8 @@ impl BackgroundHangMonitorWorker { return true; }, Ok(BackgroundHangMonitorControlMsg::Exit(sender)) => { - for component in self.monitored_components.values() { - if let Some(signal) = component.exit_signal.as_ref() { - signal.signal_to_exit(); - } + for component in self.monitored_components.values_mut() { + component.exit_signal.signal_to_exit(); } // Confirm exit with to the constellation. @@ -355,6 +478,7 @@ impl BackgroundHangMonitorWorker { transient_hang_timeout, permanent_hang_timeout, exit_signal, + _tether, ), ) => { let component = MonitoredComponent { @@ -379,10 +503,13 @@ impl BackgroundHangMonitorWorker { ); }, (component_id, MonitoredComponentMsg::Unregister) => { - let _ = self + let (_, mut component) = self .monitored_components .remove_entry(&component_id) .expect("Received Unregister for an unknown component"); + + // Prevent `signal_to_exit` from being called + component.exit_signal.release(); }, (component_id, MonitoredComponentMsg::NotifyActivity(annotation)) => { let component = self diff --git a/components/background_hang_monitor/tests/hang_monitor_tests.rs b/components/background_hang_monitor/tests/hang_monitor_tests.rs index 23b32dd1bbb..b92344039bd 100644 --- a/components/background_hang_monitor/tests/hang_monitor_tests.rs +++ b/components/background_hang_monitor/tests/hang_monitor_tests.rs @@ -164,10 +164,45 @@ fn test_hang_monitoring_unregister() { assert!(background_hang_monitor_receiver.try_recv().is_err()); } +// Perform two certain steps in `test_hang_monitoring_exit_signal_inner` in +// different orders to check for the race condition that +// caused <https://github.com/servo/servo/issues/28270> and +// <https://github.com/servo/servo/issues/27191>. #[test] -// https://github.com/servo/servo/issues/28270 -#[cfg(not(any(target_os = "windows", target_os = "macos")))] -fn test_hang_monitoring_exit_signal() { +fn test_hang_monitoring_exit_signal1() { + test_hang_monitoring_exit_signal_inner(|e1, e2| { + e1(); + thread::sleep(Duration::from_millis(100)); + e2(); + }); +} + +#[test] +fn test_hang_monitoring_exit_signal2() { + test_hang_monitoring_exit_signal_inner(|e1, e2| { + e1(); + e2(); + }); +} + +#[test] +fn test_hang_monitoring_exit_signal3() { + test_hang_monitoring_exit_signal_inner(|e1, e2| { + e2(); + e1(); + }); +} + +#[test] +fn test_hang_monitoring_exit_signal4() { + test_hang_monitoring_exit_signal_inner(|e1, e2| { + e2(); + thread::sleep(Duration::from_millis(100)); + e1(); + }); +} + +fn test_hang_monitoring_exit_signal_inner(op_order: fn(&mut dyn FnMut(), &mut dyn FnMut())) { let _lock = SERIAL.lock().unwrap(); let (background_hang_monitor_ipc_sender, _background_hang_monitor_receiver) = @@ -185,9 +220,9 @@ fn test_hang_monitoring_exit_signal() { } let closing = Arc::new(AtomicBool::new(false)); - let signal = BHMExitSignal { + let mut signal = Some(Box::new(BHMExitSignal { closing: closing.clone(), - }; + })); // Init a worker, without active monitoring. let background_hang_monitor_register = HangMonitorRegister::init( @@ -195,26 +230,38 @@ fn test_hang_monitoring_exit_signal() { control_receiver, false, ); - let _background_hang_monitor = background_hang_monitor_register.register_component( - MonitoredComponentId(TEST_PIPELINE_ID, MonitoredComponentType::Script), - Duration::from_millis(10), - Duration::from_millis(1000), - Some(Box::new(signal)), - ); + let mut background_hang_monitor = None; let (exit_sender, exit_receiver) = ipc::channel().expect("Failed to create IPC channel!"); + let mut exit_sender = Some(exit_sender); + + // `op_order` determines the order in which these two closures are + // executed. + op_order( + &mut || { + // Register a component. + background_hang_monitor = Some(background_hang_monitor_register.register_component( + MonitoredComponentId(TEST_PIPELINE_ID, MonitoredComponentType::Script), + Duration::from_millis(10), + Duration::from_millis(1000), + Some(signal.take().unwrap()), + )); + }, + &mut || { + // Send the exit message. + control_sender + .send(BackgroundHangMonitorControlMsg::Exit( + exit_sender.take().unwrap(), + )) + .unwrap(); + }, + ); - // Send the exit message. - if control_sender - .send(BackgroundHangMonitorControlMsg::Exit(exit_sender)) - .is_ok() - { - // Assert we receive a confirmation back. - assert!(exit_receiver.recv().is_ok()); - - // Assert we get the exit signal. - while !closing.load(Ordering::SeqCst) { - thread::sleep(Duration::from_millis(10)); - } + // Assert we receive a confirmation back. + assert!(exit_receiver.recv().is_ok()); + + // Assert we get the exit signal. + while !closing.load(Ordering::SeqCst) { + thread::sleep(Duration::from_millis(10)); } } |