server handshake handling
This commit is contained in:
@@ -13,13 +13,17 @@ use std::thread;
|
||||
use std::collections::VecDeque;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crate::NetworkEvent;
|
||||
|
||||
pub struct MultipleSenders {
|
||||
senders: Vec<Sender<Message>>,
|
||||
response_channel: crossbeam_channel::Sender<NetworkEvent>,
|
||||
}
|
||||
|
||||
pub struct Message {
|
||||
payload: Vec<u8>,
|
||||
address: String,
|
||||
is_resp_to_server_handshake: bool,
|
||||
}
|
||||
|
||||
struct RetryMessage {
|
||||
@@ -68,12 +72,17 @@ impl MultipleSenders {
|
||||
MultipleSenders { senders }
|
||||
}*/
|
||||
|
||||
pub fn new(num_channels: usize, socket: &Arc<UdpSocket>) -> Self {
|
||||
pub fn new(
|
||||
num_channels: usize,
|
||||
socket: &Arc<UdpSocket>,
|
||||
cmd_tx: crossbeam_channel::Sender<NetworkEvent>,
|
||||
) -> Self {
|
||||
let mut senders = Vec::new();
|
||||
|
||||
for i in 0..num_channels {
|
||||
let (tx, rx) = mpsc::channel::<Message>();
|
||||
let sock_clone = Arc::clone(&socket);
|
||||
let cmd_tx_clone = cmd_tx.clone();
|
||||
|
||||
senders.push(tx);
|
||||
|
||||
@@ -91,6 +100,10 @@ impl MultipleSenders {
|
||||
let mut item = queue.pop_front().unwrap();
|
||||
match sock_clone.send_to(&item.msg.payload, &item.msg.address) {
|
||||
Ok(_) => {
|
||||
if (&item).msg.is_resp_to_server_handshake {
|
||||
let res =
|
||||
cmd_tx_clone.send(NetworkEvent::ConnectedHandshake());
|
||||
}
|
||||
let message_id: [u8; 4] =
|
||||
item.msg.payload[0..4].try_into().expect("size error");
|
||||
let id = i32::from_be_bytes(message_id);
|
||||
@@ -103,10 +116,14 @@ impl MultipleSenders {
|
||||
Err(e) => {
|
||||
item.attempts += 1;
|
||||
if item.attempts >= max_attempts {
|
||||
eprintln!(
|
||||
let str = format!(
|
||||
"Abandon du message après {} tentatives sur canal {}: {}, address: {}",
|
||||
item.attempts, i, e, item.msg.address
|
||||
);
|
||||
if (&item).msg.is_resp_to_server_handshake {
|
||||
let res = cmd_tx_clone
|
||||
.send(NetworkEvent::ServerHandshakeFailed(str));
|
||||
}
|
||||
} else {
|
||||
// Backoff exponentiel simple
|
||||
let backoff = Duration::from_millis(
|
||||
@@ -131,6 +148,10 @@ impl MultipleSenders {
|
||||
// On tente d'envoyer immédiatement
|
||||
match sock_clone.send_to(&msg.payload, &msg.address) {
|
||||
Ok(_) => {
|
||||
if msg.is_resp_to_server_handshake {
|
||||
let res =
|
||||
cmd_tx_clone.send(NetworkEvent::ConnectedHandshake());
|
||||
}
|
||||
let message_id: [u8; 4] =
|
||||
msg.payload[0..4].try_into().expect("size error");
|
||||
let id = i32::from_be_bytes(message_id);
|
||||
@@ -171,15 +192,29 @@ impl MultipleSenders {
|
||||
});
|
||||
}
|
||||
|
||||
MultipleSenders { senders }
|
||||
MultipleSenders {
|
||||
senders,
|
||||
response_channel: cmd_tx.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Envoie un message via un canal spécifique (round-robin ou index précis)
|
||||
pub fn send_via(&self, channel_idx: usize, data: Vec<u8>, remote_addr: String) {
|
||||
pub fn send_via(
|
||||
&self,
|
||||
channel_idx: usize,
|
||||
data: Vec<u8>,
|
||||
remote_addr: String,
|
||||
is_resp_to_server_handshake: bool,
|
||||
) {
|
||||
println!(
|
||||
"is_resp_to_server_handshake {}",
|
||||
is_resp_to_server_handshake
|
||||
);
|
||||
if let Some(sender) = self.senders.get(channel_idx) {
|
||||
let _ = sender.send(Message {
|
||||
payload: data,
|
||||
address: remote_addr,
|
||||
is_resp_to_server_handshake,
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -223,6 +258,7 @@ pub fn start_receving_thread(shared_data: &P2PSharedData, socket_addr: SocketAdd
|
||||
let cryptopair_clone = shared_data.cryptopair();
|
||||
let senders_clone = shared_data.senders();
|
||||
let messages_clone = shared_data.messages_list();
|
||||
let servername_clone = shared_data.servername();
|
||||
thread::spawn(move || {
|
||||
let mut buf = [0u8; 1024];
|
||||
loop {
|
||||
@@ -237,6 +273,7 @@ pub fn start_receving_thread(shared_data: &P2PSharedData, socket_addr: SocketAdd
|
||||
&cryptopair_clone,
|
||||
&socket_addr,
|
||||
&senders_clone,
|
||||
&servername_clone,
|
||||
);
|
||||
}
|
||||
Err(e) => eprintln!("Erreur de réception: {}", e),
|
||||
|
||||
Reference in New Issue
Block a user