2015-02-22 1 views
7

У меня есть поток, который запускает рабочие потоки, и ожидается, что они будут жить вечно. Каждый рабочий поток поддерживает собственный список Socket с.Пожизненные проблемы обмена ссылками между потоками

Некоторые операции требуют, чтобы я проходил все сокеты в настоящее время, но у меня возникают проблемы со временем жизни, пытаясь создать главный список сокетов, содержащий указатель на сокет, принадлежащий другому списку.

use std::{str, thread}; 
use std::thread::JoinHandle; 
use std::io::{Read, Write}; 
use std::net::{TcpListener, TcpStream}; 
use std::sync::{Arc, Mutex}; 
use std::ops::DerefMut; 
use std::sync::mpsc::{channel, Sender, Receiver, TryRecvError}; 
use self::socketlist::SocketList; 
use self::mastersocketlist::MasterSocketList; 

pub struct Socket { 
    user: String, 
    stream: TcpStream, 
} 

mod socketlist { 
    use self::SocketList::{Node, End}; 
    use super::Socket; 

    pub enum SocketList { 
     Node(Socket, Box<SocketList>), 
     End, 
    } 

    impl SocketList { 
     pub fn new() -> SocketList { 
      End 
     } 

     pub fn add(self, socket: Socket) -> SocketList { 
      Node(socket, Box::new(self)) 
     } 

     pub fn newest<'a>(&'a mut self) -> Result<&'a Socket, String> { 
      match *self { 
       Node(ref mut socket, ref mut next) => Ok(socket), 
       End => Err("No socket available".to_string()), 
      } 
     } 
    } 
} 

mod mastersocketlist { 
    use self::MasterSocketList::{Node, End}; 
    use super::Socket; 

    pub enum MasterSocketList<'a> { 
     Node(Box<&'a Socket>, Box<MasterSocketList<'a>>), 
     End, 
    } 

    impl<'a> MasterSocketList<'a> { 
     pub fn new() -> MasterSocketList<'a> { 
      End 
     } 

     pub fn add(self, socket: &'a Socket) -> MasterSocketList<'a> { 
      MasterSocketList::Node(Box::new(&socket), Box::new(self)) 
     } 
    } 
} 

pub struct SlotManager { 
    prox: JoinHandle<()>, 
    prox_tx: Sender<TcpStream>, 
} 

impl SlotManager { 
    pub fn new() -> SlotManager { 
     let (tx, rx): (Sender<TcpStream>, Receiver<TcpStream>) = channel(); 

     let tx_clone = tx.clone(); 
     let prox = thread::spawn(move || SlotManager::event_loop(tx, rx)); 

     SlotManager { 
      prox: prox, 
      prox_tx: tx_clone, 
     } 
    } 

    pub fn sender(&self) -> Sender<TcpStream> { 
     self.prox_tx.clone() 
    } 

    fn event_loop(tx: Sender<TcpStream>, rx: Receiver<TcpStream>) { 
     let socket_list = Arc::new(Mutex::new(MasterSocketList::new())); 
     let mut slot = Slot::new(socket_list.clone()); 
     loop { 
      match rx.try_recv() { 
       Ok(stream) => slot.new_connection(stream), 
       Err(e) => {} 
      } 
     } 
    } 
} 

pub struct Slot { 
    prox: JoinHandle<()>, 
    prox_tx: Sender<TcpStream>, 
} 

impl Slot { 
    pub fn new(master_socket_list: Arc<Mutex<MasterSocketList>>) -> Slot { 
     let (tx, rx): (Sender<TcpStream>, Receiver<TcpStream>) = channel(); 

     let tx_clone = tx.clone(); 
     let prox = thread::spawn(move || Slot::event_loop(tx, rx, master_socket_list)); 

     Slot { 
      prox: prox, 
      prox_tx: tx_clone, 
     } 
    } 

    pub fn new_connection(&self, stream: TcpStream) { 
     self.prox_tx.send(stream); 
    } 

    fn event_loop(tx: Sender<TcpStream>, 
        rx: Receiver<TcpStream>, 
        master_socket_list: Arc<Mutex<MasterSocketList>>) { 

     let mut sockets = SocketList::new(); 
     loop { 
      // Check for new connections 
      match rx.try_recv() { 
       Ok(stream) => { 
        let mut socket = Socket { 
         user: "default".to_string(), 
         stream: stream, 
        }; 
        sockets = sockets.add(socket); 

        let mut msl_guard = match master_socket_list.lock() { 
         Ok(guard) => guard, 
         Err(poisoned) => poisoned.into_inner(), 
        }; 
        let mut msl_handle = msl_guard.deref_mut(); 
        *msl_handle = msl_handle.add(sockets.newest().unwrap()); 
       } 
       Err(e) => {} 
      } 
     } 
    } 
} 

