290 lines
12 KiB
Rust
290 lines
12 KiB
Rust
use crate::P2PSharedData;
|
|
use crate::cryptographic_signature::CryptographicSignature;
|
|
use crate::message_handling::EventType;
|
|
use crate::message_handling::handle_recevied_message;
|
|
use std::collections::HashMap;
|
|
use std::net::SocketAddr;
|
|
use std::net::UdpSocket;
|
|
use std::sync::{Arc, Mutex};
|
|
|
|
use std::sync::mpsc::{self, Sender};
|
|
use std::thread;
|
|
|
|
use std::collections::VecDeque;
|
|
use std::time::{Duration, Instant};
|
|
|
|
use crate::NetworkEvent;
|
|
|
|
pub struct MultipleSenders {
|
|
senders: Vec<Sender<Message>>,
|
|
response_channel: crossbeam_channel::Sender<NetworkEvent>,
|
|
}
|
|
|
|
/// One outgoing datagram queued for a sender worker.
pub struct Message {
    /// Raw bytes handed to `UdpSocket::send_to`. The workers read the first
    /// four bytes as a big-endian `i32` id and the fifth byte as the message
    /// type when logging a successful send.
    payload: Vec<u8>,
    /// Destination passed to `UdpSocket::send_to` as the target address.
    address: String,
    /// When `true`, the worker reports the outcome of this send on the event
    /// channel (`ConnectedHandshake` on success, `ServerHandshakeFailed` once
    /// the message is abandoned).
    is_resp_to_server_handshake: bool,
}
|
|
|
|
struct RetryMessage {
|
|
msg: Message,
|
|
attempts: u8,
|
|
next_try: Instant,
|
|
}
|
|
|
|
impl MultipleSenders {
|
|
/*pub fn new(num_channels: usize, socket: &Arc<UdpSocket>) -> Self {
|
|
let mut senders = Vec::new();
|
|
|
|
// Wrap the socket in an Arc so it can be shared across threads
|
|
|
|
for i in 0..num_channels {
|
|
let (tx, rx) = mpsc::channel::<Message>();
|
|
|
|
// Clone the Arc (this just bumps the reference count, it doesn't copy the socket)
|
|
let sock_clone = Arc::clone(&socket);
|
|
|
|
senders.push(tx);
|
|
|
|
thread::spawn(move || {
|
|
println!("Canal d'envoi {} prêt", i);
|
|
|
|
for msg in rx {
|
|
// Use the cloned Arc inside the thread
|
|
if let Err(e) = sock_clone.send_to(&msg.payload, &msg.address) {
|
|
eprintln!(
|
|
"Erreur d'envoi sur canal {}: {}, address: {}",
|
|
i, e, &msg.address
|
|
);
|
|
} else {
|
|
let message_id: [u8; 4] = msg.payload[0..4].try_into().expect("size error");
|
|
let id = i32::from_be_bytes(message_id);
|
|
let message_type = msg.payload[4];
|
|
println!(
|
|
"Message {0} de type {1} envoyé à {2} par le canal {3}",
|
|
id, message_type, msg.address, i
|
|
);
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
MultipleSenders { senders }
|
|
}*/
|
|
|
|
pub fn new(
|
|
num_channels: usize,
|
|
socket: &Arc<UdpSocket>,
|
|
cmd_tx: crossbeam_channel::Sender<NetworkEvent>,
|
|
) -> Self {
|
|
let mut senders = Vec::new();
|
|
|
|
for i in 0..num_channels {
|
|
let (tx, rx) = mpsc::channel::<Message>();
|
|
let sock_clone = Arc::clone(&socket);
|
|
let cmd_tx_clone = cmd_tx.clone();
|
|
|
|
senders.push(tx);
|
|
|
|
thread::spawn(move || {
|
|
println!("Canal d'envoi {} prêt", i);
|
|
|
|
let mut queue: VecDeque<RetryMessage> = VecDeque::new();
|
|
let max_attempts = 5;
|
|
|
|
loop {
|
|
// Priorité aux messages en attente prêts à être réessayés
|
|
if let Some(front) = queue.front() {
|
|
if front.next_try <= Instant::now() {
|
|
// On prend le message de la queue
|
|
let mut item = queue.pop_front().unwrap();
|
|
match sock_clone.send_to(&item.msg.payload, &item.msg.address) {
|
|
Ok(_) => {
|
|
if (&item).msg.is_resp_to_server_handshake {
|
|
let res =
|
|
cmd_tx_clone.send(NetworkEvent::ConnectedHandshake());
|
|
}
|
|
let message_id: [u8; 4] =
|
|
item.msg.payload[0..4].try_into().expect("size error");
|
|
let id = i32::from_be_bytes(message_id);
|
|
let message_type = item.msg.payload[4];
|
|
println!(
|
|
"Message {0} de type {1} envoyé à {2} par le canal {3} (retry {4})",
|
|
id, message_type, item.msg.address, i, item.attempts
|
|
);
|
|
}
|
|
Err(e) => {
|
|
item.attempts += 1;
|
|
if item.attempts >= max_attempts {
|
|
let str = format!(
|
|
"Abandon du message après {} tentatives sur canal {}: {}, address: {}",
|
|
item.attempts, i, e, item.msg.address
|
|
);
|
|
if (&item).msg.is_resp_to_server_handshake {
|
|
let res = cmd_tx_clone
|
|
.send(NetworkEvent::ServerHandshakeFailed(str));
|
|
}
|
|
} else {
|
|
// Backoff exponentiel simple
|
|
let backoff = Duration::from_millis(
|
|
2000u64.saturating_pow(item.attempts as u32),
|
|
);
|
|
item.next_try = Instant::now() + backoff;
|
|
eprintln!(
|
|
"Erreur d'envoi sur canal {}: {}, reprogrammation dans {:?}, tentative {}",
|
|
i, e, backoff, item.attempts
|
|
);
|
|
queue.push_front(item); // remettre en tête pour réessayer plus tôt
|
|
}
|
|
}
|
|
}
|
|
continue;
|
|
}
|
|
}
|
|
|
|
// Si aucun retry prêt, on bloque sur rx avec timeout court, pour pouvoir traiter les timers
|
|
match rx.recv_timeout(Duration::from_millis(200)) {
|
|
Ok(msg) => {
|
|
// On tente d'envoyer immédiatement
|
|
match sock_clone.send_to(&msg.payload, &msg.address) {
|
|
Ok(_) => {
|
|
if msg.is_resp_to_server_handshake {
|
|
let res =
|
|
cmd_tx_clone.send(NetworkEvent::ConnectedHandshake());
|
|
}
|
|
let message_id: [u8; 4] =
|
|
msg.payload[0..4].try_into().expect("size error");
|
|
let id = i32::from_be_bytes(message_id);
|
|
let message_type = msg.payload[4];
|
|
println!(
|
|
"Message {0} de type {1} envoyé à {2} par le canal {3}",
|
|
id, message_type, msg.address, i
|
|
);
|
|
}
|
|
Err(e) => {
|
|
eprintln!(
|
|
"Erreur d'envoi initial sur canal {}: {}, address: {} -- mise en queue pour retry",
|
|
i, e, &msg.address
|
|
);
|
|
let retry = RetryMessage {
|
|
msg,
|
|
attempts: 1,
|
|
next_try: Instant::now() + Duration::from_millis(100),
|
|
};
|
|
queue.push_back(retry);
|
|
}
|
|
}
|
|
}
|
|
Err(mpsc::RecvTimeoutError::Timeout) => {
|
|
// Permet de vérifier la queue à nouveau
|
|
continue;
|
|
}
|
|
Err(mpsc::RecvTimeoutError::Disconnected) => {
|
|
// Le sender a été fermé ; vider la queue et sortir
|
|
eprintln!(
|
|
"Sender fermé pour le canal {}, fermeture du thread d'envoi",
|
|
i
|
|
);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
MultipleSenders {
|
|
senders,
|
|
response_channel: cmd_tx.clone(),
|
|
}
|
|
}
|
|
|
|
/// Envoie un message via un canal spécifique (round-robin ou index précis)
|
|
pub fn send_via(
|
|
&self,
|
|
channel_idx: usize,
|
|
data: Vec<u8>,
|
|
remote_addr: String,
|
|
is_resp_to_server_handshake: bool,
|
|
) {
|
|
println!(
|
|
"is_resp_to_server_handshake {}",
|
|
is_resp_to_server_handshake
|
|
);
|
|
if let Some(sender) = self.senders.get(channel_idx) {
|
|
let _ = sender.send(Message {
|
|
payload: data,
|
|
address: remote_addr,
|
|
is_resp_to_server_handshake,
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
/*pub fn start_receving_thread(
|
|
socket: &Arc<UdpSocket>,
|
|
messages_list: &Arc<HashMap<i32, EventType>>,
|
|
crypto_pair: &Arc<CryptographicSignature>,
|
|
socket_addr: SocketAddr,
|
|
senders: &Arc<MultipleSenders>,
|
|
) {
|
|
let sock_clone = Arc::clone(socket);
|
|
let cryptopair_clone = Arc::clone(crypto_pair);
|
|
let senders_clone = Arc::clone(senders);
|
|
|
|
let messages_clone = Arc::clone(messages_list);
|
|
thread::spawn(move || {
|
|
let mut buf = [0u8; 1024];
|
|
|
|
loop {
|
|
match sock_clone.recv_from(&mut buf) {
|
|
Ok((amt, src)) => {
|
|
handle_recevied_message(
|
|
&messages_clone,
|
|
&buf.to_vec(),
|
|
&cryptopair_clone,
|
|
&socket_addr,
|
|
&senders_clone,
|
|
);
|
|
println!("Reçu {} octets de {}: {:?}", amt, src, &buf[..amt]);
|
|
}
|
|
Err(e) => eprintln!("Erreur de réception: {}", e),
|
|
}
|
|
}
|
|
});
|
|
}*/
|
|
|
|
pub fn start_receving_thread(
|
|
shared_data: &P2PSharedData,
|
|
socket_addr: SocketAddr,
|
|
cmd_tx: crossbeam_channel::Sender<NetworkEvent>,
|
|
) {
|
|
let sock_clone = shared_data.socket();
|
|
let cryptopair_clone = shared_data.cryptopair();
|
|
let senders_clone = shared_data.senders();
|
|
let messages_clone = shared_data.messages_list();
|
|
let servername_clone = shared_data.servername();
|
|
thread::spawn(move || {
|
|
let mut buf = [0u8; 1024];
|
|
loop {
|
|
match sock_clone.recv_from(&mut buf) {
|
|
Ok((amt, src)) => {
|
|
let received_data = buf[..amt].to_vec();
|
|
|
|
println!("Reçu {} octets de {}: {:?}", amt, src, received_data);
|
|
handle_recevied_message(
|
|
&messages_clone,
|
|
&received_data,
|
|
&cryptopair_clone,
|
|
&socket_addr,
|
|
&senders_clone,
|
|
&servername_clone,
|
|
cmd_tx.clone(),
|
|
src,
|
|
);
|
|
}
|
|
Err(e) => eprintln!("Erreur de réception: {}", e),
|
|
}
|
|
}
|
|
});
|
|
}
|