From 524eaec76d8588bf3a0ea103acc68452b7a9fe69 Mon Sep 17 00:00:00 2001 From: Tiago Batista Cardoso Date: Thu, 22 Jan 2026 01:05:02 +0100 Subject: [PATCH] decent progress --- client-network/src/lib.rs | 65 ++++++++----------------- client-network/src/message_handling.rs | 9 ---- client-network/src/messages_channels.rs | 4 -- 3 files changed, 19 insertions(+), 59 deletions(-) diff --git a/client-network/src/lib.rs b/client-network/src/lib.rs index 6aae74e..fe47c71 100644 --- a/client-network/src/lib.rs +++ b/client-network/src/lib.rs @@ -32,8 +32,6 @@ use std::{ sync::{Arc, Mutex}, }; -type PendingMap = Arc>>>>; - pub struct P2PSharedData { shared_socket: Arc, shared_cryptopair: Arc, @@ -43,13 +41,12 @@ pub struct P2PSharedData { server_address: Arc>, handshake_peers: Arc, threads: Vec, - pub pending: PendingMap, } use bytes::Bytes; use reqwest::Client; use tokio::sync::oneshot; -use tokio::time::timeout; +use tokio::time::{sleep, timeout}; impl P2PSharedData { pub fn new( @@ -65,7 +62,6 @@ impl P2PSharedData { let shared_messageslist = Arc::new(Mutex::new(messages_list)); let mut threads = Vec::new(); - let pending = Arc::new(Mutex::new(HashMap::new())); let senders = MultipleSenders::new(1, &shared_socket, cmd_tx, &mut threads); let shared_senders = Arc::new(senders); @@ -82,7 +78,6 @@ impl P2PSharedData { server_address: server_address, handshake_peers: handhsake_peers, threads, - pending, }) } pub fn socket(&self) -> Arc { @@ -449,10 +444,7 @@ pub fn start_p2p_executor( match peer_addr_query.await { Some(peer_addr) => { - let payload = socket_addr_to_vec( - SocketAddr::from_str(server_addr.as_str()) - .expect("couldnt parse server address"), - ); + let payload = socket_addr_to_vec(peer_addr); print!("{:?}", payload.clone()); @@ -525,27 +517,15 @@ async fn quick_ping(addr: &SocketAddr, timeout_ms: u64, sd: &P2PSharedData) -> b let id = 42069; let pingreq = construct_message(PING, Vec::new(), id, &sd.shared_cryptopair); - let (tx, rx) = oneshot::channel(); - { - let mut pending = sd.pending.lock().expect("couldnt lock pending map"); - pending.insert(id, tx); - } - if let Some(ping) = pingreq { - sd.senders_ref() - .add_message_to_retry_queue(ping.clone(), addr.to_string(), false); + sd.add_message(id, EventType::Ping); sd.senders_ref() .send_dispatch(ping, addr.to_string(), false, sd.messages_list()); } - let dur = Duration::from_millis(timeout_ms); - let res = timeout(dur, rx).await; + sleep(Duration::from_millis(timeout_ms)).await; - // cleanup pending (in case of timeout or channel closed) - let mut pending = sd.pending.lock().expect("couldn't lock pending map"); - pending.remove(&id); - - matches!(res, Ok(Ok(_))) && !pending.contains_key(&id) + !sd.messages_list().lock().expect("yooo").contains_key(&id) } /// @@ -605,35 +585,28 @@ pub async fn get_socket_address( for addr in addresses { println!("trying address : {}", addr); - if quick_ping(&addr, 20000, sd).await { + if quick_ping(&addr, 10000, sd).await { return Some(addr); } // TODO: nat_traversal(&addr).await; // after NAT traversal attempt, ping again + let payload = socket_addr_to_vec(addr); + let id = generate_id(); - //let payload = socket_addr_to_vec( - // SocketAddr::from_str(&sd.serveraddress().as_str()) - // .expect("couldnt parse server address"), - //); + let natreq = construct_message(NATTRAVERSALREQUEST, payload.clone(), id, &sd.cryptopair()); - //let natreq = construct_message( - // NATTRAVERSALREQUEST, - // payload, - // generate_id(), - // &sd.shared_cryptopair, - //); + sd.add_message(id, EventType::NatTraversal); + sd.senders_ref().send_dispatch( + natreq.expect("couldnt construct message nattraversalrequest2"), + sd.serveraddress().to_string(), + false, + sd.messages_list(), + ); - //sd.senders_ref().send_dispatch( - // natreq.expect("couldnt construct message nattraversalrequest2"), - // sd.serveraddress(), - // false, - // sd.messages_list().clone(), - //); - - //if quick_ping(&addr, 1500, sd).await { - // return Some(addr); - //} + if quick_ping(&addr, 15000, sd).await { + return Some(addr); + } } None diff --git a/client-network/src/message_handling.rs b/client-network/src/message_handling.rs index fed2e85..c9ea821 100644 --- a/client-network/src/message_handling.rs +++ b/client-network/src/message_handling.rs @@ -51,8 +51,6 @@ const DATUM: u8 = 132; const NATTRAVERSALREQUEST: u8 = 4; const NATTRAVERSALREQUEST2: u8 = 5; -type PendingMap = Arc>>>>; - pub fn handle_recevied_message( messages_list: &Arc>>, recevied_message: &Vec, @@ -63,7 +61,6 @@ pub fn handle_recevied_message( cmd_tx: crossbeam_channel::Sender, ip: SocketAddr, handhsake_history: Arc, - pending: PendingMap, ) { if recevied_message.len() < 4 { return; @@ -72,12 +69,6 @@ pub fn handle_recevied_message( let message_id: [u8; 4] = recevied_message[0..4].try_into().expect("size error"); let id = i32::from_be_bytes(message_id); - let maybe_tx = { - let mut map = pending.lock().expect("couldnt lock pending map"); - println!("trying to remove : {}", id); - map.remove(&id) - }; - let mut is_resp_to_server_handshake = false; if recevied_message[4] == HELLO { diff --git a/client-network/src/messages_channels.rs b/client-network/src/messages_channels.rs index b92b1fc..b881c1f 100644 --- a/client-network/src/messages_channels.rs +++ b/client-network/src/messages_channels.rs @@ -3,8 +3,6 @@ use tokio::sync::oneshot; use tokio::time::sleep; use crate::P2PSharedData; -use crate::PendingMap; -use crate::cryptographic_signature::CryptographicSignature; use crate::message_handling::EventType; use crate::message_handling::handle_recevied_message; use crate::peers_refresh::HandshakeHistory; @@ -272,7 +270,6 @@ pub fn start_receving_thread( let senders_clone = shared_data.senders(); let messages_clone = shared_data.messages_list(); let servername_clone = shared_data.servername(); - let pending_clone = shared_data.pending.clone(); // Arc> let thread = thread::spawn(move || { let mut buf = [0u8; 1024]; loop { @@ -290,7 +287,6 @@ pub fn start_receving_thread( cmd_tx.clone(), src, handshake_history.clone(), - pending_clone.clone(), ); } Err(e) => eprintln!("Erreur de réception: {}", e),