mod cryptographic_signature; mod data; mod datum_generation; mod datum_parsing; mod fetchsocketaddresserror; mod message_handling; mod messages_channels; mod messages_structure; mod peers_refresh; mod registration; mod server_communication; mod threads_handling; mod timestamp; use crate::fetchsocketaddresserror::FetchSocketAddressError; use crate::messages_structure::ROOTREPLY; use crate::peers_refresh::*; use crate::timestamp::Timestamp; use crate::{ cryptographic_signature::CryptographicSignature, message_handling::EventType, messages_channels::{MultipleSenders, start_receving_thread, start_retry_thread}, messages_structure::{ DATUM, DATUMREQUEST, NATTRAVERSALREQUEST, NATTRAVERSALREQUEST2, NODATUM, PING, ROOTREQUEST, construct_message, }, peers_refresh::HandshakeHistory, registration::{parse_addresses, perform_handshake, register_with_the_server}, server_communication::{generate_id, get_peer_list}, threads_handling::Worker, }; use std::collections::HashSet; use std::{ io::Error, net::{IpAddr, Ipv4Addr, UdpSocket}, time::Duration, }; use std::{ net::SocketAddr, sync::{Arc, Mutex}, }; pub struct P2PSharedData { shared_socket: Arc, shared_cryptopair: Arc, shared_messageslist: Arc>>, shared_messagesreceived: Arc>>, shared_senders: Arc, server_name: Arc>, server_address: Arc>, handshake_peers: Arc, threads: Vec, } use bytes::Bytes; use reqwest::Client; use tokio::time::sleep; impl P2PSharedData { pub fn new( username: String, cmd_tx: crossbeam_channel::Sender, ) -> Result { let messages_list = HashMap::::new(); let messagesrecv_list = HashMap::::new(); let username = String::from(username); let crypto_pair = CryptographicSignature::new(username); let socket = UdpSocket::bind("0.0.0.0:0")?; let shared_socket = Arc::new(socket); let shared_cryptopair = Arc::new(crypto_pair); let shared_messageslist = Arc::new(Mutex::new(messages_list)); let shared_messagesreceived = Arc::new(Mutex::new(messagesrecv_list)); let mut threads = Vec::new(); let senders = MultipleSenders::new( 5, &shared_socket, cmd_tx, &mut threads, shared_messageslist.clone(), ); let shared_senders = Arc::new(senders); let server_name = Arc::new(Mutex::new("".to_string())); let server_address = Arc::new(Mutex::new("".to_string())); let handhsake_peers = Arc::new(HandshakeHistory::new()); Ok(P2PSharedData { shared_socket: shared_socket, shared_cryptopair: shared_cryptopair, shared_messageslist: shared_messageslist, shared_messagesreceived: shared_messagesreceived, shared_senders: shared_senders, server_name: server_name, server_address: server_address, handshake_peers: handhsake_peers, threads, }) } pub fn socket(&self) -> Arc { self.shared_socket.clone() } pub fn cryptopair(&self) -> Arc { self.shared_cryptopair.clone() } pub fn messages_list(&self) -> Arc>> { self.shared_messageslist.clone() } pub fn messages_received(&self) -> Arc>> { self.shared_messagesreceived.clone() } pub fn servername(&self) -> String { let guard = { let maybe_sn = self.server_name.lock().unwrap(); maybe_sn.clone() }; guard.to_string() } pub fn serveraddress(&self) -> String { let guard = { let maybe_sn = self.server_address.lock().unwrap(); maybe_sn.clone() }; guard.to_string() } pub fn set_servername(&self, new: String) { let mut guard = self.server_name.lock().unwrap(); *guard = new } pub fn set_serveraddress(&self, new: String) { let mut guard = self.server_address.lock().unwrap(); *guard = new } pub fn senders(&self) -> Arc { self.shared_senders.clone() } pub fn socket_ref(&self) -> &UdpSocket { &*self.shared_socket } pub fn handshakes(&self) -> Arc { self.handshake_peers.clone() } pub fn cryptopair_ref(&self) -> &CryptographicSignature { &*self.shared_cryptopair } pub fn messages_list_ref(&self) -> &Mutex> { &*self.shared_messageslist } pub fn messages_received_ref(&self) -> &Mutex> { &*self.shared_messagesreceived } pub fn senders_ref(&self) -> &MultipleSenders { &*self.shared_senders } pub fn add_message(&self, id: i32, evt: EventType) { let mut map = self.shared_messageslist.lock().unwrap(); map.insert(id, evt); } pub fn threads(&mut self) -> &mut Vec { &mut self.threads } pub fn close_threads(&mut self) { for w in self.threads.drain(..) { w.stop(); } } } /// Messages sent to the Network thread by the GUI. pub enum NetworkCommand { ConnectToServerPut(String, String), // ServerIP ServerHandshake(String, String), // ServerName FetchPeerList(String), // ServerIP RegisterAsPeer(String), Ping(String, String), NatTraversal(String, String), ConnectPeer((String, bool)), // IP:PORT RequestFileTree(String), // peer_id RequestDirectoryContent(String, String), RequestChunk(String, String), Disconnect(), ResetServerPeer(), Discover(String, String, String), GetChildren([u8; 32], String, bool), SendDatum(MerkleNode, [u8; 32], String), SendNoDatum(Vec, String), SendRootReply(Vec, String), InitDownload([u8; 32], String, String), // ... } /// Messages sent to the GUI by the Network thread. pub enum NetworkEvent { Connected(String), ConnectedHandshake(), Disconnected(), Error(String, String), Success(String, String), PeerListUpdated(Vec<(String, bool)>), FileTreeReceived([u8; 32], MerkleNode, String), // peer_id, content DataReceived([u8; 32], MerkleNode, String), FileTreeRootReceived(String, NodeHash), HandshakeFailed(), ServerHandshakeFailed(String), DatumRequest([u8; 32], String), RootRequest(String), InitDownload([u8; 32], String, String), // ... } use std::collections::HashMap; pub use crate::data::*; use crossbeam_channel::{Receiver, Sender}; use sha2::{Digest, Sha256}; pub fn calculate_chunk_id(data: &[u8]) -> String { // 1. Create a new Sha256 hasher instance let mut hasher = Sha256::new(); // 2. Write the input data into the hasher hasher.update(data); // 3. Finalize the hash computation and get the resulting bytes let hash_bytes = hasher.finalize(); // 4. Convert the hash bytes (array of u8) into a hexadecimal string // This is the common, human-readable format for cryptographic IDs. hex::encode(hash_bytes) } pub fn start_p2p_executor( cmd_rx: Receiver, event_tx: Sender, mut shared_data: Option, ) -> tokio::task::JoinHandle<()> { // Use tokio to spawn the asynchronous networking logic tokio::task::spawn(async move { // P2P/Networking Setup goes here println!("Network executor started."); // Main network loop loop { // Check for commands from the GUI if let Ok(cmd) = cmd_rx.try_recv() { match cmd { NetworkCommand::InitDownload(hash, ip, name) => { if let Some(sd) = shared_data.as_ref() { if let Some(res) = sd.handshake_peers.get_peer_info_username(ip) { let _ = event_tx.send(NetworkEvent::InitDownload( hash, res.ip.to_string(), name.to_string(), )); } } } NetworkCommand::SendRootReply(node_hash, addr) => { if let Some(sd) = shared_data.as_mut() { let mut payload = Vec::new(); payload.extend_from_slice(&node_hash); let new_id = generate_id(); let message = construct_message(ROOTREPLY, payload, new_id, sd.cryptopair_ref()); match message { None => {} Some(resp_msg) => { println!("msg_sent:{:?}", resp_msg); sd.senders_ref().send_dispatch( resp_msg, addr.clone(), false, sd.messages_list(), ); } } } } NetworkCommand::SendNoDatum(node_hash, addr) => { if let Some(sd) = shared_data.as_mut() { let mut payload = Vec::new(); payload.extend_from_slice(&node_hash); let new_id = generate_id(); let message = construct_message(NODATUM, payload, new_id, sd.cryptopair_ref()); match message { None => {} Some(resp_msg) => { println!("msg_sent:{:?}", resp_msg); sd.senders_ref().send_dispatch( resp_msg, addr.clone(), false, sd.messages_list(), ); } } } } NetworkCommand::SendDatum(merklennode, node_hash, addr) => { if let Some(sd) = shared_data.as_mut() { let mut payload = Vec::new(); payload.extend_from_slice(&node_hash); payload.extend_from_slice(&merklennode.serialize()); let new_id = generate_id(); let message = construct_message(DATUM, payload, new_id, sd.cryptopair_ref()); match message { None => {} Some(resp_msg) => { println!("msg_sent:{:?}", resp_msg); sd.senders_ref().send_dispatch( resp_msg, addr.clone(), false, sd.messages_list(), ); } } } } NetworkCommand::ServerHandshake(username, ip) => { println!("server handshake called"); if let Some(sd) = shared_data.as_mut() { start_receving_thread(sd, event_tx.clone(), sd.handshakes()); start_retry_thread( sd.senders(), 4, sd.messages_list(), sd.threads().as_mut(), ); update_handshake( sd.senders(), sd.cryptopair(), sd.messages_list(), sd.handshake_peers.get_username_peerinfo_map(), ); let server_address = { match get_server_address(username.to_owned(), ip.to_owned()).await { Some(addr) => addr.to_string(), None => { match event_tx.send(NetworkEvent::Error( "Couldn't fetch server socket address.".to_owned(), username.to_owned(), )) { Ok(_) => {} Err(e) => { println!("Network Event Error : {}", e.to_string()); } } "".to_owned() } } }; if server_address.to_owned().eq(&"".to_owned()) { continue; } sd.set_servername(username.to_owned()); sd.set_serveraddress(server_address.to_string()); println!("SET SERVERADDRESS"); match perform_handshake( &sd, username.to_owned(), ip, event_tx.clone(), (true, server_address.to_string()), ) .await { true => { match event_tx.send(NetworkEvent::Success( "Handshake established ✔️".to_string(), username.to_owned(), )) { Ok(_) => {} Err(err) => { println!("Network Event Error : {}", err.to_string()); } }; } false => {} }; } else { println!("no shared data"); } } NetworkCommand::ConnectPeer((username, connected)) => { println!("[Network] ConnectPeer() called"); println!("[Network] Attempting to connect to: {}", username); // Network logic to connect... // If successful, send an event back: // event_tx.send(NetworkEvent::PeerConnected(addr)).unwrap(); } NetworkCommand::RequestFileTree(_) => { println!("[Network] RequestFileTree() called"); } NetworkCommand::Discover(username, hash, ip) => { // envoie un handshake au peer, puis un root request if let Some(sd) = shared_data.as_ref() { let res = sd .handshake_peers .get_peer_info_username(username.to_owned()); match res { Some(peerinfo) => { let id = generate_id(); // envoyer un root request let rootrequest = construct_message( ROOTREQUEST, Vec::new(), id, sd.cryptopair_ref(), ); println!("matching"); match rootrequest { None => {} Some(resp_msg) => { sd.add_message(id, EventType::RootRequest); println!("msg_sent:{:?}", resp_msg); sd.senders_ref().add_message_to_retry_queue( resp_msg.clone(), peerinfo.ip.to_string(), false, ); sd.senders_ref().send_dispatch( resp_msg, peerinfo.ip.to_string(), false, sd.messages_list(), ); } } } None => { // envoyer un handshake match perform_handshake( &sd, username.to_owned(), ip, event_tx.clone(), (false, "".to_string()), ) .await { true => { match event_tx.send(NetworkEvent::Success( "Handshake established ✔️".to_string(), username.to_owned(), )) { Ok(_) => {} Err(err) => { println!( "Network Event Error : {}", err.to_string() ); } }; } false => {} } } } } else { println!("no shared data"); } } NetworkCommand::GetChildren(hash, ip, is_file) => { if let Some(sd) = shared_data.as_ref() { let mut payload = Vec::new(); payload.extend_from_slice(&hash); let new_id = generate_id(); let datumreqest = construct_message( DATUMREQUEST, payload, new_id, sd.cryptopair_ref(), ); match datumreqest { None => {} Some(resp_msg) => { if is_file { sd.add_message(new_id, EventType::DatumRequestBig); } else { sd.add_message(new_id, EventType::DatumRequest); } println!("msg_sent:{:?}", resp_msg); sd.senders_ref().add_message_to_retry_queue( resp_msg.clone(), ip.clone(), false, ); sd.senders_ref().send_dispatch( resp_msg, ip.clone(), false, sd.messages_list(), ); } } } } NetworkCommand::RequestDirectoryContent(_, _) => { println!("[Network] RequestDirectoryContent() called"); } NetworkCommand::RequestChunk(_, _) => { println!("[Network] RequestChunk() called"); } NetworkCommand::ConnectToServerPut(ip, name) => { println!("[Network] ConnectToServer() called"); // Actual server connection shared_data = match P2PSharedData::new(name.clone(), event_tx.clone()) { Ok(sd) => Some(sd), Err(e) => { let mut err_msg = String::from("failed to initialize socket: "); err_msg += &e.to_string(); let res = event_tx.send(NetworkEvent::Error(err_msg, name.to_owned())); let res = event_tx.send(NetworkEvent::Disconnected()); None } }; if let Some(sd) = shared_data.as_ref() { if let Err(e) = register_with_the_server(&sd.cryptopair(), &ip).await { let mut err_msg = String::from("request failed: "); err_msg += &e.to_string(); let res = event_tx.send(NetworkEvent::Error(err_msg, name.to_owned())); let res = event_tx.send(NetworkEvent::Disconnected()); } else { let res = event_tx.send(NetworkEvent::Connected(ip)); println!("username created: {}", sd.cryptopair().username); } //println!("ip: {}", ip); } //tokio::time::sleep(std::time::Duration::from_millis(5000)).await; /*let res = event_tx.send(NetworkEvent::Connected()); if let Some(error) = res.err() { println!( "[Network] Couldn't send crossbeam message to GUI: {}", error.to_string() ); }*/ } NetworkCommand::FetchPeerList(ip) => { println!("[Network] FetchPeerList() called"); if ip == "" { let res = event_tx.send(NetworkEvent::Error( "Not registered to any server".to_string(), "".to_owned(), )); } else { println!("cc"); match get_peer_list(ip).await { Ok(body) => match String::from_utf8(body.to_vec()) { Ok(peers_list) => { let mut peers: Vec<(String, bool)> = Vec::new(); let mut current = String::new(); for i in peers_list.chars() { if i == '\n' { peers.push((current.clone(), false)); current.clear(); } else { current.push(i); } } match event_tx.send(NetworkEvent::PeerListUpdated(peers)) { Ok(_) => {} Err(err) => { println!( "Network Event Error : {}", err.to_string() ); } }; } Err(e) => { eprintln!("invalid UTF-8 in socket address bytes: {}", e); } }, Err(e) => println!("error : {}", e), } } } NetworkCommand::RegisterAsPeer(_) => { println!("[Network] RegisterAsPeer() called"); } NetworkCommand::Ping(str, ip) => { println!("[Network] Ping({}) called", str); if let Some(sd) = shared_data.as_ref() { let id = generate_id(); sd.add_message(id, EventType::Ping); let peer_address = get_socket_address(str.to_owned(), ip, shared_data.as_ref()).await; match peer_address { Ok(addr) => { match event_tx.send(NetworkEvent::Success( format!( "Successfully sent ping message to {}.", addr.to_string(), ), str.to_owned(), )) { Ok(_) => {} Err(e) => { eprintln!("NetworkEvent error : {}", e); } }; } Err(err_msg) => { match event_tx .send(NetworkEvent::Error(err_msg.to_string(), str)) { Ok(_) => {} Err(e) => { eprintln!("NetworkEvent error : {}", e); } } } } } } NetworkCommand::Disconnect() => { if let Some(sd) = shared_data.as_ref() { println!("Disconnecting: {}", &sd.cryptopair().username); shared_data = None; match event_tx.send(NetworkEvent::Disconnected()) { Ok(_) => {} Err(e) => { eprintln!("NetworkEvent error : {}", e); } } } else { println!("no p2p data"); } } NetworkCommand::ResetServerPeer() => { if let Some(sd) = shared_data.as_ref() { sd.set_servername("".to_string()); } else { println!("no p2p data"); } } NetworkCommand::NatTraversal(username, ip) => { if let Some(sd) = shared_data.as_ref() { println!("username:{}, ip:{}", username, ip); // user server to send nattraversal request let server_addr = sd.serveraddress(); let peer_addr_query = get_socket_address( username.clone(), ip.clone(), shared_data.as_ref(), ); match peer_addr_query.await { Ok(peer_addr) => { let payload = socket_addr_to_vec(peer_addr); print!("{:?}", payload.clone()); let id = generate_id(); let natreq = construct_message( NATTRAVERSALREQUEST, payload.clone(), id.clone(), &sd.cryptopair(), ); sd.add_message(id, EventType::NatTraversal); sd.senders_ref().send_dispatch( natreq.expect( "couldnt construct message nattraversalrequest2", ), server_addr.to_string(), false, sd.messages_list(), ); } Err(err_msg) => { match event_tx .send(NetworkEvent::Error(err_msg.to_string(), username)) { Ok(_) => {} Err(e) => { eprintln!("NetworkEvent error : {}", e); } } } } } } } } // 2. Poll network for new events (e.g., an incoming connection) // ... // When a new peer is found: // event_tx.send(NetworkEvent::PeerConnected("NewPeerID".to_string())).unwrap(); // Avoid spinning too fast sleep(std::time::Duration::from_millis(50)).await; } }) } fn socket_addr_to_vec(addr: SocketAddr) -> Vec { let mut v = match addr.ip() { IpAddr::V4(v4) => v4.octets().to_vec(), IpAddr::V6(v6) => v6.octets().to_vec(), }; v.extend(&addr.port().to_be_bytes()); v } fn parse_pack(s: &str) -> Option<[u8; 6]> { // split into "ip" and "port" let mut parts = s.rsplitn(2, ':'); let port_str = parts.next()?; let ip_str = parts.next()?; // if missing, invalid let ip: Ipv4Addr = ip_str.parse().ok()?; let port: u16 = port_str.parse().ok()?; let octets = ip.octets(); let port_be = port.to_be_bytes(); Some([ octets[0], octets[1], octets[2], octets[3], port_be[0], port_be[1], ]) } async fn quick_ping(addr: &SocketAddr, timeout_ms: u64, sd: &P2PSharedData) -> bool { let id = generate_id(); let pingreq = construct_message(PING, Vec::new(), id, &sd.shared_cryptopair); if let Some(ping) = pingreq { sd.add_message(id, EventType::Ping); sd.senders_ref() .send_dispatch(ping, addr.to_string(), false, sd.messages_list()); } sleep(Duration::from_millis(timeout_ms)).await; let msg_list = sd.messages_list_ref().lock().expect("yooo"); let res = !msg_list.contains_key(&id); for (id, evt) in msg_list.iter() { println!("id : {}, evt : {}", id, evt.to_string()); } println!("message list doesnt contain key? {}", res); res } /// /// sends a get request to the server to get the socket address of the given peer /// pub async fn get_socket_address( username: String, ip: String, shared_data: Option<&P2PSharedData>, ) -> Result { let sd = shared_data.expect("No shared data"); let client = match Client::builder().timeout(Duration::from_secs(5)).build() { Ok(c) => c, Err(e) => { return Err(FetchSocketAddressError::ClientError(e.to_string())); } }; let uri = format!("{}/peers/{}/addresses", ip, username); let res = match client.get(&uri).send().await { Ok(r) => r, Err(e) => return Err(FetchSocketAddressError::ClientError(e.to_string())), }; if res.status().is_success() { println!("Successfully retrieved the addresses. {}", res.status()); } else { eprintln!( "Failed to get the peers addresses from the server. Status: {}", res.status() ); } let body = match res.bytes().await { Ok(b) => b, Err(e) => { return Err(FetchSocketAddressError::ClientError(e.to_string())); } }; let s = match String::from_utf8(body.to_vec()) { Ok(st) => st, Err(e) => { return Err(FetchSocketAddressError::ClientError(e.to_string())); } }; let addresses: Vec = { let temp = parse_addresses(&s); temp.iter() .filter_map(|a| match a { SocketAddr::V4(_) => Some(*a), SocketAddr::V6(_) => None, }) .collect() }; if addresses.is_empty() { return Err(FetchSocketAddressError::NoRegisteredAddresses); } else if !addresses.iter().any(|a| matches!(a, SocketAddr::V4(_))) { return Err(FetchSocketAddressError::NoIPV4Address); } for addr in addresses { println!("trying address : {}", addr); if quick_ping(&addr, 5000, sd).await { return Ok(addr); } let payload = socket_addr_to_vec(addr); let id = generate_id(); let natreq = construct_message(NATTRAVERSALREQUEST, payload.clone(), id, &sd.cryptopair()); sd.add_message(id, EventType::NatTraversal); sd.senders_ref().send_dispatch( natreq.expect("couldnt construct message nattraversalrequest2"), sd.serveraddress().to_string(), false, sd.messages_list(), ); sleep(Duration::from_millis(5000)).await; let maybe_entry = { let guard = sd.messages_received_ref().lock().unwrap(); guard.clone() }; // guard dropped for (id, (evt, time)) in maybe_entry.iter() { println!("{} : {} at {}", id, evt.to_string(), time.to_string()); if id.eq(&addr.to_string()) && Timestamp::now().diff(time) < 10 { println!("received message from address, returning said address.."); return Ok(addr); } } if quick_ping(&addr, 15000, sd).await { return Ok(addr); } } Err(FetchSocketAddressError::NoResponseFromUser) } pub async fn get_server_address(username: String, ip: String) -> Option { let client = Client::builder() .timeout(Duration::from_secs(5)) .build() .expect("cannot create client"); let uri = format!("{}/peers/{}/addresses", ip, username); let res = client.get(uri).send().await.expect("couldnt get response"); if res.status().is_success() { println!("Successfully retreived the addresses. {}", res.status()); } else { eprintln!( "Failed to get the peers addresses from the server. Status: {}", res.status() ); } let body: Bytes = res.bytes().await.expect("couldnt get bytes"); match String::from_utf8(body.to_vec()) { Ok(s) => { let addresses = parse_addresses(&s); if let Some(first) = addresses.first() { Some(first.clone()) } else { None } } Err(_) => None, } }