fn main() { 
    let mut slot_manager = SlotManager::new(); 
    let listener = TcpListener::bind("127.0.0.1:1234").unwrap(); 
    for stream in listener.incoming() { 
     match stream { 
      Ok(stream) => { 
       let sender = slot_manager.sender(); 
       thread::spawn(move || { 
        sender.send(stream); 
        //process_new_connection(stream, sender) 
       }); 
      } 
      Err(e) => println!("Connection error: {}", e), 
     } 
    } 
    drop(listener); 
} 

Ошибки, которые я получаю ...

error[E0477]: the type `[[email protected]/main.rs:107:34: 107:86 tx:std::sync::mpsc::Sender<std::net::TcpStream>, rx:std::sync::mpsc::Receiver<std::net::TcpStream>, master_socket_list:std::sync::Arc<std::sync::Mutex<mastersocketlist::MasterSocketList<'_>>>]` does not fulfill the required lifetime 
    --> src/main.rs:107:20 
    | 
107 |   let prox = thread::spawn(move || Slot::event_loop(tx, rx, master_socket_list)); 
    |     ^^^^^^^^^^^^^ 
    | 
    = note: type must outlive the static lifetime 

Я даже не знаю, что я пытаюсь можно как безопасный код.

Я хочу, чтобы mastersocketlist содержал указатель на сокет, где время жизни сокета определяется потоком, который его создал. Я считаю, что это все эти ошибки, но я понятия не имею, как обеспечить правильные аннотации для жизни, чтобы исправить это.

ответ

22

Замечательная вещь о Rust заключается в том, что проверка типов по всем функциям выполняется исключительно подписи функции. Это означает, что вы можете заменить большинство тел функций unimplemented!() и сохранить ошибки проверки типов.

Повторите этот процесс несколько раз, и вы в конечном итоге не называете множество функций - удалите их. Могут также помочь использование модулей и сокращение структур/перечислений.

В какой-то момент ваша ошибка исчезнет - первый ключ к проблеме! Держите на него, и вы получите крошечную воспроизведение:

use std::thread; 
use std::sync::{Arc, Mutex}; 

pub enum MasterSocketList<'a> { 
    One(&'a u8), 
} 

pub struct Slot; 

impl Slot { 
    pub fn new<'a>(master_socket_list: Arc<Mutex<MasterSocketList<'a>>>) -> Slot { 
     thread::spawn(move || { master_socket_list; }); 
     unimplemented!(); 
    } 
} 

fn main() {} 

Проверка на ошибку, он по-прежнему соответствует:

error[E0477]: the type `[[email protected]/main.rs:12:23: 12:54 master_socket_list:std::sync::Arc<std::sync::Mutex<MasterSocketList<'a>>>]` does not fulfill the required lifetime 
    --> src/main.rs:12:9 
    | 
12 |   thread::spawn(move || { master_socket_list; }); 
    |   ^^^^^^^^^^^^^ 
    | 
    = note: type must outlive the static lifetime 

Давайте проверим документы для подписи thread::spawn:

pub fn spawn<F>(f: F) -> JoinHandle where F: FnOnce(), F: Send + 'static 

Ключевым моментом здесь является F: Send + 'static - замыкание, которое вы предоставляете spawnдолжно содержать только ссылки, которые в последний раз соответствуют е жизнь программы. Это потому, что spawn может создавать потоки, которые становятся снят. Когда-то отделившись, нить могла жить вечно, поэтому все ссылки должны жить как минимум так долго, иначе вы бы получили болтающиеся ссылки, плохая вещь! Руста спасает день, еще раз!

Если вы хотите, чтобы гарантировать, что потоки будут заканчиваться в известной точке, вы можете использовать область видимости темы, такие как те, которые предусмотрены scoped-threadpool или crossbeam.

Если у вашего кода не было переменной с продолжительностью жизни внутри нее, использование какого-либо типа совместного использования, такого как Arc, в паре с чем-то, что гарантирует, что только один поток может мутировать переменную, например Mutex. Это позволяет каждому потоку владеть общим значением, и, наконец, отбрасывает его всякий раз, когда последний поток выходит. См. Share mutable object between threads.

+3

Эй, я понятия не имел, что могу похлопать 'unimplemented()!' Там и сделать примеры чище. Большое вам спасибо за помощь! – nathansizemore

 Смежные вопросы

  • Нет связанных вопросов^_^