Files
p2p/client-network/src/lib.rs
TIBERGHIEN corentin 14fa256f9c wip nattraversal
2026-01-16 11:19:20 +01:00

473 lines
19 KiB
Rust

mod cryptographic_signature;
mod data;
mod datum_parsing;
mod message_handling;
mod messages_channels;
mod messages_structure;
mod peers_refresh;
mod registration;
mod server_communication;
use crate::{
cryptographic_signature::CryptographicSignature,
message_handling::EventType,
messages_channels::{MultipleSenders, start_receving_thread},
messages_structure::{
NATTRAVERSALREQUEST, NATTRAVERSALREQUEST2, ROOTREQUEST, construct_message,
},
peers_refresh::HandshakeHistory,
registration::{parse_addresses, perform_handshake, register_with_the_server},
server_communication::{generate_id, get_peer_list},
};
use std::{
io::Error,
net::{IpAddr, Ipv4Addr, UdpSocket},
};
use std::{
net::SocketAddr,
sync::{Arc, Mutex},
};
/// Bundle of networking state created by `P2PSharedData::new` and handed to
/// the network executor. Every field is wrapped in an `Arc`, and the accessor
/// methods either clone that `Arc` or borrow the value behind it.
pub struct P2PSharedData {
/// UDP socket; `new` binds it to `0.0.0.0:0`.
shared_socket: Arc<UdpSocket>,
/// Signature material built from the username given to `new`.
shared_cryptopair: Arc<CryptographicSignature>,
/// Map from an `i32` message id to its `EventType`; filled via `add_message`.
shared_messageslist: Arc<Mutex<HashMap<i32, EventType>>>,
/// Sender set created by `MultipleSenders::new(1, &shared_socket, cmd_tx)`.
shared_senders: Arc<MultipleSenders>,
/// Server name; starts as the empty string and is replaced by `set_servername`.
server_name: Arc<Mutex<String>>,
/// Handshake history, created empty with `HandshakeHistory::new()`.
handshake_peers: Arc<HandshakeHistory>,
}
use bytes::Bytes;
use p256::pkcs8::der::pem::Base64Encoder;
impl P2PSharedData {
    /// Creates the shared networking state for `username`.
    ///
    /// The signature material is created first, then a UDP socket is bound to
    /// `0.0.0.0:0`, then the sender set and an empty handshake history are
    /// built. The message map starts empty and the server name starts as `""`.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported by `UdpSocket::bind`.
    pub fn new(
        username: String,
        cmd_tx: crossbeam_channel::Sender<NetworkEvent>,
    ) -> Result<P2PSharedData, Error> {
        let shared_cryptopair = Arc::new(CryptographicSignature::new(username));
        let shared_socket = Arc::new(UdpSocket::bind("0.0.0.0:0")?);
        let shared_senders = Arc::new(MultipleSenders::new(1, &shared_socket, cmd_tx));
        let handshake_peers = Arc::new(HandshakeHistory::new());

        Ok(P2PSharedData {
            shared_socket,
            shared_cryptopair,
            shared_messageslist: Arc::new(Mutex::new(HashMap::<i32, EventType>::new())),
            shared_senders,
            server_name: Arc::new(Mutex::new(String::new())),
            handshake_peers,
        })
    }

    /// Returns a new `Arc` handle to the UDP socket.
    pub fn socket(&self) -> Arc<UdpSocket> {
        Arc::clone(&self.shared_socket)
    }

    /// Returns a new `Arc` handle to the signature material.
    pub fn cryptopair(&self) -> Arc<CryptographicSignature> {
        Arc::clone(&self.shared_cryptopair)
    }

    /// Returns a new `Arc` handle to the message map.
    pub fn messages_list(&self) -> Arc<Mutex<HashMap<i32, EventType>>> {
        Arc::clone(&self.shared_messageslist)
    }

    /// Returns a copy of the current server name.
    ///
    /// # Panics
    ///
    /// Panics if the server-name mutex is poisoned.
    pub fn servername(&self) -> String {
        self.server_name.lock().unwrap().clone()
    }

    /// Replaces the stored server name with `new`.
    ///
    /// # Panics
    ///
    /// Panics if the server-name mutex is poisoned.
    pub fn set_servername(&self, new: String) {
        *self.server_name.lock().unwrap() = new;
    }

    /// Returns a new `Arc` handle to the sender set.
    pub fn senders(&self) -> Arc<MultipleSenders> {
        Arc::clone(&self.shared_senders)
    }

    /// Borrows the UDP socket without touching the reference count.
    pub fn socket_ref(&self) -> &UdpSocket {
        &self.shared_socket
    }

    /// Borrows the signature material without touching the reference count.
    pub fn cryptopair_ref(&self) -> &CryptographicSignature {
        &self.shared_cryptopair
    }

