retry sending messages

This commit is contained in:
2026-01-04 01:35:51 +01:00
parent 74f30f2c7f
commit c748dfa71d
3 changed files with 134 additions and 18 deletions

View File

@@ -15,7 +15,10 @@ use crate::{
}, },
server_communication::{generate_id, get_peer_list}, server_communication::{generate_id, get_peer_list},
}; };
use std::sync::{Arc, Mutex}; use std::{
fmt,
sync::{Arc, Mutex},
};
use std::{ use std::{
io::Error, io::Error,
net::{SocketAddr, UdpSocket}, net::{SocketAddr, UdpSocket},
@@ -173,29 +176,34 @@ pub fn start_p2p_executor(
generate_id(), generate_id(),
); );
let res = event_tx //let res = event_tx
.send(NetworkEvent::ConnectedHandshake()); // .send(NetworkEvent::());
} else { } else {
//let res = event_tx.send(NetworkEvent::Error()); //let res = event_tx.send(NetworkEvent::Error());
eprintln!( let err_msg = format!(
"no valid socket addresses found in: {}", "no valid socket addresses found in: {}",
s s
); )
.to_string();
let res =
event_tx.send(NetworkEvent::Error(err_msg));
} }
} }
Err(e) => { Err(e) => {
//let res = event_tx.send(NetworkEvent::Error()); //let res = event_tx.send(NetworkEvent::Error());
eprintln!( let err_msg = format!(
"invalid UTF-8 in socket address bytes: {}", "invalid UTF-8 in socket address bytes: {}",
e e
); )
.to_string();
let res = event_tx.send(NetworkEvent::Error(err_msg));
} }
} }
} }
Err(e) => { Err(e) => {
let mut err_msg = let err_msg =
String::from("failed to retrieve socket address:"); format!("failed to retreive socket address: {}", e)
err_msg += &e.to_string(); .to_string();
let res = event_tx.send(NetworkEvent::Error(err_msg)); let res = event_tx.send(NetworkEvent::Error(err_msg));
} }
} }

View File

@@ -70,6 +70,8 @@ pub fn handle_recevied_message(
Some(_) => print!("Not implemented"), Some(_) => print!("Not implemented"),
None => { None => {
let message_type = recevied_message[4]; let message_type = recevied_message[4];
// Handle handshake
if message_type == 1 { if message_type == 1 {
println!("verify the signature"); println!("verify the signature");
let parsed_received_message = HandshakeMessage::parse(recevied_message.to_vec()); let parsed_received_message = HandshakeMessage::parse(recevied_message.to_vec());

View File

@@ -10,6 +10,9 @@ use std::sync::{Arc, Mutex};
use std::sync::mpsc::{self, Sender}; use std::sync::mpsc::{self, Sender};
use std::thread; use std::thread;
use std::collections::VecDeque;
use std::time::{Duration, Instant};
pub struct MultipleSenders { pub struct MultipleSenders {
senders: Vec<Sender<Message>>, senders: Vec<Sender<Message>>,
} }
@@ -19,17 +22,14 @@ pub struct Message {
address: String, address: String,
} }
impl Message { struct RetryMessage {
pub fn new(payload: Vec<u8>, address: String) -> Self { msg: Message,
Message { attempts: u8,
payload: payload, next_try: Instant,
address: address,
}
}
} }
impl MultipleSenders { impl MultipleSenders {
pub fn new(num_channels: usize, socket: &Arc<UdpSocket>) -> Self { /*pub fn new(num_channels: usize, socket: &Arc<UdpSocket>) -> Self {
let mut senders = Vec::new(); let mut senders = Vec::new();
// Wrap the socket in an Arc so it can be shared across threads // Wrap the socket in an Arc so it can be shared across threads
@@ -65,6 +65,112 @@ impl MultipleSenders {
}); });
} }
MultipleSenders { senders }
}*/
pub fn new(num_channels: usize, socket: &Arc<UdpSocket>) -> Self {
let mut senders = Vec::new();
for i in 0..num_channels {
let (tx, rx) = mpsc::channel::<Message>();
let sock_clone = Arc::clone(&socket);
senders.push(tx);
thread::spawn(move || {
println!("Canal d'envoi {} prêt", i);
let mut queue: VecDeque<RetryMessage> = VecDeque::new();
let max_attempts = 5;
loop {
// Priorité aux messages en attente prêts à être réessayés
if let Some(front) = queue.front() {
if front.next_try <= Instant::now() {
// On prend le message de la queue
let mut item = queue.pop_front().unwrap();
match sock_clone.send_to(&item.msg.payload, &item.msg.address) {
Ok(_) => {
let message_id: [u8; 4] =
item.msg.payload[0..4].try_into().expect("size error");
let id = i32::from_be_bytes(message_id);
let message_type = item.msg.payload[4];
println!(
"Message {0} de type {1} envoyé à {2} par le canal {3} (retry {4})",
id, message_type, item.msg.address, i, item.attempts
);
}
Err(e) => {
item.attempts += 1;
if item.attempts >= max_attempts {
eprintln!(
"Abandon du message après {} tentatives sur canal {}: {}, address: {}",
item.attempts, i, e, item.msg.address
);
} else {
// Backoff exponentiel simple
let backoff = Duration::from_millis(
2000u64.saturating_pow(item.attempts as u32),
);
item.next_try = Instant::now() + backoff;
eprintln!(
"Erreur d'envoi sur canal {}: {}, reprogrammation dans {:?}, tentative {}",
i, e, backoff, item.attempts
);
queue.push_front(item); // remettre en tête pour réessayer plus tôt
}
}
}
continue;
}
}
// Si aucun retry prêt, on bloque sur rx avec timeout court, pour pouvoir traiter les timers
match rx.recv_timeout(Duration::from_millis(200)) {
Ok(msg) => {
// On tente d'envoyer immédiatement
match sock_clone.send_to(&msg.payload, &msg.address) {
Ok(_) => {
let message_id: [u8; 4] =
msg.payload[0..4].try_into().expect("size error");
let id = i32::from_be_bytes(message_id);
let message_type = msg.payload[4];
println!(
"Message {0} de type {1} envoyé à {2} par le canal {3}",
id, message_type, msg.address, i
);
}
Err(e) => {
eprintln!(
"Erreur d'envoi initial sur canal {}: {}, address: {} -- mise en queue pour retry",
i, e, &msg.address
);
let retry = RetryMessage {
msg,
attempts: 1,
next_try: Instant::now() + Duration::from_millis(100),
};
queue.push_back(retry);
}
}
}
Err(mpsc::RecvTimeoutError::Timeout) => {
// Permet de vérifier la queue à nouveau
continue;
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
// Le sender a été fermé ; vider la queue et sortir
eprintln!(
"Sender fermé pour le canal {}, fermeture du thread d'envoi",
i
);
break;
}
}
}
});
}
MultipleSenders { senders } MultipleSenders { senders }
} }