From b465608797c3741d444ad99a32ad4167c4016960 Mon Sep 17 00:00:00 2001 From: Tiago Batista Cardoso Date: Wed, 21 Jan 2026 23:58:15 +0100 Subject: [PATCH] splash --- client-network/src/lib.rs | 129 ++++++++++++++---------- client-network/src/message_handling.rs | 17 +++- client-network/src/messages_channels.rs | 5 + 3 files changed, 99 insertions(+), 52 deletions(-) diff --git a/client-network/src/lib.rs b/client-network/src/lib.rs index 3d44647..d8a7d65 100644 --- a/client-network/src/lib.rs +++ b/client-network/src/lib.rs @@ -26,6 +26,7 @@ use crate::{ threads_handling::Worker, }; use std::collections::HashSet; + use std::{ io::Error, net::{IpAddr, Ipv4Addr, UdpSocket}, @@ -36,6 +37,8 @@ use std::{ sync::{Arc, Mutex}, }; +type PendingMap = Arc>>>>; + pub struct P2PSharedData { shared_socket: Arc, shared_cryptopair: Arc, @@ -45,10 +48,13 @@ pub struct P2PSharedData { server_address: Arc>, handshake_peers: Arc, threads: Vec, + pub pending: PendingMap, } use bytes::Bytes; use reqwest::Client; +use tokio::sync::oneshot; +use tokio::time::timeout; impl P2PSharedData { pub fn new( @@ -64,6 +70,7 @@ impl P2PSharedData { let shared_messageslist = Arc::new(Mutex::new(messages_list)); let mut threads = Vec::new(); + let pending = Arc::new(Mutex::new(HashMap::new())); let senders = MultipleSenders::new(1, &shared_socket, cmd_tx, &mut threads); let shared_senders = Arc::new(senders); @@ -80,6 +87,7 @@ impl P2PSharedData { server_address: server_address, handshake_peers: handhsake_peers, threads, + pending, }) } pub fn socket(&self) -> Arc { @@ -552,48 +560,40 @@ pub fn start_p2p_executor( if let Some(sd) = shared_data.as_ref() { println!("username:{}, ip:{}", username, ip); // user server to send nattraversal request - let server_addr_query = get_socket_address( - sd.servername().clone(), - ip.clone(), - shared_data.as_ref(), - ); + let server_addr = sd.serveraddress(); let peer_addr_query = get_socket_address( username.clone(), ip.clone(), shared_data.as_ref(), ); - match server_addr_query.await { - Some(server_addr) => match peer_addr_query.await { - Some(peer_addr) => { - let payload = socket_addr_to_vec(server_addr); + match peer_addr_query.await { + Some(peer_addr) => { + let payload = socket_addr_to_vec( + SocketAddr::from_str(server_addr.as_str()) + .expect("couldnt parse server address"), + ); - print!("{:?}", payload.clone()); + print!("{:?}", payload.clone()); - let id = generate_id(); - let natreq = construct_message( - NATTRAVERSALREQUEST, - payload.clone(), - id.clone(), - &sd.cryptopair(), - ); + let id = generate_id(); + let natreq = construct_message( + NATTRAVERSALREQUEST, + payload.clone(), + id.clone(), + &sd.cryptopair(), + ); - sd.add_message(id, EventType::NatTraversal); - sd.senders_ref().send_dispatch( - natreq.expect( - "couldnt construct message nattraversalrequest2", - ), - server_addr.to_string(), - false, - sd.messages_list(), - ); - } - None => { - let err_msg = format!("failed to retreive socket address") - .to_string(); - let res = event_tx.send(NetworkEvent::Error(err_msg)); - } - }, + sd.add_message(id, EventType::NatTraversal); + sd.senders_ref().send_dispatch( + natreq.expect( + "couldnt construct message nattraversalrequest2", + ), + server_addr.to_string(), + false, + sd.messages_list(), + ); + } None => { let err_msg = format!("failed to retreive socket address").to_string(); @@ -641,10 +641,32 @@ fn parse_pack(s: &str) -> Option<[u8; 6]> { ]) } -fn quick_ping(addr: &SocketAddr, timeout_ms: u64) -> bool { - let connect = - std::net::TcpStream::connect_timeout(addr, std::time::Duration::from_millis(timeout_ms)); - connect.is_ok() +async fn quick_ping(addr: &SocketAddr, timeout_ms: u64, sd: &P2PSharedData) -> bool { + let id = generate_id(); + let pingreq = construct_message(PING, Vec::new(), id, &sd.shared_cryptopair) + .expect("couldn't build ping message"); + + let (tx, rx) = oneshot::channel(); + { + let mut pending = sd.pending.lock().expect("couldnt lock pending map"); + pending.insert(id, tx); + } + if let Err(e) = sd.socket().send_to(&pingreq, addr) { + // remove pending on send failure + let mut pending = sd.pending.lock().expect("couldnt lock pending map"); + pending.remove(&id); + eprintln!("send_to failed: {}", e); + return false; + } + + let dur = Duration::from_millis(timeout_ms); + let res = timeout(dur, rx).await; + + // cleanup pending (in case of timeout or channel closed) + let mut pending = sd.pending.lock().expect("couldn't lock pending map"); + pending.remove(&id); + + matches!(res, Ok(Ok(_))) } /// @@ -704,28 +726,33 @@ pub async fn get_socket_address( for addr in addresses { println!("trying address : {}", addr); - if quick_ping(&addr, 1000) { + if quick_ping(&addr, 1000, sd).await { return Some(addr); } // TODO: nat_traversal(&addr).await; // after NAT traversal attempt, ping again - let natreq = construct_message( - NATTRAVERSALREQUEST, - addr.to_string().into_bytes(), - generate_id(), - &sd.shared_cryptopair, - ); + //let payload = socket_addr_to_vec( + // SocketAddr::from_str(&sd.serveraddress().as_str()) + // .expect("couldnt parse server address"), + //); - sd.senders_ref().send_dispatch( - natreq.expect("couldnt construct message nattraversalrequest2"), - sd.serveraddress(), - false, - sd.messages_list().clone(), - ); + //let natreq = construct_message( + // NATTRAVERSALREQUEST, + // payload, + // generate_id(), + // &sd.shared_cryptopair, + //); - if quick_ping(&addr, 1500) { + //sd.senders_ref().send_dispatch( + // natreq.expect("couldnt construct message nattraversalrequest2"), + // sd.serveraddress(), + // false, + // sd.messages_list().clone(), + //); + + if quick_ping(&addr, 1500, sd).await { return Some(addr); } } diff --git a/client-network/src/message_handling.rs b/client-network/src/message_handling.rs index cd7896a..5e12db2 100644 --- a/client-network/src/message_handling.rs +++ b/client-network/src/message_handling.rs @@ -1,5 +1,7 @@ +use tokio::sync::oneshot; + use crate::{ - NetworkEvent, NodeHash, + NetworkEvent, NodeHash, P2PSharedData, cryptographic_signature::{ CryptographicSignature, get_peer_key, sign_message, verify_signature, }, @@ -50,6 +52,8 @@ const DATUM: u8 = 132; const NATTRAVERSALREQUEST: u8 = 4; const NATTRAVERSALREQUEST2: u8 = 5; +type PendingMap = Arc>>>>; + pub fn handle_recevied_message( messages_list: &Arc>>, recevied_message: &Vec, @@ -60,6 +64,7 @@ pub fn handle_recevied_message( cmd_tx: crossbeam_channel::Sender, ip: SocketAddr, handhsake_history: Arc, + pending: PendingMap, ) { if recevied_message.len() < 4 { return; @@ -68,6 +73,16 @@ pub fn handle_recevied_message( let message_id: [u8; 4] = recevied_message[0..4].try_into().expect("size error"); let id = i32::from_be_bytes(message_id); + let maybe_tx = { + let mut map = pending.lock().expect("couldnt lock pending map"); + map.remove(&id) + }; + + if let Some(tx) = maybe_tx { + let _ = tx.send(Vec::new()); // ignore send error if receiver dropped + return; + } + let mut is_resp_to_server_handshake = false; if recevied_message[4] == HELLO { diff --git a/client-network/src/messages_channels.rs b/client-network/src/messages_channels.rs index 8a90dda..b92b1fc 100644 --- a/client-network/src/messages_channels.rs +++ b/client-network/src/messages_channels.rs @@ -1,7 +1,9 @@ use crossbeam_channel::Receiver; +use tokio::sync::oneshot; use tokio::time::sleep; use crate::P2PSharedData; +use crate::PendingMap; use crate::cryptographic_signature::CryptographicSignature; use crate::message_handling::EventType; use crate::message_handling::handle_recevied_message; @@ -193,6 +195,7 @@ pub fn start_retry_thread( // on verifie si le message a recu une reponse let message_id: [u8; 4] = front.msg.payload[0..4].try_into().expect("size error"); let id = i32::from_be_bytes(message_id); + let message_type = front.msg.payload[4]; let guard = messages_list.lock().unwrap(); @@ -269,6 +272,7 @@ pub fn start_receving_thread( let senders_clone = shared_data.senders(); let messages_clone = shared_data.messages_list(); let servername_clone = shared_data.servername(); + let pending_clone = shared_data.pending.clone(); // Arc> let thread = thread::spawn(move || { let mut buf = [0u8; 1024]; loop { @@ -286,6 +290,7 @@ pub fn start_receving_thread( cmd_tx.clone(), src, handshake_history.clone(), + pending_clone.clone(), ); } Err(e) => eprintln!("Erreur de réception: {}", e),