This commit is contained in:
Tiago Batista Cardoso
2026-01-21 23:58:15 +01:00
parent 424c11c5aa
commit 0799841cf2
3 changed files with 99 additions and 52 deletions

View File

@@ -21,6 +21,7 @@ use crate::{
server_communication::{generate_id, get_peer_list}, server_communication::{generate_id, get_peer_list},
threads_handling::Worker, threads_handling::Worker,
}; };
use std::str::FromStr;
use std::{ use std::{
io::Error, io::Error,
net::{IpAddr, Ipv4Addr, UdpSocket}, net::{IpAddr, Ipv4Addr, UdpSocket},
@@ -31,6 +32,8 @@ use std::{
sync::{Arc, Mutex}, sync::{Arc, Mutex},
}; };
type PendingMap = Arc<Mutex<HashMap<i32, oneshot::Sender<Vec<u8>>>>>;
pub struct P2PSharedData { pub struct P2PSharedData {
shared_socket: Arc<UdpSocket>, shared_socket: Arc<UdpSocket>,
shared_cryptopair: Arc<CryptographicSignature>, shared_cryptopair: Arc<CryptographicSignature>,
@@ -40,10 +43,13 @@ pub struct P2PSharedData {
server_address: Arc<Mutex<String>>, server_address: Arc<Mutex<String>>,
handshake_peers: Arc<HandshakeHistory>, handshake_peers: Arc<HandshakeHistory>,
threads: Vec<Worker>, threads: Vec<Worker>,
pub pending: PendingMap,
} }
use bytes::Bytes; use bytes::Bytes;
use reqwest::Client; use reqwest::Client;
use tokio::sync::oneshot;
use tokio::time::timeout;
impl P2PSharedData { impl P2PSharedData {
pub fn new( pub fn new(
@@ -59,6 +65,7 @@ impl P2PSharedData {
let shared_messageslist = Arc::new(Mutex::new(messages_list)); let shared_messageslist = Arc::new(Mutex::new(messages_list));
let mut threads = Vec::new(); let mut threads = Vec::new();
let pending = Arc::new(Mutex::new(HashMap::new()));
let senders = MultipleSenders::new(1, &shared_socket, cmd_tx, &mut threads); let senders = MultipleSenders::new(1, &shared_socket, cmd_tx, &mut threads);
let shared_senders = Arc::new(senders); let shared_senders = Arc::new(senders);
@@ -75,6 +82,7 @@ impl P2PSharedData {
server_address: server_address, server_address: server_address,
handshake_peers: handhsake_peers, handshake_peers: handhsake_peers,
threads, threads,
pending,
}) })
} }
pub fn socket(&self) -> Arc<UdpSocket> { pub fn socket(&self) -> Arc<UdpSocket> {
@@ -432,21 +440,19 @@ pub fn start_p2p_executor(
if let Some(sd) = shared_data.as_ref() { if let Some(sd) = shared_data.as_ref() {
println!("username:{}, ip:{}", username, ip); println!("username:{}, ip:{}", username, ip);
// user server to send nattraversal request // user server to send nattraversal request
let server_addr_query = get_socket_address( let server_addr = sd.serveraddress();
sd.servername().clone(),
ip.clone(),
shared_data.as_ref(),
);
let peer_addr_query = get_socket_address( let peer_addr_query = get_socket_address(
username.clone(), username.clone(),
ip.clone(), ip.clone(),
shared_data.as_ref(), shared_data.as_ref(),
); );
match server_addr_query.await { match peer_addr_query.await {
Some(server_addr) => match peer_addr_query.await {
Some(peer_addr) => { Some(peer_addr) => {
let payload = socket_addr_to_vec(server_addr); let payload = socket_addr_to_vec(
SocketAddr::from_str(server_addr.as_str())
.expect("couldnt parse server address"),
);
print!("{:?}", payload.clone()); print!("{:?}", payload.clone());
@@ -468,12 +474,6 @@ pub fn start_p2p_executor(
sd.messages_list(), sd.messages_list(),
); );
} }
None => {
let err_msg = format!("failed to retreive socket address")
.to_string();
let res = event_tx.send(NetworkEvent::Error(err_msg));
}
},
None => { None => {
let err_msg = let err_msg =
format!("failed to retreive socket address").to_string(); format!("failed to retreive socket address").to_string();
@@ -521,10 +521,32 @@ fn parse_pack(s: &str) -> Option<[u8; 6]> {
]) ])
} }
fn quick_ping(addr: &SocketAddr, timeout_ms: u64) -> bool { async fn quick_ping(addr: &SocketAddr, timeout_ms: u64, sd: &P2PSharedData) -> bool {
let connect = let id = generate_id();
std::net::TcpStream::connect_timeout(addr, std::time::Duration::from_millis(timeout_ms)); let pingreq = construct_message(PING, Vec::new(), id, &sd.shared_cryptopair)
connect.is_ok() .expect("couldn't build ping message");
let (tx, rx) = oneshot::channel();
{
let mut pending = sd.pending.lock().expect("couldnt lock pending map");
pending.insert(id, tx);
}
if let Err(e) = sd.socket().send_to(&pingreq, addr) {
// remove pending on send failure
let mut pending = sd.pending.lock().expect("couldnt lock pending map");
pending.remove(&id);
eprintln!("send_to failed: {}", e);
return false;
}
let dur = Duration::from_millis(timeout_ms);
let res = timeout(dur, rx).await;
// cleanup pending (in case of timeout or channel closed)
let mut pending = sd.pending.lock().expect("couldn't lock pending map");
pending.remove(&id);
matches!(res, Ok(Ok(_)))
} }
/// ///
@@ -584,28 +606,33 @@ pub async fn get_socket_address(
for addr in addresses { for addr in addresses {
println!("trying address : {}", addr); println!("trying address : {}", addr);
if quick_ping(&addr, 1000) { if quick_ping(&addr, 1000, sd).await {
return Some(addr); return Some(addr);
} }
// TODO: nat_traversal(&addr).await; // TODO: nat_traversal(&addr).await;
// after NAT traversal attempt, ping again // after NAT traversal attempt, ping again
let natreq = construct_message( //let payload = socket_addr_to_vec(
NATTRAVERSALREQUEST, // SocketAddr::from_str(&sd.serveraddress().as_str())
addr.to_string().into_bytes(), // .expect("couldnt parse server address"),
generate_id(), //);
&sd.shared_cryptopair,
);
sd.senders_ref().send_dispatch( //let natreq = construct_message(
natreq.expect("couldnt construct message nattraversalrequest2"), // NATTRAVERSALREQUEST,
sd.serveraddress(), // payload,
false, // generate_id(),
sd.messages_list().clone(), // &sd.shared_cryptopair,
); //);
if quick_ping(&addr, 1500) { //sd.senders_ref().send_dispatch(
// natreq.expect("couldnt construct message nattraversalrequest2"),
// sd.serveraddress(),
// false,
// sd.messages_list().clone(),
//);
if quick_ping(&addr, 1500, sd).await {
return Some(addr); return Some(addr);
} }
} }

View File

@@ -1,5 +1,7 @@
use tokio::sync::oneshot;
use crate::{ use crate::{
NetworkEvent, NodeHash, NetworkEvent, NodeHash, P2PSharedData,
cryptographic_signature::{ cryptographic_signature::{
CryptographicSignature, get_peer_key, sign_message, verify_signature, CryptographicSignature, get_peer_key, sign_message, verify_signature,
}, },
@@ -49,6 +51,8 @@ const DATUM: u8 = 132;
const NATTRAVERSALREQUEST: u8 = 4; const NATTRAVERSALREQUEST: u8 = 4;
const NATTRAVERSALREQUEST2: u8 = 5; const NATTRAVERSALREQUEST2: u8 = 5;
type PendingMap = Arc<Mutex<HashMap<i32, oneshot::Sender<Vec<u8>>>>>;
pub fn handle_recevied_message( pub fn handle_recevied_message(
messages_list: &Arc<Mutex<HashMap<i32, EventType>>>, messages_list: &Arc<Mutex<HashMap<i32, EventType>>>,
recevied_message: &Vec<u8>, recevied_message: &Vec<u8>,
@@ -59,6 +63,7 @@ pub fn handle_recevied_message(
cmd_tx: crossbeam_channel::Sender<NetworkEvent>, cmd_tx: crossbeam_channel::Sender<NetworkEvent>,
ip: SocketAddr, ip: SocketAddr,
handhsake_history: Arc<HandshakeHistory>, handhsake_history: Arc<HandshakeHistory>,
pending: PendingMap,
) { ) {
if recevied_message.len() < 4 { if recevied_message.len() < 4 {
return; return;
@@ -67,6 +72,16 @@ pub fn handle_recevied_message(
let message_id: [u8; 4] = recevied_message[0..4].try_into().expect("size error"); let message_id: [u8; 4] = recevied_message[0..4].try_into().expect("size error");
let id = i32::from_be_bytes(message_id); let id = i32::from_be_bytes(message_id);
let maybe_tx = {
let mut map = pending.lock().expect("couldnt lock pending map");
map.remove(&id)
};
if let Some(tx) = maybe_tx {
let _ = tx.send(Vec::new()); // ignore send error if receiver dropped
return;
}
let mut is_resp_to_server_handshake = false; let mut is_resp_to_server_handshake = false;
if recevied_message[4] == HELLO { if recevied_message[4] == HELLO {

View File

@@ -1,7 +1,9 @@
use crossbeam_channel::Receiver; use crossbeam_channel::Receiver;
use tokio::sync::oneshot;
use tokio::time::sleep; use tokio::time::sleep;
use crate::P2PSharedData; use crate::P2PSharedData;
use crate::PendingMap;
use crate::cryptographic_signature::CryptographicSignature; use crate::cryptographic_signature::CryptographicSignature;
use crate::message_handling::EventType; use crate::message_handling::EventType;
use crate::message_handling::handle_recevied_message; use crate::message_handling::handle_recevied_message;
@@ -193,6 +195,7 @@ pub fn start_retry_thread(
// on verifie si le message a recu une reponse // on verifie si le message a recu une reponse
let message_id: [u8; 4] = front.msg.payload[0..4].try_into().expect("size error"); let message_id: [u8; 4] = front.msg.payload[0..4].try_into().expect("size error");
let id = i32::from_be_bytes(message_id); let id = i32::from_be_bytes(message_id);
let message_type = front.msg.payload[4]; let message_type = front.msg.payload[4];
let guard = messages_list.lock().unwrap(); let guard = messages_list.lock().unwrap();
@@ -269,6 +272,7 @@ pub fn start_receving_thread(
let senders_clone = shared_data.senders(); let senders_clone = shared_data.senders();
let messages_clone = shared_data.messages_list(); let messages_clone = shared_data.messages_list();
let servername_clone = shared_data.servername(); let servername_clone = shared_data.servername();
let pending_clone = shared_data.pending.clone(); // Arc<Mutex<...>>
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
let mut buf = [0u8; 1024]; let mut buf = [0u8; 1024];
loop { loop {
@@ -286,6 +290,7 @@ pub fn start_receving_thread(
cmd_tx.clone(), cmd_tx.clone(),
src, src,
handshake_history.clone(), handshake_history.clone(),
pending_clone.clone(),
); );
} }
Err(e) => eprintln!("Erreur de réception: {}", e), Err(e) => eprintln!("Erreur de réception: {}", e),