    /// Borrows the handshake history without touching the reference count.
    pub fn handshake_ref(&self) -> &HandshakeHistory {
        &self.handshake_peers
    }

    /// Borrows the message map's mutex without touching the reference count.
    pub fn messages_list_ref(&self) -> &Mutex<HashMap<i32, EventType>> {
        &self.shared_messageslist
    }

    /// Borrows the sender set without touching the reference count.
    pub fn senders_ref(&self) -> &MultipleSenders {
        &self.shared_senders
    }

    /// Stores `evt` under `id` in the message map, replacing any previous entry.
    ///
    /// # Panics
    ///
    /// Panics if the message-map mutex is poisoned.
    pub fn add_message(&self, id: i32, evt: EventType) {
        self.shared_messageslist.lock().unwrap().insert(id, evt);
    }
}
/// Messages sent to the Network thread by the GUI.
///
/// The payload descriptions below follow the names `start_p2p_executor` binds
/// when it matches each variant.
pub enum NetworkCommand {
/// `(ip, name)`: builds fresh shared state for `name` and registers with the server at `ip`.
ConnectToServerPut(String, String), // server ip, username
/// `(username, ip)`: starts the receiving thread and performs a handshake
/// (the last argument passed to `perform_handshake` is `true`).
ServerHandshake(String, String), // username, ip
/// `(ip)`: fetches the peer list from `ip`; an empty string is reported as "not registered".
FetchPeerList(String), // ServerIP
/// Only logged by the executor for now.
RegisterAsPeer(String),
/// Only logged by the executor for now; the payload is ignored.
Ping(String),
/// `(username, ip)`: looks up socket addresses through `ip` and sends a NAT traversal request.
NatTraversal(String, String),
/// Bound as `(username, connected)` by the executor, which only logs the name.
/// NOTE(review): an earlier comment described the string as `IP:PORT` - confirm which is intended.
ConnectPeer((String, bool)),
/// Only logged by the executor for now.
RequestFileTree(String), // peer_id
/// Only logged by the executor for now.
RequestDirectoryContent(String, String),
/// Only logged by the executor for now.
RequestChunk(String, String),
/// Drops the shared state (if any) and reports `NetworkEvent::Disconnected`.
Disconnect(),
/// Resets the stored server name to the empty string.
ResetServerPeer(),
/// `(username, hash, ip)`: sends a root request to `username` when it is already in the
/// handshake history, otherwise performs a handshake first. `hash` is currently unused.
Discover(String, String, String),
/// `(username, hash)`: not implemented yet in the executor.
GetChildren(String, String),
// ...
}
/// Messages sent to the GUI by the Network thread.
///
/// Only some variants are emitted by `start_p2p_executor` itself; the others
/// are presumably produced by other modules of this crate - verify there.
pub enum NetworkEvent {
/// Registration with the server succeeded; carries the server `ip` that was used.
Connected(String),
/// Not emitted by the executor in this file.
ConnectedHandshake(),
/// Sent after a failed socket initialisation or registration, and on `Disconnect`.
Disconnected(),
/// Human-readable error message.
Error(String),
/// Not emitted by the executor in this file (only referenced in commented-out code).
PeerConnected(String),
/// Peer names parsed from the server response; the flag is `false` for
/// every entry built by the executor.
PeerListUpdated(Vec<(String, bool)>),
/// Not emitted by the executor in this file.
FileTreeReceived(String, Vec<MerkleNode>), // peer_id, content
/// Not emitted by the executor in this file.
DataReceived(String, MerkleNode),
/// Not emitted by the executor in this file.
FileTreeRootReceived(String, NodeHash),
/// Not emitted by the executor in this file.
HandshakeFailed(),
/// Not emitted by the executor in this file.
ServerHandshakeFailed(String),
// ...
}
use std::collections::HashMap;
pub use crate::data::*;
use crossbeam_channel::{Receiver, Sender};
use sha2::{Digest, Sha256};
/// Returns the SHA-256 digest of `data` as a hexadecimal string.
///
/// The hex form is used as the identifier of the chunk.
pub fn calculate_chunk_id(data: &[u8]) -> String {
    // One-shot hashing: equivalent to creating a hasher, feeding it `data`
    // and finalizing it.
    let digest = Sha256::digest(data);
    hex::encode(digest)
}
pub fn start_p2p_executor(
cmd_rx: Receiver<NetworkCommand>,
event_tx: Sender<NetworkEvent>,
mut shared_data: Option<P2PSharedData>,
) -> tokio::task::JoinHandle<()> {
// Use tokio to spawn the asynchronous networking logic
tokio::task::spawn(async move {
// P2P/Networking Setup goes here
let handshake_history = Arc::new(Mutex::new(HandshakeHistory::new()));
let handshake_clone = handshake_history.clone();
println!("Network executor started.");
// Main network loop
loop {
// Check for commands from the GUI
if let Ok(cmd) = cmd_rx.try_recv() {
match cmd {
NetworkCommand::ServerHandshake(username, ip) => {
println!("server handshake called");
if let Some(sd) = shared_data.as_ref() {
start_receving_thread(sd, event_tx.clone(), &handshake_clone);
let res =
perform_handshake(&sd, username, ip, event_tx.clone(), true).await;
} else {
println!("no shared data");
}
}
NetworkCommand::ConnectPeer((username, connected)) => {
println!("[Network] ConnectPeer() called");
println!("[Network] Attempting to connect to: {}", username);
// Network logic to connect...
// If successful, send an event back:
// event_tx.send(NetworkEvent::PeerConnected(addr)).unwrap();
}
NetworkCommand::RequestFileTree(_) => {
println!("[Network] RequestFileTree() called");
}
NetworkCommand::Discover(username, hash, ip) => {
// envoie un handshake au peer, puis un root request
if let Some(sd) = shared_data.as_ref() {
let res = {
let m = handshake_clone.lock().unwrap();
m.get_peer_info_username(username.clone()).cloned()
};
match res {
Some(peerinfo) => {
// envoyer un root request
let rootrequest = construct_message(
ROOTREQUEST,
Vec::new(),
generate_id(),
sd.cryptopair_ref(),
);
match rootrequest {
None => {}
Some(resp_msg) => {
println!("msg_sent:{:?}", resp_msg);
sd.senders_ref().send_via(
0,
resp_msg,
peerinfo.ip.to_string(),
false,
sd.messages_list_ref(),
);
}
}
}
None => {
// envoyer un handshake
let res = perform_handshake(
&sd,
username,
ip,
event_tx.clone(),
false,
)
.await;
}
}
} else {
println!("no shared data");
}
}
NetworkCommand::GetChildren(username, hash) => {
// envoie un datum request au peer
}
NetworkCommand::RequestDirectoryContent(_, _) => {
println!("[Network] RequestDirectoryContent() called");
}
NetworkCommand::RequestChunk(_, _) => {
println!("[Network] RequestChunk() called");
}
NetworkCommand::ConnectToServerPut(ip, name) => {
println!("[Network] ConnectToServer() called");
// Actual server connection
shared_data = match P2PSharedData::new(name.clone(), event_tx.clone()) {
Ok(sd) => Some(sd),
Err(e) => {
let mut err_msg = String::from("failed to initialize socket: ");
err_msg += &e.to_string();
let res = event_tx.send(NetworkEvent::Error(err_msg));
let res = event_tx.send(NetworkEvent::Disconnected());
None
}
};
if let Some(sd) = shared_data.as_ref() {
if let Err(e) = register_with_the_server(&sd.cryptopair(), &ip).await {
let mut err_msg = String::from("request failed: ");
err_msg += &e.to_string();
let res = event_tx.send(NetworkEvent::Error(err_msg));
let res = event_tx.send(NetworkEvent::Disconnected());
} else {
let res = event_tx.send(NetworkEvent::Connected(ip));
println!("username created: {}", sd.cryptopair().username);
}
//println!("ip: {}", ip);
}
//tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
/*let res = event_tx.send(NetworkEvent::Connected());
if let Some(error) = res.err() {
println!(
"[Network] Couldn't send crossbeam message to GUI: {}",
error.to_string()
);
}*/
}
NetworkCommand::FetchPeerList(ip) => {
if ip == "" {
let res = event_tx.send(NetworkEvent::Error(
"Not registered to any server".to_string(),
));
} else {
println!("cc");
match get_peer_list(ip).await {
Ok(body) => match String::from_utf8(body.to_vec()) {
Ok(peers_list) => {
let mut peers: Vec<(String, bool)> = Vec::new();
let mut current = String::new();
for i in peers_list.chars() {
if i == '\n' {
peers.push((current.clone(), false));
current.clear();
} else {
current.push(i);
}
}
let res =
event_tx.send(NetworkEvent::PeerListUpdated(peers));
}
Err(e) => {
eprintln!("invalid UTF-8 in socket address bytes: {}", e);
}
},
Err(e) => println!("error"),
}
}
println!("[Network] FetchPeerList() called");
}
NetworkCommand::RegisterAsPeer(_) => {
println!("[Network] RegisterAsPeer() called");
}
NetworkCommand::Ping(String) => {
println!("[Network] Ping() called");
}
NetworkCommand::Disconnect() => {
if let Some(sd) = shared_data.as_ref() {
println!("Disconnecting: {}", &sd.cryptopair().username);
shared_data = None;
let res = event_tx.send(NetworkEvent::Disconnected());
} else {
println!("no p2p data");
}
}
NetworkCommand::ResetServerPeer() => {
if let Some(sd) = shared_data.as_ref() {
sd.set_servername("".to_string());
} else {
println!("no p2p data");
}
}
NetworkCommand::NatTraversal(username, ip) => {
if let Some(sd) = shared_data.as_ref() {
println!("username:{}, ip:{}", username, ip);
// user server to send nattraversal request
let server_addr_query =
get_socket_address(sd.servername().clone(), ip.clone());
let peer_addr_query = get_socket_address(username.clone(), ip.clone());
match server_addr_query.await {
Some(server_addr) => match peer_addr_query.await {
Some(peer_addr) => {
let payload = socket_addr_to_vec(server_addr);
print!("{:?}", payload.clone());
let natreq = construct_message(
NATTRAVERSALREQUEST,
server_addr.to_string().into_bytes(),
generate_id(),
&sd.cryptopair(),
);
sd.senders_ref().send_via(
0,
natreq.expect(
"couldnt construct message nattraversalrequest2",
),
server_addr.to_string(),
false,
sd.messages_list_ref(),
);
}
None => {
let err_msg = format!("failed to retreive socket address")
.to_string();
let res = event_tx.send(NetworkEvent::Error(err_msg));
}
},
None => {
let err_msg =
format!("failed to retreive socket address").to_string();
let res = event_tx.send(NetworkEvent::Error(err_msg));
}
}
}
}
}
}
// 2. Poll network for new events (e.g., an incoming connection)
// ...
// When a new peer is found:
// event_tx.send(NetworkEvent::PeerConnected("NewPeerID".to_string())).unwrap();
// Avoid spinning too fast
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
})
}
/// Packs a socket address into bytes: the IP octets (4 for IPv4, 16 for IPv6)
/// followed by the port as two big-endian bytes.
fn socket_addr_to_vec(addr: SocketAddr) -> Vec<u8> {
    let port_bytes = addr.port().to_be_bytes();
    // 16 address bytes + 2 port bytes is the largest possible result.
    let mut packed = Vec::with_capacity(18);
    match addr.ip() {
        IpAddr::V4(ip) => packed.extend_from_slice(&ip.octets()),
        IpAddr::V6(ip) => packed.extend_from_slice(&ip.octets()),
    }
    packed.extend_from_slice(&port_bytes);
    packed
}
/// Parses an `"a.b.c.d:port"` string into six bytes: the four IPv4 octets
/// followed by the port in big-endian order.
///
/// Returns `None` when there is no `:` separator, when the part before the
/// last `:` is not an IPv4 address, or when the part after it is not a `u16`.
fn parse_pack(s: &str) -> Option<[u8; 6]> {
    // Split on the last ':' so the port is always the trailing component.
    let (host, port) = s.rsplit_once(':')?;
    let ip = host.parse::<Ipv4Addr>().ok()?;
    let port = port.parse::<u16>().ok()?;

    let mut packed = [0u8; 6];
    packed[..4].copy_from_slice(&ip.octets());
    packed[4..].copy_from_slice(&port.to_be_bytes());
    Some(packed)
}
/// Asks the server for the socket address of the given peer.
///
/// Sends a GET request to `{ip}/peers/{username}/addresses`, decodes the body
/// as UTF-8, parses it with `parse_addresses` and returns the first address.
///
/// Returns `None` when the request cannot be sent, the server answers with a
/// non-success status, the body cannot be read or is not valid UTF-8, or no
/// address is found. Failures are logged to stderr rather than panicking,
/// because this runs inside the long-lived network task.
pub async fn get_socket_address(username: String, ip: String) -> Option<SocketAddr> {
    let client = reqwest::Client::new();
    let uri = format!("{}/peers/{}/addresses", ip, username);

    let res = match client.get(uri).send().await {
        Ok(res) => res,
        Err(e) => {
            eprintln!("Failed to query the peers addresses from the server: {}", e);
            return None;
        }
    };

    // An error response does not contain addresses, so do not try to parse it.
    if !res.status().is_success() {
        eprintln!(
            "Failed to get the peers addresses from the server. Status: {}",
            res.status()
        );
        return None;
    }
    println!("Successfully retreived the addresses.");

    let body: Bytes = match res.bytes().await {
        Ok(body) => body,
        Err(e) => {
            eprintln!("Failed to read the peers addresses response: {}", e);
            return None;
        }
    };

    let text = String::from_utf8(body.to_vec()).ok()?;
    let addresses = parse_addresses(&text);
    addresses.first().cloned()
}