From 38d9026865aa59f1efb46dfd2782076c78cdcb8b Mon Sep 17 00:00:00 2001 From: Artlef Date: Mon, 16 Mar 2020 00:18:50 +0100 Subject: [PATCH] Detect when a waiting player disconnects --- src/bin/client.rs | 19 +++--- src/bin/server.rs | 143 +++++++++++++++++++++++++++++++++------------- src/lib.rs | 4 ++ 3 files changed, 117 insertions(+), 49 deletions(-) diff --git a/src/bin/client.rs b/src/bin/client.rs index e9eb2e1..0f3e99f 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -1,6 +1,5 @@ extern crate ctrlc; -use clichess::RecvPositionError; -use clichess::UserRole; +use clichess::{RecvPositionError, UserRole, EXIT_MSG}; use serde_json::json; use shakmaty::fen::Fen; use shakmaty::{Chess, Color, Outcome, Position, Setup}; @@ -83,16 +82,17 @@ fn main() { //it's the user turn, taking user input let input = read_user_input(&client); clichess::write_to_stream(&mut stream, String::from(input.trim())).unwrap(); - if input.trim() == "exit" { + if input.trim() == EXIT_MSG { break; } } //update position after playing. match get_current_position(&client) { Ok(position) => current_position = position, - Err(RecvPositionError::UserCanceledError) => break, - Err(RecvPositionError::CommunicationError) => break, - Err(RecvPositionError::ParsePositionError) => break, + Err(_) => { + clichess::write_to_stream(&mut stream, String::from(EXIT_MSG)).unwrap(); + break; + } }; } match current_position.outcome() { @@ -141,7 +141,6 @@ fn setupctrlc() -> Arc { } fn setup_server_message_recv(stream: &UnixStream) -> io::Result> { - let buf = Arc::new(Mutex::new(String::new())); let (sender, receiver) = channel(); let thread_stream = stream.try_clone()?; @@ -178,7 +177,7 @@ fn read_user_input(client: &Client) -> String { if client.running.load(Ordering::SeqCst) { input } else { - String::from("exit") + String::from(EXIT_MSG) } } @@ -238,8 +237,8 @@ fn prompt_user_for_role(client: &Client, stream: &mut UnixStream) -> Option>, - players: Arc, Condvar)>)>>>, - receiving_buffer: Arc<(Mutex, Condvar)>, + players: Arc>)>>>, + others_serv_msg_buf: Arc>, + client_message_recv: Receiver, } fn main() { @@ -31,11 +32,13 @@ fn main() { for stream in listener.incoming() { match stream { Ok(stream) => { + let client_message_recv = setup_client_message_recv(&stream).unwrap(); let server = Server { id: counter, chess_position: chess.clone(), players: players.clone(), - receiving_buffer: Arc::new((Mutex::new(String::new()), Condvar::new())), + others_serv_msg_buf: Arc::new(Mutex::new(String::new())), + client_message_recv, }; /* connection succeeded */ thread::spawn(move || handle_player(stream, server)); @@ -84,14 +87,14 @@ fn main_loop(stream: &mut UnixStream, server: &Server, player: Player, mut playe //let go of the lock while waiting for user input. println!("server {}, waiting for player move..", server.id); let input; - match clichess::read_line_from_stream(stream) { + match server.client_message_recv.recv() { Ok(i) => input = i, Err(e) => { println!("Error while getting user input: {}", e); break; } }; - if input == "exit" { + if input == EXIT_MSG { break; } { @@ -103,43 +106,71 @@ fn main_loop(stream: &mut UnixStream, server: &Server, player: Player, mut playe Err(e) => println!("Error: {}", e), }; let chessfen = fen::fen(&*chess); - for (id, (_, cvar_buffer)) in players.iter() { + for (id, (_, others_serv_msg_buf)) in players.iter() { if server.id != *id { - cvar_buffer.0.lock().unwrap().push_str(&chessfen); - cvar_buffer.1.notify_one(); + others_serv_msg_buf.lock().unwrap().push_str(&chessfen); } } clichess::write_to_stream(stream, chessfen).unwrap(); player_turn = chess.turn(); } } else { - { - let (buffer_mutex, cvar) = &*server.receiving_buffer; - let mut buffer = buffer_mutex.lock().unwrap(); - println!("server id: {}, waiting for move to send...", server.id); - while buffer.is_empty() { - buffer = cvar.wait(buffer).unwrap(); - } - println!("server id: {}, sending {}", server.id, buffer); - match clichess::write_to_stream(stream, buffer.clone()) { - Ok(()) => buffer.clear(), - Err(e) => { - println!("{}", e); - break; - } - } - let chess = server.chess_position.lock().unwrap(); - player_turn = chess.turn(); + let position_as_fen_result = wait_for_opponent_move(server); + if position_as_fen_result.is_err() { + break; } + let position_as_fen = position_as_fen_result.unwrap(); + println!("server id: {}, sending {}", server.id, position_as_fen); + clichess::write_to_stream(stream, position_as_fen.clone()).unwrap(); + let chess = server.chess_position.lock().unwrap(); + player_turn = chess.turn(); } } } +fn wait_for_opponent_move(server: &Server) -> Result { + println!("server id: {}, waiting for move to send...", server.id); + //wait: either we receive next position from other server threads, or we receive + //"exit" from the client. + let returned_result: Result; + loop { + { + let mut others_serv_msg_buf = server.others_serv_msg_buf.lock().unwrap(); + if !others_serv_msg_buf.is_empty() { + returned_result = Ok(others_serv_msg_buf.clone()); + others_serv_msg_buf.clear(); + break; + } else { + match server.client_message_recv.try_recv() { + Ok(msg) => { + if msg == EXIT_MSG { + returned_result = Err(RecvPositionError::UserCanceledError); + break; + } else { + println!( + "Client sent message while it's not its turn, this is an error." + ); + println!("Here is the message: {}", msg); + continue; + } + } + Err(TryRecvError::Disconnected) => println!("Error: client disconnected."), + Err(TryRecvError::Empty) => continue, + } + } + } + thread::sleep(time::Duration::from_millis(10)); + } + returned_result +} + fn create_player(server: &Server, stream: &mut UnixStream) -> Result { println!("Creating player {}...", server.id); //get player name and pubkey - let username_pubkey_json = - clichess::read_line_from_stream(stream).expect("Player closed connection."); + let username_pubkey_json = server + .client_message_recv + .recv() + .expect("Player closed connection."); let username_pubkey_value: Value = serde_json::from_str(&username_pubkey_json).unwrap(); let username = username_pubkey_value["username"].to_string(); let public_key = username_pubkey_value["pubkey"].to_string(); @@ -152,7 +183,10 @@ fn create_player(server: &Server, stream: &mut UnixStream) -> Result String { fn player_disconnected(server: &Server) { let mut players = server.players.lock().unwrap(); + let player_and_receiving_buf = players.get(&server.id); + if player_and_receiving_buf.is_some() { + let (player, _) = player_and_receiving_buf.expect("is some"); + println!("Player {} disconnected.", player.username); + } players.remove(&server.id); - println!("Player disconnected."); +} + +fn setup_client_message_recv(stream: &UnixStream) -> io::Result> { + let (sender, receiver) = channel(); + let thread_stream = stream.try_clone()?; + + thread::spawn(move || { + loop { + //wait for client message + let buffer = + clichess::read_line_from_stream(&thread_stream).expect("Error message from server"); + let send_result = sender.send(buffer); + // stop the thread during a disconnection + if send_result.is_err() { + break; + } + } + }); + Ok(receiver) } diff --git a/src/lib.rs b/src/lib.rs index 7b92312..ce27fc6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,9 @@ use std::io::prelude::*; use std::io::{BufReader, Write}; use std::os::unix::net::UnixStream; +//message to send to the server to signal we disconnected. +pub const EXIT_MSG: &str = "exit"; + #[derive(Clone)] pub struct Player { pub role: UserRole, @@ -38,6 +41,7 @@ impl fmt::Display for UserRole { } } +#[derive(Debug)] pub enum RecvPositionError { UserCanceledError, CommunicationError,