From c748dfa71dc0702bee74090c87d80c19fd95253d Mon Sep 17 00:00:00 2001 From: wikano Date: Sun, 4 Jan 2026 01:35:51 +0100 Subject: [PATCH] retry sending messages --- client-network/src/lib.rs | 28 ++++-- client-network/src/message_handling.rs | 2 + client-network/src/messages_channels.rs | 122 ++++++++++++++++++++++-- 3 files changed, 134 insertions(+), 18 deletions(-) diff --git a/client-network/src/lib.rs b/client-network/src/lib.rs index 86f49a1..bc99605 100644 --- a/client-network/src/lib.rs +++ b/client-network/src/lib.rs @@ -15,7 +15,10 @@ use crate::{ }, server_communication::{generate_id, get_peer_list}, }; -use std::sync::{Arc, Mutex}; +use std::{ + fmt, + sync::{Arc, Mutex}, +}; use std::{ io::Error, net::{SocketAddr, UdpSocket}, @@ -173,29 +176,34 @@ pub fn start_p2p_executor( generate_id(), ); - let res = event_tx - .send(NetworkEvent::ConnectedHandshake()); + //let res = event_tx + // .send(NetworkEvent::()); } else { //let res = event_tx.send(NetworkEvent::Error()); - eprintln!( + let err_msg = format!( "no valid socket addresses found in: {}", s - ); + ) + .to_string(); + let res = + event_tx.send(NetworkEvent::Error(err_msg)); } } Err(e) => { //let res = event_tx.send(NetworkEvent::Error()); - eprintln!( + let err_msg = format!( "invalid UTF-8 in socket address bytes: {}", e - ); + ) + .to_string(); + let res = event_tx.send(NetworkEvent::Error(err_msg)); } } } Err(e) => { - let mut err_msg = - String::from("failed to retrieve socket address:"); - err_msg += &e.to_string(); + let err_msg = + format!("failed to retreive socket address: {}", e) + .to_string(); let res = event_tx.send(NetworkEvent::Error(err_msg)); } } diff --git a/client-network/src/message_handling.rs b/client-network/src/message_handling.rs index 7744a96..79fef9b 100644 --- a/client-network/src/message_handling.rs +++ b/client-network/src/message_handling.rs @@ -70,6 +70,8 @@ pub fn handle_recevied_message( Some(_) => print!("Not implemented"), None => { let message_type = recevied_message[4]; + + // Handle handshake if message_type == 1 { println!("verify the signature"); let parsed_received_message = HandshakeMessage::parse(recevied_message.to_vec()); diff --git a/client-network/src/messages_channels.rs b/client-network/src/messages_channels.rs index 5e6ccb3..34b2905 100644 --- a/client-network/src/messages_channels.rs +++ b/client-network/src/messages_channels.rs @@ -10,6 +10,9 @@ use std::sync::{Arc, Mutex}; use std::sync::mpsc::{self, Sender}; use std::thread; +use std::collections::VecDeque; +use std::time::{Duration, Instant}; + pub struct MultipleSenders { senders: Vec>, } @@ -19,17 +22,14 @@ pub struct Message { address: String, } -impl Message { - pub fn new(payload: Vec, address: String) -> Self { - Message { - payload: payload, - address: address, - } - } +struct RetryMessage { + msg: Message, + attempts: u8, + next_try: Instant, } impl MultipleSenders { - pub fn new(num_channels: usize, socket: &Arc) -> Self { + /*pub fn new(num_channels: usize, socket: &Arc) -> Self { let mut senders = Vec::new(); // Wrap the socket in an Arc so it can be shared across threads @@ -65,6 +65,112 @@ impl MultipleSenders { }); } + MultipleSenders { senders } + }*/ + + pub fn new(num_channels: usize, socket: &Arc) -> Self { + let mut senders = Vec::new(); + + for i in 0..num_channels { + let (tx, rx) = mpsc::channel::(); + let sock_clone = Arc::clone(&socket); + + senders.push(tx); + + thread::spawn(move || { + println!("Canal d'envoi {} prêt", i); + + let mut queue: VecDeque = VecDeque::new(); + let max_attempts = 5; + + loop { + // Priorité aux messages en attente prêts à être réessayés + if let Some(front) = queue.front() { + if front.next_try <= Instant::now() { + // On prend le message de la queue + let mut item = queue.pop_front().unwrap(); + match sock_clone.send_to(&item.msg.payload, &item.msg.address) { + Ok(_) => { + let message_id: [u8; 4] = + item.msg.payload[0..4].try_into().expect("size error"); + let id = i32::from_be_bytes(message_id); + let message_type = item.msg.payload[4]; + println!( + "Message {0} de type {1} envoyé à {2} par le canal {3} (retry {4})", + id, message_type, item.msg.address, i, item.attempts + ); + } + Err(e) => { + item.attempts += 1; + if item.attempts >= max_attempts { + eprintln!( + "Abandon du message après {} tentatives sur canal {}: {}, address: {}", + item.attempts, i, e, item.msg.address + ); + } else { + // Backoff exponentiel simple + let backoff = Duration::from_millis( + 2000u64.saturating_pow(item.attempts as u32), + ); + item.next_try = Instant::now() + backoff; + eprintln!( + "Erreur d'envoi sur canal {}: {}, reprogrammation dans {:?}, tentative {}", + i, e, backoff, item.attempts + ); + queue.push_front(item); // remettre en tête pour réessayer plus tôt + } + } + } + continue; + } + } + + // Si aucun retry prêt, on bloque sur rx avec timeout court, pour pouvoir traiter les timers + match rx.recv_timeout(Duration::from_millis(200)) { + Ok(msg) => { + // On tente d'envoyer immédiatement + match sock_clone.send_to(&msg.payload, &msg.address) { + Ok(_) => { + let message_id: [u8; 4] = + msg.payload[0..4].try_into().expect("size error"); + let id = i32::from_be_bytes(message_id); + let message_type = msg.payload[4]; + println!( + "Message {0} de type {1} envoyé à {2} par le canal {3}", + id, message_type, msg.address, i + ); + } + Err(e) => { + eprintln!( + "Erreur d'envoi initial sur canal {}: {}, address: {} -- mise en queue pour retry", + i, e, &msg.address + ); + let retry = RetryMessage { + msg, + attempts: 1, + next_try: Instant::now() + Duration::from_millis(100), + }; + queue.push_back(retry); + } + } + } + Err(mpsc::RecvTimeoutError::Timeout) => { + // Permet de vérifier la queue à nouveau + continue; + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + // Le sender a été fermé ; vider la queue et sortir + eprintln!( + "Sender fermé pour le canal {}, fermeture du thread d'envoi", + i + ); + break; + } + } + } + }); + } + MultipleSenders { senders } }