From dacedd1cebbbdf98e7145cf0b95984968889fc24 Mon Sep 17 00:00:00 2001 From: TIBERGHIEN corentin Date: Tue, 20 Jan 2026 01:10:09 +0100 Subject: [PATCH] exp backoff and theads handling --- client-gui/src/gui_app.rs | 12 +- client-network/src/datum_parsing.rs | 47 ++- client-network/src/lib.rs | 87 +++++- client-network/src/message_handling.rs | 239 ++++++++++------ client-network/src/messages_channels.rs | 346 +++++++++++------------ client-network/src/messages_structure.rs | 11 +- client-network/src/peers_refresh.rs | 16 +- client-network/src/registration.rs | 7 +- client-network/src/threads_handling.rs | 35 +++ 9 files changed, 469 insertions(+), 331 deletions(-) create mode 100644 client-network/src/threads_handling.rs diff --git a/client-gui/src/gui_app.rs b/client-gui/src/gui_app.rs index 8f73e6c..05a17fe 100644 --- a/client-gui/src/gui_app.rs +++ b/client-gui/src/gui_app.rs @@ -120,10 +120,7 @@ impl eframe::App for P2PClientApp { self.known_peers = peers; } - NetworkEvent::FileTreeReceived(_peer_id, _) => { - todo!(); - - //self.loaded_tree_nodes.insert(_peer_id, tree); + NetworkEvent::FileTreeReceived(node_hash, merklenode) => { //self.status_message = "🔄 File tree updated successfully.".to_string(); } NetworkEvent::FileTreeRootReceived(peer_id, root_hash) => { @@ -406,9 +403,10 @@ impl eframe::App for P2PClientApp { _ => {} } if ui.button("Send Ping").clicked() { - let res = self - .network_cmd_tx - .send(NetworkCommand::Ping(peer.0.to_string())); + let res = self.network_cmd_tx.send(NetworkCommand::Ping( + peer.0.to_string(), + self.connected_address.clone(), + )); } if ui.button("Send Nat Traversal Request").clicked() { match self.network_cmd_tx.send(NetworkCommand::NatTraversal( diff --git a/client-network/src/datum_parsing.rs b/client-network/src/datum_parsing.rs index aff0dde..88bedd8 100644 --- a/client-network/src/datum_parsing.rs +++ b/client-network/src/datum_parsing.rs @@ -6,28 +6,20 @@ const DIRECTORY: u8 = 1; const BIG: u8 = 2; const BIGDIRECTORY: u8 = 3; -fn parse_received_datum(recevied_datum: Vec, datum_length: usize, mut tree: MerkleTree) { - if datum_length > recevied_datum.len() { - return; - } - if datum_length < 32 + 64 { - return; - } +pub fn parse_received_datum( + recevied_datum: Vec, + datum_length: usize, +) -> Option<([u8; 32], MerkleNode)> { let hash_name: [u8; 32] = recevied_datum[..32].try_into().expect("error"); let sigstart = datum_length - 64; let value = &recevied_datum[32..sigstart]; let value_slice = value.to_vec(); - let signature: [u8; 32] = recevied_datum[sigstart..datum_length] - .try_into() - .expect("Taille incorrecte"); let datum_type = value_slice[0]; match datum_type { - CHUNK => { - tree.data.insert( - hash_name, - MerkleNode::Chunk(crate::ChunkNode { data: value_slice }), - ); - } + CHUNK => Some(( + hash_name, + MerkleNode::Chunk(crate::ChunkNode { data: value_slice }), + )), DIRECTORY => { let nb_entries = value_slice[1]; let mut dir_entries = Vec::new(); @@ -46,23 +38,28 @@ fn parse_received_datum(recevied_datum: Vec, datum_length: usize, mut tree: let current = DirectoryNode::new(dir_entries); match current { - Ok(current_node) => { - tree.data - .insert(hash_name, MerkleNode::Directory(current_node)); - } + Ok(current_node) => Some((hash_name, MerkleNode::Directory(current_node))), Err(e) => { println!("{}", e); + None } } } BIG => { let chlidren: Vec = Vec::new(); + Some(( + hash_name, + MerkleNode::Big(crate::BigNode { + children_hashes: chlidren, + }), + )) + /*let chlidren: Vec = Vec::new(); tree.data.insert( hash_name, MerkleNode::Big(crate::BigNode { children_hashes: chlidren, }), - ); + );*/ } BIGDIRECTORY => { let nb_entries = value_slice[1]; @@ -82,15 +79,13 @@ fn parse_received_datum(recevied_datum: Vec, datum_length: usize, mut tree: let current = BigDirectoryNode::new(dir_entries); match current { - Ok(current_node) => { - tree.data - .insert(hash_name, MerkleNode::BigDirectory(current_node)); - } + Ok(current_node) => Some((hash_name, MerkleNode::BigDirectory(current_node))), Err(e) => { println!("{}", e); + None } } } - _ => {} + _ => None, } } diff --git a/client-network/src/lib.rs b/client-network/src/lib.rs index 6b7f9d2..a80b9c0 100644 --- a/client-network/src/lib.rs +++ b/client-network/src/lib.rs @@ -7,17 +7,19 @@ mod messages_structure; mod peers_refresh; mod registration; mod server_communication; +mod threads_handling; use crate::{ cryptographic_signature::CryptographicSignature, message_handling::EventType, - messages_channels::{MultipleSenders, start_receving_thread}, + messages_channels::{MultipleSenders, start_receving_thread, start_retry_thread}, messages_structure::{ - NATTRAVERSALREQUEST, NATTRAVERSALREQUEST2, ROOTREQUEST, construct_message, + NATTRAVERSALREQUEST, NATTRAVERSALREQUEST2, PING, ROOTREQUEST, construct_message, }, peers_refresh::HandshakeHistory, registration::{parse_addresses, perform_handshake, register_with_the_server}, server_communication::{generate_id, get_peer_list}, + threads_handling::Worker, }; use std::{ io::Error, @@ -35,6 +37,7 @@ pub struct P2PSharedData { shared_senders: Arc, server_name: Arc>, handshake_peers: Arc, + threads: Vec, } use bytes::Bytes; @@ -53,10 +56,16 @@ impl P2PSharedData { let shared_cryptopair = Arc::new(crypto_pair); let shared_messageslist = Arc::new(Mutex::new(messages_list)); - let senders = MultipleSenders::new(1, &shared_socket, cmd_tx); + let mut threads = Vec::new(); + + let senders = MultipleSenders::new(1, &shared_socket, cmd_tx, &mut threads); let shared_senders = Arc::new(senders); let server_name = Arc::new(Mutex::new("".to_string())); let handhsake_peers = Arc::new(HandshakeHistory::new()); + + let worker = handhsake_peers.update_handshake(); + + threads.push(worker); Ok(P2PSharedData { shared_socket: shared_socket, shared_cryptopair: shared_cryptopair, @@ -64,6 +73,7 @@ impl P2PSharedData { shared_senders: shared_senders, server_name: server_name, handshake_peers: handhsake_peers, + threads, }) } pub fn socket(&self) -> Arc { @@ -110,6 +120,15 @@ impl P2PSharedData { let mut map = self.shared_messageslist.lock().unwrap(); map.insert(id, evt); } + pub fn threads(&mut self) -> &mut Vec { + &mut self.threads + } + + pub fn close_threads(&mut self) { + for w in self.threads.drain(..) { + w.stop(); + } + } } /// Messages sent to the Network thread by the GUI. @@ -118,7 +137,7 @@ pub enum NetworkCommand { ServerHandshake(String, String), // ServerName FetchPeerList(String), // ServerIP RegisterAsPeer(String), - Ping(String), + Ping(String, String), NatTraversal(String, String), ConnectPeer((String, bool)), // IP:PORT RequestFileTree(String), // peer_id @@ -139,7 +158,7 @@ pub enum NetworkEvent { Error(String), PeerConnected(String), PeerListUpdated(Vec<(String, bool)>), - FileTreeReceived(String, Vec), // peer_id, content + FileTreeReceived([u8; 32], MerkleNode), // peer_id, content DataReceived(String, MerkleNode), FileTreeRootReceived(String, NodeHash), HandshakeFailed(), @@ -188,8 +207,14 @@ pub fn start_p2p_executor( match cmd { NetworkCommand::ServerHandshake(username, ip) => { println!("server handshake called"); - if let Some(sd) = shared_data.as_ref() { + if let Some(sd) = shared_data.as_mut() { start_receving_thread(sd, event_tx.clone(), &handshake_clone); + start_retry_thread( + sd.senders(), + 4, + sd.messages_list(), + sd.threads().as_mut(), + ); let res = perform_handshake(&sd, username, ip, event_tx.clone(), true).await; } else { @@ -215,24 +240,32 @@ pub fn start_p2p_executor( }; match res { Some(peerinfo) => { + let id = generate_id(); // envoyer un root request let rootrequest = construct_message( ROOTREQUEST, Vec::new(), - generate_id(), + id, sd.cryptopair_ref(), ); + println!("matching"); match rootrequest { None => {} Some(resp_msg) => { + sd.add_message(id, EventType::RootRequest); println!("msg_sent:{:?}", resp_msg); - sd.senders_ref().send_via( - 0, + sd.senders_ref().add_message_to_retry_queue( + resp_msg.clone(), + peerinfo.ip.to_string(), + false, + ); + + sd.senders_ref().send_dispatch( resp_msg, peerinfo.ip.to_string(), false, - sd.messages_list_ref(), + sd.messages_list(), ); } } @@ -336,7 +369,32 @@ pub fn start_p2p_executor( NetworkCommand::RegisterAsPeer(_) => { println!("[Network] RegisterAsPeer() called"); } - NetworkCommand::Ping(String) => { + NetworkCommand::Ping(str, ip) => { + if let Some(sd) = shared_data.as_ref() { + let id = generate_id(); + sd.add_message(id, EventType::Ping); + let pingrequest = + construct_message(PING, Vec::new(), id, sd.cryptopair_ref()); + let peer_address = get_socket_address(str, ip).await; + match peer_address { + Some(addr) => { + if let Some(ping) = pingrequest { + sd.senders_ref().add_message_to_retry_queue( + ping.clone(), + addr.to_string(), + false, + ); + sd.senders_ref().send_dispatch( + ping, + addr.to_string(), + false, + sd.messages_list(), + ); + } + } + None => {} + } + } println!("[Network] Ping() called"); } NetworkCommand::Disconnect() => { @@ -372,19 +430,18 @@ pub fn start_p2p_executor( let natreq = construct_message( NATTRAVERSALREQUEST, - server_addr.to_string().into_bytes(), + payload.clone(), generate_id(), &sd.cryptopair(), ); - sd.senders_ref().send_via( - 0, + sd.senders_ref().send_dispatch( natreq.expect( "couldnt construct message nattraversalrequest2", ), server_addr.to_string(), false, - sd.messages_list_ref(), + sd.messages_list(), ); } None => { diff --git a/client-network/src/message_handling.rs b/client-network/src/message_handling.rs index dcbea93..d7e8727 100644 --- a/client-network/src/message_handling.rs +++ b/client-network/src/message_handling.rs @@ -3,20 +3,31 @@ use crate::{ cryptographic_signature::{ CryptographicSignature, get_peer_key, sign_message, verify_signature, }, + datum_parsing::parse_received_datum, messages_channels::MultipleSenders, messages_structure::construct_message, peers_refresh::HandshakeHistory, registration, server_communication::generate_id, }; -use std::{collections::HashMap, net::SocketAddr}; +use std::{ + collections::HashMap, + net::{Ipv4Addr, SocketAddr}, +}; use std::{ net::IpAddr, sync::{Arc, Mutex}, }; +// Types of messages that await for a response +#[derive(Debug)] pub enum EventType { - SendRootRequest, + HelloThenRootRequest, + Hello, + RootRequest, + Ping, + NatTraversal, + DatumRequest, } const ID: usize = 4; @@ -86,80 +97,14 @@ pub fn handle_recevied_message( None => {} Some(resp_msg) => { println!("msg_sent:{:?}", resp_msg); - senders.send_via( - 0, + senders.send_dispatch( resp_msg, ip.to_string(), is_resp_to_server_handshake, - messages_list, + messages_list.clone(), ); } } - - // Lock the mutex to access the HashMap - /*let list = messages_list.lock().unwrap(); - - let eventtype = list.get(&id); // Clone the enum so we can release the lock if needed - match eventtype { - Some(EventType::ServerHelloReply) => { - /*registration::register_ip_addresses( - crypto_pair, - socket_addr.to_string(), - senders, - &messages_list, // Pass the mutable reference inside the lock - 546, - );*/ - } - Some(_) => print!("Not implemented"), - None => { - let message_type = recevied_message[4]; - - // Handle handshake - if message_type == 1 { - let mut resp_to_serv = false; - println!("verify the signature"); - let parsed_received_message = HandshakeMessage::parse(recevied_message.to_vec()); - let received_name = String::from_utf8(parsed_received_message.name).expect("error"); - let peer_pubkey = tokio::runtime::Runtime::new() - .unwrap() - .block_on(get_peer_key(&received_name)) - .expect("failed to retrieve public key"); - - if received_name == server_name.to_string() { - resp_to_serv = true; - } - - if !verify_signature(peer_pubkey, recevied_message) { - println!( - "incorrect signature from given peer: {}, ignoring message {}", - &received_name, id - ); - } else { - // verify if this is a server handshake request - let username_size = crypto_pair.username.len(); - let hello_handshake = HandshakeMessage::helloReply( - id as u32, - username_size as u16 + 4, - crypto_pair.username.clone(), - ); - //HandshakeMessage::display(&hello_handshake); - let hello_handshake_serialized = hello_handshake.serialize(); - let message_signed = sign_message(crypto_pair, &hello_handshake_serialized); - senders.send_via(0, message_signed, socket_addr.to_string(), resp_to_serv); - let mut list = messages_list.lock().expect("Failed to lock messages_list"); - match list.get(&id) { - Some(_) => { - list.remove(&id); - } - None => { - list.insert(id, EventType::ServerHelloReply); - } - } - } - } - print!("Message not found for ID: {}", id) - } - }*/ } pub fn parse_message( @@ -188,7 +133,7 @@ pub fn parse_message( let msg_length = u16::from_be_bytes(length_bytes) as usize; // verify signature match msgtype { - HELLO | HELLOREPLY | NODATUM | NATTRAVERSALREQUEST | NATTRAVERSALREQUEST2 => { + HELLO | HELLOREPLY => { let ilength = u16::from_be_bytes(length_bytes); println!("name received length: {}", ilength); let received_name = &received_message[LENGTH + EXTENSIONS..LENGTH + ilength as usize]; @@ -227,7 +172,7 @@ pub fn parse_message( } } } - ROOTREPLY => { + ROOTREPLY | NODATUM | NATTRAVERSALREQUEST | NATTRAVERSALREQUEST2 => { let ilength = u16::from_be_bytes(length_bytes); println!("name received length: {}", ilength); if let Some(peerinfo) = handhsake_history.get_peer_info_ip(ip.to_string()) { @@ -257,6 +202,20 @@ pub fn parse_message( } // // OK + OK => { + let mut guard = messages_list.lock().unwrap(); + let res = guard.get(&id); + match res { + Some(ev) => { + println!("{:?}", ev); + let _ = &guard.remove_entry(&id); + println!("message {} retirĂ© de la liste", id); + } + None => { + println!("ping non trouvĂ©"); + } + } + } // // rien ? // si NATTRAVERSALREQUEST alors @@ -276,12 +235,11 @@ pub fn parse_message( crypto_pair, ); - senders.send_via( - 0, + senders.send_dispatch( natreq2.expect("couldnt construct message nattraversalrequest2"), address, false, - &messages_list, + messages_list.clone(), ); } @@ -291,17 +249,33 @@ pub fn parse_message( let ilength = u16::from_be_bytes(length_bytes); let received_address = &received_message[LENGTH..LENGTH + ilength as usize]; - let address = String::from_utf8(received_address.to_vec()).expect("wrong name"); + println!("received_address:{:?}", received_message); + let addressv4 = IpAddr::V4(Ipv4Addr::from_octets( + received_address[0..4].try_into().expect("incorrect size"), + )); + let address = SocketAddr::new( + addressv4, + u16::from_be_bytes(received_address[4..6].try_into().expect("incorrect size")), + ); + + println!("ip: {}", address); let pingreq = construct_message(PING, Vec::new(), id, crypto_pair); - senders.send_via( - 0, - pingreq.expect("couldnt construct message ping request"), - address, + senders.send_dispatch( + constructed_message.expect("couldnt construct message ping request"), + ip.to_string(), false, - &messages_list, + messages_list.clone(), ); + + senders.send_dispatch( + pingreq.expect("couldnt construct message ping request"), + address.to_string(), + false, + messages_list.clone(), + ); + constructed_message = None; } // // ERROR @@ -325,6 +299,18 @@ pub fn parse_message( HELLO => { let mut payload = Vec::new(); + let received_length = u16::from_be_bytes( + received_message[TYPE..LENGTH] + .try_into() + .expect("incorrect size"), + ); + let received_username = + &received_message[LENGTH + EXTENSIONS..LENGTH + received_length as usize]; + handhsake_history.update_peer_info( + ip.to_string(), + String::from_utf8(received_username.to_vec()).expect("invalid conversion"), + ); + payload.extend_from_slice(&0u32.to_be_bytes()); payload.extend_from_slice(&crypto_pair.username.clone().as_bytes()); @@ -350,21 +336,30 @@ pub fn parse_message( String::from_utf8(received_username.to_vec()).expect("invalid conversion"), ); // verifie s'il faut renvoyer un root request - let guard = messages_list.lock().expect("Échec du verrouillage"); + let mut guard = messages_list.lock().expect("Échec du verrouillage"); let res = guard.get(&id); match res { Some(ev) => { match ev { - EventType::SendRootRequest => { + EventType::HelloThenRootRequest => { // envoyer la root request + let _ = &guard.remove_entry(&id); + println!("message {} retirĂ© de la liste", id); + let rootrequest = construct_message( ROOTREQUEST, Vec::new(), generate_id(), crypto_pair, ); + //&guard.insert(, v) return rootrequest; } + EventType::Hello => { + let _ = &guard.remove_entry(&id); + println!("message {} retirĂ© de la liste", id); + } + _ => {} } } None => {} @@ -382,15 +377,45 @@ pub fn parse_message( let peers_exist = handhsake_history.get_peer_info_ip(ip.to_string()); match peers_exist { Some(peerinfo) => { - // envoyer le hash a la gui - let received_hash: NodeHash = received_message[LENGTH..(32 + LENGTH)] - .try_into() - .expect("incorrect size"); - let res = cmd_tx_clone.send(NetworkEvent::FileTreeRootReceived( - peerinfo.username.clone(), - received_hash, - )); - println!("file tree sent") + let mut guard = messages_list.lock().expect("Échec du verrouillage"); + let res = guard.get(&id); + match res { + Some(ev) => { + match ev { + EventType::RootRequest => { + // envoyer la root request + let _ = &guard.remove_entry(&id); + println!("message {} retirĂ© de la liste", id); + + // envoyer le hash a la gui + let received_hash: NodeHash = received_message + [LENGTH..(32 + LENGTH)] + .try_into() + .expect("incorrect size"); + let res = + cmd_tx_clone.send(NetworkEvent::FileTreeRootReceived( + peerinfo.username.clone(), + received_hash, + )); + println!("file tree sent"); + // envoyer un datum + let mut payload = Vec::new(); + payload.extend_from_slice(&received_hash); + let new_id = generate_id(); + let datumreqest = construct_message( + DATUMREQUEST, + payload, + new_id, + crypto_pair, + ); + constructed_message = datumreqest; + guard.insert(new_id, EventType::DatumRequest); + } + _ => {} + } + } + None => {} + } } None => { eprintln!("no peers found"); @@ -407,7 +432,35 @@ pub fn parse_message( // affiche un msg d'erreur // // DATUM - // + DATUM => { + let mut guard = messages_list.lock().expect("Échec du verrouillage"); + let res = guard.get(&id); + match res { + Some(ev) => match ev { + EventType::DatumRequest => { + let _ = &guard.remove_entry(&id); + println!("message {} retirĂ© de la liste", id); + let received_length = u16::from_be_bytes( + received_message[TYPE..LENGTH] + .try_into() + .expect("incorrect size"), + ); + let received_datum = &received_message[LENGTH..]; + let parsed_node = + parse_received_datum(received_datum.to_vec(), received_length as usize); + match parsed_node { + Some(tuple) => { + let _ = + cmd_tx.send(NetworkEvent::FileTreeReceived(tuple.0, tuple.1)); + } + None => {} + } + } + _ => {} + }, + None => {} + } + } // parcourt le directory recu ou le big directory et renvoie une DATUMREQUEST pour chaque // directory ou big directory lu // diff --git a/client-network/src/messages_channels.rs b/client-network/src/messages_channels.rs index 83c3cee..e0a603e 100644 --- a/client-network/src/messages_channels.rs +++ b/client-network/src/messages_channels.rs @@ -1,9 +1,14 @@ +use crossbeam_channel::Receiver; +use tokio::time::sleep; + use crate::P2PSharedData; use crate::cryptographic_signature::CryptographicSignature; use crate::message_handling::EventType; use crate::message_handling::handle_recevied_message; use crate::peers_refresh::HandshakeHistory; -use std::collections::HashMap; +use crate::threads_handling::Worker; +use std::collections::{HashMap, HashSet}; +use std::hash::Hash; use std::net::SocketAddr; use std::net::UdpSocket; use std::sync::{Arc, Mutex}; @@ -12,15 +17,12 @@ use std::sync::mpsc::{self, Sender}; use std::thread; use std::collections::VecDeque; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; use std::time::{Duration, Instant}; use crate::NetworkEvent; -pub struct MultipleSenders { - senders: Vec>, - response_channel: crossbeam_channel::Sender, -} - pub struct Message { pub payload: Vec, pub address: String, @@ -30,175 +32,77 @@ pub struct Message { struct RetryMessage { msg: Message, attempts: u8, - next_try: Instant, + next_try: u64, +} + +pub struct MultipleSenders { + sender: crossbeam_channel::Sender, + receiver: crossbeam_channel::Receiver, + response_channel: crossbeam_channel::Sender, + retry_queue: Arc>>, + completed_messages: HashSet, } impl MultipleSenders { - /*pub fn new(num_channels: usize, socket: &Arc) -> Self { - let mut senders = Vec::new(); - - // Wrap the socket in an Arc so it can be shared across threads - - for i in 0..num_channels { - let (tx, rx) = mpsc::channel::(); - - // Clone the Arc (this just bumps the reference count, it doesn't copy the socket) - let sock_clone = Arc::clone(&socket); - - senders.push(tx); - - thread::spawn(move || { - println!("Canal d'envoi {} prĂȘt", i); - - for msg in rx { - // Use the cloned Arc inside the thread - if let Err(e) = sock_clone.send_to(&msg.payload, &msg.address) { - eprintln!( - "Erreur d'envoi sur canal {}: {}, address: {}", - i, e, &msg.address - ); - } else { - let message_id: [u8; 4] = msg.payload[0..4].try_into().expect("size error"); - let id = i32::from_be_bytes(message_id); - let message_type = msg.payload[4]; - println!( - "Message {0} de type {1} envoyĂ© Ă  {2} par le canal {3}", - id, message_type, msg.address, i - ); - } - } - }); - } - - MultipleSenders { senders } - }*/ - pub fn new( num_channels: usize, socket: &Arc, cmd_tx: crossbeam_channel::Sender, + threads: &mut Vec, ) -> Self { - let mut senders = Vec::new(); + let (tx1, rx1) = crossbeam_channel::unbounded(); for i in 0..num_channels { - let (tx, rx) = mpsc::channel::(); let sock_clone = Arc::clone(&socket); let cmd_tx_clone = cmd_tx.clone(); + let rx: Receiver = rx1.clone(); - senders.push(tx); - - thread::spawn(move || { + let thread = thread::spawn(move || { println!("Canal d'envoi {} prĂȘt", i); - let mut queue: VecDeque = VecDeque::new(); - let max_attempts = 5; - loop { // PrioritĂ© aux messages en attente prĂȘts Ă  ĂȘtre rĂ©essayĂ©s - if let Some(front) = queue.front() { - if front.next_try <= Instant::now() { - // On prend le message de la queue - let mut item = queue.pop_front().unwrap(); - match sock_clone.send_to(&item.msg.payload, &item.msg.address) { - Ok(_) => { - if (&item).msg.is_resp_to_server_handshake { - let res = - cmd_tx_clone.send(NetworkEvent::ConnectedHandshake()); - } - let message_id: [u8; 4] = - item.msg.payload[0..4].try_into().expect("size error"); - let id = i32::from_be_bytes(message_id); - let message_type = item.msg.payload[4]; - println!( - "Message {0} de type {1} envoyĂ© Ă  {2} par le canal {3} (retry {4})", - id, message_type, item.msg.address, i, item.attempts - ); - } - Err(e) => { - item.attempts += 1; - if item.attempts >= max_attempts { - let str = format!( - "Abandon du message aprĂšs {} tentatives sur canal {}: {}, address: {}", - item.attempts, i, e, item.msg.address - ); - if (&item).msg.is_resp_to_server_handshake { - let res = cmd_tx_clone - .send(NetworkEvent::ServerHandshakeFailed(str)); - } - } else { - // Backoff exponentiel simple - let backoff = Duration::from_millis( - 2000u64.saturating_pow(item.attempts as u32), - ); - item.next_try = Instant::now() + backoff; - eprintln!( - "Erreur d'envoi sur canal {}: {}, reprogrammation dans {:?}, tentative {}", - i, e, backoff, item.attempts - ); - queue.push_front(item); // remettre en tĂȘte pour rĂ©essayer plus tĂŽt - } - } - } - continue; - } - } - // Si aucun retry prĂȘt, on bloque sur rx avec timeout court, pour pouvoir traiter les timers - match rx.recv_timeout(Duration::from_millis(200)) { - Ok(msg) => { - // On tente d'envoyer immĂ©diatement - match sock_clone.send_to(&msg.payload, &msg.address) { - Ok(_) => { - if msg.is_resp_to_server_handshake { - let res = - cmd_tx_clone.send(NetworkEvent::ConnectedHandshake()); - } - let message_id: [u8; 4] = - msg.payload[0..4].try_into().expect("size error"); - let id = i32::from_be_bytes(message_id); - let message_type = msg.payload[4]; - println!( - "Message {0} de type {1} envoyĂ© Ă  {2} par le canal {3}", - id, message_type, msg.address, i - ); - } - Err(e) => { - eprintln!( - "Erreur d'envoi initial sur canal {}: {}, address: {} -- mise en queue pour retry", - i, e, &msg.address - ); - let retry = RetryMessage { - msg, - attempts: 1, - next_try: Instant::now() + Duration::from_millis(100), - }; - queue.push_back(retry); - } + let msg = rx.recv().unwrap(); + match sock_clone.send_to(&msg.payload, &msg.address) { + Ok(_) => { + if msg.is_resp_to_server_handshake { + let res = cmd_tx_clone.send(NetworkEvent::ConnectedHandshake()); } - } - Err(mpsc::RecvTimeoutError::Timeout) => { - // Permet de vĂ©rifier la queue Ă  nouveau - continue; - } - Err(mpsc::RecvTimeoutError::Disconnected) => { - // Le sender a Ă©tĂ© fermĂ© ; vider la queue et sortir - eprintln!( - "Sender fermĂ© pour le canal {}, fermeture du thread d'envoi", - i + let message_id: [u8; 4] = + msg.payload[0..4].try_into().expect("size error"); + let id = i32::from_be_bytes(message_id); + let message_type = msg.payload[4]; + println!( + "Message {0} de type {1} envoyĂ© Ă  {2} par le canal {3}", + id, message_type, msg.address, i + ); + } + Err(e) => { + eprintln!( + "Erreur d'envoi initial sur canal {}: {}, address: {}", + i, e, &msg.address ); - break; } } } }); + + threads.push(Worker::spawn( + thread, + crate::threads_handling::WorkerType::MSGSENDER, + )); } MultipleSenders { - senders, + sender: tx1, + receiver: rx1, response_channel: cmd_tx.clone(), + retry_queue: Arc::new(Mutex::new(VecDeque::new())), + completed_messages: HashSet::new(), } } - + /* /// Envoie un message via un canal spĂ©cifique (round-robin ou index prĂ©cis) pub fn send_via( &self, @@ -226,44 +130,136 @@ impl MultipleSenders { let id = i32::from_be_bytes(message_id); guard.insert(id, EventType::SendRootRequest); } + }*/ + + pub fn send_dispatch( + &self, + data: Vec, + remote_addr: String, + is_resp_to_server_handshake: bool, + messages_list: Arc>>, + ) { + let msg_to_send = Message { + payload: data.clone(), + address: remote_addr, + is_resp_to_server_handshake, + }; + let _ = self.sender.send(msg_to_send); + println!("message sent"); } - /*pub fn start_receving_thread( - socket: &Arc, - messages_list: &Arc>, - crypto_pair: &Arc, - socket_addr: SocketAddr, - senders: &Arc, + pub fn add_message_to_retry_queue( + &self, + data: Vec, + remote_addr: String, + is_resp_to_server_handshake: bool, ) { - let sock_clone = Arc::clone(socket); - let cryptopair_clone = Arc::clone(crypto_pair); - let senders_clone = Arc::clone(senders); + let msg_to_send = Message { + payload: data.clone(), + address: remote_addr, + is_resp_to_server_handshake, + }; + let base: u64 = 2; + let attempts = 1; + let backoff = base.saturating_pow(attempts); // 2^1 == 2 seconds + let newretry = RetryMessage { + next_try: SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("clock") + .as_secs() + + backoff, + msg: msg_to_send, + attempts: attempts as u8, + }; - let messages_clone = Arc::clone(messages_list); - thread::spawn(move || { - let mut buf = [0u8; 1024]; + let mut guard = self.retry_queue.lock().unwrap(); + guard.push_back(newretry); + } +} - loop { - match sock_clone.recv_from(&mut buf) { - Ok((amt, src)) => { - handle_recevied_message( - &messages_clone, - &buf.to_vec(), - &cryptopair_clone, - &socket_addr, - &senders_clone, - ); - println!("Reçu {} octets de {}: {:?}", amt, src, &buf[..amt]); +pub fn start_retry_thread( + senders: Arc, + max_attempts: u8, + messages_list: Arc>>, + threads: &mut Vec, +) { + let thread = thread::spawn(move || { + loop { + thread::sleep(Duration::from_millis(100)); + let mut q = senders.retry_queue.lock().unwrap(); + //println!("size of retry thread: {}", q.len()); + if let Some(front) = q.pop_front() { + // on verifie si le message a recu une reponse + let message_id: [u8; 4] = front.msg.payload[0..4].try_into().expect("size error"); + let id = i32::from_be_bytes(message_id); + let message_type = front.msg.payload[4]; + let guard = messages_list.lock().unwrap(); + + if guard.contains_key(&id) { + drop(guard); + // si le message est n'a pas encore a etre traitĂ©, on le + // remet en queue de liste + if front.next_try + <= SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("clock") + .as_secs() + { + let attempt = front.attempts + 1; + if attempt >= max_attempts { + let str = format!( + "Abandon du message {} de type {} aprĂšs {} tentatives, address: {}", + id, message_type, front.attempts, front.msg.address + ); + println!("{}", str); + if front.msg.is_resp_to_server_handshake { + let res = senders + .response_channel + .send(NetworkEvent::ServerHandshakeFailed(str)); + } + } else { + let str = format!( + "Reemission du message {} de type {}, {} tentatives, address: {}", + id, message_type, front.attempts, front.msg.address + ); + println!("{}", str); + + senders.send_dispatch( + front.msg.payload.clone(), + front.msg.address.clone(), + front.msg.is_resp_to_server_handshake, + messages_list.clone(), + ); + let base: u64 = 2; + + let backoff = base.saturating_pow(attempt as u32); // 2^1 == 2 seconds + let newretry = RetryMessage { + next_try: SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("clock") + .as_secs() + + backoff, + msg: front.msg, + attempts: attempt, + }; + + q.push_back(newretry); // remettre en tĂȘte pour rĂ©essayer plus tĂŽt + } + } else { + q.push_back(front); // remettre en tĂȘte pour rĂ©essayer plus tĂŽt } - Err(e) => eprintln!("Erreur de rĂ©ception: {}", e), } } - }); - }*/ + } + }); + threads.push(Worker::spawn( + thread, + crate::threads_handling::WorkerType::MSGRETRY, + )); } pub fn start_receving_thread( - shared_data: &P2PSharedData, + shared_data: &mut P2PSharedData, cmd_tx: crossbeam_channel::Sender, handshake_history: &Arc>, ) { @@ -274,7 +270,7 @@ pub fn start_receving_thread( let servername_clone = shared_data.servername(); let handshake_clone = handshake_history.clone(); - thread::spawn(move || { + let thread = thread::spawn(move || { let mut buf = [0u8; 1024]; loop { match sock_clone.recv_from(&mut buf) { @@ -297,4 +293,8 @@ pub fn start_receving_thread( } } }); + shared_data.threads.push(Worker::spawn( + thread, + crate::threads_handling::WorkerType::MSGRECEPTION, + )); } diff --git a/client-network/src/messages_structure.rs b/client-network/src/messages_structure.rs index 115b19c..95b451f 100644 --- a/client-network/src/messages_structure.rs +++ b/client-network/src/messages_structure.rs @@ -48,18 +48,23 @@ pub fn construct_message( return Some(message); } ERROR | DATUMREQUEST => { - message.extend_from_slice(&payload.len().to_be_bytes()); + let a = payload.len() as u16; + println!("payload size:{}", a); + message.extend_from_slice(&a.to_be_bytes()); message.extend_from_slice(&payload); return Some(message); } ROOTREPLY | NODATUM | DATUM | NATTRAVERSALREQUEST => { println!("payload:{:?}", &payload); - message.extend_from_slice(&(payload.len() as u16).to_be_bytes()); + let a = payload.len() as u16; + println!("payload size:{}", a); + message.extend_from_slice(&a.to_be_bytes()); message.extend_from_slice(&payload); println!("payload:{:?}", &message); let signature = sign_message(crypto_pair, &message); message.extend_from_slice(&signature); - return Some(message); + println!("message_to_send_len:{}", &message.len()); + return Some(signature); } _ => {} diff --git a/client-network/src/peers_refresh.rs b/client-network/src/peers_refresh.rs index ea70c4a..9153fba 100644 --- a/client-network/src/peers_refresh.rs +++ b/client-network/src/peers_refresh.rs @@ -7,11 +7,11 @@ use std::{ ops::Add, process::Command, sync::{Arc, Mutex}, - thread, + thread::{self, JoinHandle}, time::{self, Duration, SystemTime}, }; -use crate::NetworkEvent; +use crate::{NetworkEvent, threads_handling::Worker}; use crate::{ P2PSharedData, construct_message, generate_id, messages_structure, registration::perform_handshake, @@ -65,21 +65,17 @@ impl HandshakeHistory { self.ip_k_peerinfo_v.get(&ip).clone() } - pub fn update_handshake(&self) { - // clone the map so we own it (cheap if PeerInfo is Clone) + pub fn update_handshake(&self) -> Worker { let map_clone: Arc> = Arc::new(self.username_k_peerinfo_v.clone()); - //let map_ip_clone: Arc> = Arc::new(self.ip_k_peerinfo_v.clone()); let map_for_thread = Arc::clone(&map_clone); - thread::spawn(move || { + let handle = thread::spawn(move || { loop { - // Arc> derefs to &HashMap so these reads work - for (peer, peerinfo) in map_for_thread.iter() { - // send ping to peerinfo - } + for (peer, peerinfo) in map_for_thread.iter() {} thread::sleep(Duration::from_secs(10)); } }); + Worker::spawn(handle, crate::threads_handling::WorkerType::PING) } pub fn update_peer_info(&mut self, ip: String, username: String) { diff --git a/client-network/src/registration.rs b/client-network/src/registration.rs index 36a6d6b..69035a2 100644 --- a/client-network/src/registration.rs +++ b/client-network/src/registration.rs @@ -60,7 +60,7 @@ pub async fn perform_handshake( println!("username: {}, ip: {}", username.clone(), ip.clone()); let crypto_pair = sd.cryptopair_ref(); let senders = sd.senders_ref(); - let messages_list = sd.messages_list_ref(); + let id = generate_id(); let server_addr_query = get_socket_address(username.clone(), ip.clone()); match server_addr_query.await { @@ -73,12 +73,11 @@ pub async fn perform_handshake( let hello_handshake = construct_message(1, payload, id, crypto_pair); match hello_handshake { Some(handshake_message) => { - senders.send_via( - 0, + senders.send_dispatch( handshake_message, sockaddr_bytes.to_string(), is_server_handshake, - messages_list, + sd.messages_list(), ); } None => {} diff --git a/client-network/src/threads_handling.rs b/client-network/src/threads_handling.rs new file mode 100644 index 0000000..ec6413f --- /dev/null +++ b/client-network/src/threads_handling.rs @@ -0,0 +1,35 @@ +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; +use std::thread::JoinHandle; + +pub enum WorkerType { + MSGRECEPTION, + MSGSENDER, + PING, + MSGRETRY, +} + +pub struct Worker { + thread: Option>, + stop: Arc, + workertype: WorkerType, +} + +impl Worker { + pub fn spawn(thread: JoinHandle<()>, workertype: WorkerType) -> Self { + Worker { + stop: Arc::new(AtomicBool::new(false)), + thread: Some(thread), + workertype, + } + } + + pub fn stop(mut self) { + self.stop.store(true, Ordering::Relaxed); + if let Some(h) = self.thread.take() { + let _ = h.join(); + } + } +}