aboutsummaryrefslogtreecommitdiffstats
path: root/components/util/ipc.rs
blob: 32bbb8c14b72022c1989ee9763262518f21b5c61 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use opts;

use ipc_channel::ipc::{self, IpcSender};
use ipc_channel::router::ROUTER;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::any::Any;
use std::collections::HashMap;
use std::sync::Mutex;
use std::sync::atomic::{ATOMIC_USIZE_INIT, AtomicUsize, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};

lazy_static! {
    // Global, mutex-guarded registry of in-process senders keyed by a numeric ID.
    //
    // The `Serialize` impl of `OptionalIpcSender` stores a clone of an in-process
    // `Sender<T>` here under a fresh ID, and the `Deserialize` impl removes the entry for
    // the ID it reads back. Values are type-erased as `Box<Any + Send>` because this one
    // map holds senders for every message type `T`; they are downcast to `Sender<T>` on
    // removal.
    static ref IN_PROCESS_SENDERS: Mutex<HashMap<usize, Box<Any + Send>>> =
        Mutex::new(HashMap::new());
}

// Counter supplying the keys used in `IN_PROCESS_SENDERS`; it is bumped with `fetch_add`
// each time an in-process sender is serialized.
static NEXT_SENDER_ID: AtomicUsize = ATOMIC_USIZE_INIT;

/// A sending half that is backed either by an `ipc_channel` IPC sender or by a plain
/// `std::sync::mpsc` sender.
///
/// `optional_ipc_channel` picks the variant based on `opts::get().multiprocess`; both
/// variants are used through the same `send` method.
pub enum OptionalIpcSender<T> where T: Deserialize + Serialize + Send + Any {
    /// Wraps an `IpcSender<T>`.
    OutOfProcess(IpcSender<T>),
    /// Wraps a `std::sync::mpsc::Sender<T>`.
    InProcess(Sender<T>),
}

impl<T> OptionalIpcSender<T> where T: Deserialize + Serialize + Send + Any {
    /// Sends `value` through whichever sender this handle wraps.
    ///
    /// Returns `Err(())` if the underlying send fails; for the in-process variant the
    /// `mpsc` send error (and the value it carries) is discarded.
    pub fn send(&self, value: T) -> Result<(), ()> {
        match self {
            &OptionalIpcSender::OutOfProcess(ref ipc_sender) => ipc_sender.send(value),
            &OptionalIpcSender::InProcess(ref sender) => {
                match sender.send(value) {
                    Ok(()) => Ok(()),
                    Err(_) => Err(()),
                }
            }
        }
    }
}

impl<T> Clone for OptionalIpcSender<T> where T: Deserialize + Serialize + Send + Any {
    fn clone(&self) -> OptionalIpcSender<T> {
        match *self {
            OptionalIpcSender::OutOfProcess(ref ipc_sender) => {
                OptionalIpcSender::OutOfProcess((*ipc_sender).clone())
            }
            OptionalIpcSender::InProcess(ref sender) => {
                OptionalIpcSender::InProcess((*sender).clone())
            }
        }
    }
}

impl<T> Deserialize for OptionalIpcSender<T> where T: Deserialize + Serialize + Send + Any {
    fn deserialize<D>(deserializer: &mut D)
                      -> Result<OptionalIpcSender<T>, D::Error> where D: Deserializer {
        if opts::get().multiprocess {
            return Ok(OptionalIpcSender::OutOfProcess(try!(Deserialize::deserialize(
                            deserializer))))
        }
        let id: usize = try!(Deserialize::deserialize(deserializer));
        let sender = (*IN_PROCESS_SENDERS.lock()
                                         .unwrap()
                                         .remove(&id)
                                         .unwrap()
                                         .downcast_ref::<Sender<T>>()
                                         .unwrap()).clone();
        Ok(OptionalIpcSender::InProcess(sender))
     }
}

impl<T> Serialize for OptionalIpcSender<T> where T: Deserialize + Serialize + Send + Any {
    /// Writes this sender to `serializer`.
    ///
    /// An out-of-process sender is serialized as the wrapped `IpcSender`. For an
    /// in-process sender, a clone is parked in `IN_PROCESS_SENDERS` under a freshly
    /// allocated ID and only that ID is serialized.
    fn serialize<S>(&self, serializer: &mut S) -> Result<(), S::Error> where S: Serializer {
        let sender = match self {
            &OptionalIpcSender::OutOfProcess(ref ipc_sender) => {
                return ipc_sender.serialize(serializer)
            }
            &OptionalIpcSender::InProcess(ref sender) => sender,
        };

        let sender_id = NEXT_SENDER_ID.fetch_add(1, Ordering::SeqCst);
        {
            // Scoped so the registry lock is released before the ID is written out.
            let mut registry = IN_PROCESS_SENDERS.lock().unwrap();
            registry.insert(sender_id, Box::new(sender.clone()) as Box<Any + Send>);
        }
        sender_id.serialize(serializer)
    }
}

/// Creates a channel whose sending half is an `OptionalIpcSender` and whose receiving
/// half is always a `std::sync::mpsc::Receiver`.
///
/// When `opts::get().multiprocess` is set, an IPC channel is created and its receiver is
/// routed through `ROUTER` to a new `mpsc` receiver; otherwise a plain `mpsc` channel is
/// used.
///
/// # Panics
///
/// Panics if creating the IPC channel fails.
pub fn optional_ipc_channel<T>() -> (OptionalIpcSender<T>, Receiver<T>)
                                 where T: Deserialize + Serialize + Send + Any {
    if !opts::get().multiprocess {
        let (local_sender, local_receiver) = mpsc::channel();
        return (OptionalIpcSender::InProcess(local_sender), local_receiver);
    }

    let (ipc_sender, ipc_receiver) = ipc::channel().unwrap();
    let routed_receiver = ROUTER.route_ipc_receiver_to_new_mpsc_receiver(ipc_receiver);
    (OptionalIpcSender::OutOfProcess(ipc_sender), routed_receiver)
}