diff --git a/src/bin/server.rs b/src/bin/server.rs index 4cc794a..009139c 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -6,18 +6,18 @@ use std::collections::HashMap; use std::fs; use std::io; use std::os::unix::net::{UnixListener, UnixStream}; -use std::sync::mpsc::{channel, Receiver, TryRecvError}; -use std::sync::{Arc, Mutex}; +use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError}; +use std::sync::{Arc, Condvar, Mutex}; use std::thread; -use std::time; struct Server { id: usize, chess_position: Arc>, - players: Arc>)>>>, - others_serv_msg_buf: Arc>, + players: Arc, Arc<(Mutex, Condvar)>)>>>, + others_serv_msg_recv: Receiver, client_message_recv: Receiver, + cvar: Arc<(Mutex, Condvar)>, } fn main() { @@ -32,16 +32,20 @@ fn main() { for stream in listener.incoming() { match stream { Ok(stream) => { - let client_message_recv = setup_client_message_recv(&stream).unwrap(); + let condvar_pair = Arc::new((Mutex::new(false), Condvar::new())); + let client_message_recv = + setup_client_message_recv(&stream, condvar_pair.clone()).unwrap(); + let (others_serv_msg_sender, others_serv_msg_recv) = channel(); let server = Server { id: counter, chess_position: chess.clone(), players: players.clone(), - others_serv_msg_buf: Arc::new(Mutex::new(String::new())), + others_serv_msg_recv, client_message_recv, + cvar: condvar_pair, }; /* connection succeeded */ - thread::spawn(move || handle_player(stream, server)); + thread::spawn(move || handle_player(stream, server, others_serv_msg_sender)); counter += 1; } Err(err) => { @@ -52,8 +56,8 @@ fn main() { } } -fn handle_player(mut stream: UnixStream, server: Server) { - match initialize_client(&mut stream, &server) { +fn handle_player(mut stream: UnixStream, server: Server, others_serv_msg_sender: Sender) { + match initialize_client(&mut stream, &server, others_serv_msg_sender) { Ok((player, player_turn)) => main_loop(&mut stream, &server, player, player_turn), Err(e) => println!("User id {} could not be initialized: {}", server.id, e), }; @@ -63,9 +67,10 @@ fn handle_player(mut stream: UnixStream, server: Server) { fn initialize_client( stream: &mut UnixStream, server: &Server, + others_serv_msg_sender: Sender, ) -> Result<(Player, Color), RecvPositionError> { //create player - let player = create_player(server, stream)?; + let player = create_player(server, stream, others_serv_msg_sender)?; let player_turn: Color; //send current position to the player println!( @@ -106,9 +111,13 @@ fn main_loop(stream: &mut UnixStream, server: &Server, player: Player, mut playe Err(e) => println!("Error: {}", e), }; let chessfen = fen::fen(&*chess); - for (id, (_, others_serv_msg_buf)) in players.iter() { + for (id, (_, others_serv_msg_sender, cvar_pair)) in players.iter() { if server.id != *id { - others_serv_msg_buf.lock().unwrap().push_str(&chessfen); + others_serv_msg_sender.send(chessfen.clone()); + let (lock, cvar) = &**cvar_pair; + let mut message_sent = lock.lock().unwrap(); + *message_sent = true; + cvar.notify_one(); } } if clichess::write_to_stream(stream, chessfen).is_err() { @@ -134,40 +143,44 @@ fn wait_for_opponent_move(server: &Server) -> Result println!("server id: {}, waiting for move to send...", server.id); //wait: either we receive next position from other server threads, or we receive //"exit" from the client. - let returned_result: Result; - loop { - { - let mut others_serv_msg_buf = server.others_serv_msg_buf.lock().unwrap(); - if !others_serv_msg_buf.is_empty() { - returned_result = Ok(others_serv_msg_buf.clone()); - others_serv_msg_buf.clear(); - break; + let mut returned_result = Err(RecvPositionError::UserCanceledError); + let (lock, cvar) = &*server.cvar; + let mut message_sent = lock.lock().unwrap(); + *message_sent = false; + while !*message_sent { + message_sent = cvar.wait(message_sent).unwrap(); + } + *message_sent = false; + match server.others_serv_msg_recv.try_recv() { + Ok(msg) => { + returned_result = Ok(msg); + } + Err(TryRecvError::Disconnected) => { + println!("Error: other server thread disconnected while sending move.") + } + Err(TryRecvError::Empty) => { /*nothing to do*/ } + } + match server.client_message_recv.try_recv() { + Ok(msg) => { + if msg == EXIT_MSG { + returned_result = Err(RecvPositionError::UserCanceledError); } else { - match server.client_message_recv.try_recv() { - Ok(msg) => { - if msg == EXIT_MSG { - returned_result = Err(RecvPositionError::UserCanceledError); - break; - } else { - println!( - "Client sent message while it's not its turn, this is an error." - ); - println!("Here is the message: {}", msg); - returned_result = Err(RecvPositionError::UserCanceledError); - break; - } - } - Err(TryRecvError::Disconnected) => println!("Error: client disconnected."), - Err(TryRecvError::Empty) => continue, - } + println!("Client sent message while it's not its turn, this is an error."); + println!("Here is the message: {}", msg); + returned_result = Err(RecvPositionError::UserCanceledError); } } - thread::sleep(time::Duration::from_millis(10)); + Err(TryRecvError::Disconnected) => println!("Error: client disconnected."), + Err(TryRecvError::Empty) => {} } returned_result } -fn create_player(server: &Server, stream: &mut UnixStream) -> Result { +fn create_player( + server: &Server, + stream: &mut UnixStream, + others_serv_msg_sender: Sender, +) -> Result { println!("Creating player {}...", server.id); //get player name and pubkey let username_pubkey_json = server @@ -188,7 +201,7 @@ fn create_player(server: &Server, stream: &mut UnixStream) -> Result String { let mut available_roles = vec![UserRole::White, UserRole::Black, UserRole::Spectator]; let players = server.players.lock().unwrap(); - for (_, (player, _)) in players.iter() { + for (_, (player, _, _)) in players.iter() { match available_roles.iter().position(|r| *r == player.role) { Some(index) => available_roles.remove(index), None => continue, @@ -275,13 +288,16 @@ fn player_disconnected(server: &Server) { let mut players = server.players.lock().unwrap(); let player_and_receiving_buf = players.get(&server.id); if player_and_receiving_buf.is_some() { - let (player, _) = player_and_receiving_buf.expect("is some"); + let (player, _, _) = player_and_receiving_buf.expect("is some"); println!("Player {} disconnected.", player.username); } players.remove(&server.id); } -fn setup_client_message_recv(stream: &UnixStream) -> io::Result> { +fn setup_client_message_recv( + stream: &UnixStream, + cvar_pair: Arc<(Mutex, Condvar)>, +) -> io::Result> { let (sender, receiver) = channel(); let thread_stream = stream.try_clone()?; @@ -290,6 +306,11 @@ fn setup_client_message_recv(stream: &UnixStream) -> io::Result //wait for client message let buffer = clichess::read_line_from_stream(&thread_stream).expect("Error message from server"); + //notify the server if waiting + let (lock, cvar) = &*cvar_pair; + let mut message_sent = lock.lock().unwrap(); + *message_sent = true; + cvar.notify_one(); let send_result = sender.send(buffer); // stop the thread during a disconnection if send_result.is_err() {