exp backoff and theads handling

This commit is contained in:
TIBERGHIEN corentin
2026-01-20 01:10:09 +01:00
parent 08518892f2
commit dacedd1ceb
9 changed files with 469 additions and 331 deletions

View File

@@ -7,17 +7,19 @@ mod messages_structure;
mod peers_refresh;
mod registration;
mod server_communication;
mod threads_handling;
use crate::{
cryptographic_signature::CryptographicSignature,
message_handling::EventType,
messages_channels::{MultipleSenders, start_receving_thread},
messages_channels::{MultipleSenders, start_receving_thread, start_retry_thread},
messages_structure::{
NATTRAVERSALREQUEST, NATTRAVERSALREQUEST2, ROOTREQUEST, construct_message,
NATTRAVERSALREQUEST, NATTRAVERSALREQUEST2, PING, ROOTREQUEST, construct_message,
},
peers_refresh::HandshakeHistory,
registration::{parse_addresses, perform_handshake, register_with_the_server},
server_communication::{generate_id, get_peer_list},
threads_handling::Worker,
};
use std::{
io::Error,
@@ -35,6 +37,7 @@ pub struct P2PSharedData {
shared_senders: Arc<MultipleSenders>,
server_name: Arc<Mutex<String>>,
handshake_peers: Arc<HandshakeHistory>,
threads: Vec<Worker>,
}
use bytes::Bytes;
@@ -53,10 +56,16 @@ impl P2PSharedData {
let shared_cryptopair = Arc::new(crypto_pair);
let shared_messageslist = Arc::new(Mutex::new(messages_list));
let senders = MultipleSenders::new(1, &shared_socket, cmd_tx);
let mut threads = Vec::new();
let senders = MultipleSenders::new(1, &shared_socket, cmd_tx, &mut threads);
let shared_senders = Arc::new(senders);
let server_name = Arc::new(Mutex::new("".to_string()));
let handhsake_peers = Arc::new(HandshakeHistory::new());
let worker = handhsake_peers.update_handshake();
threads.push(worker);
Ok(P2PSharedData {
shared_socket: shared_socket,
shared_cryptopair: shared_cryptopair,
@@ -64,6 +73,7 @@ impl P2PSharedData {
shared_senders: shared_senders,
server_name: server_name,
handshake_peers: handhsake_peers,
threads,
})
}
pub fn socket(&self) -> Arc<UdpSocket> {
@@ -110,6 +120,15 @@ impl P2PSharedData {
let mut map = self.shared_messageslist.lock().unwrap();
map.insert(id, evt);
}
pub fn threads(&mut self) -> &mut Vec<Worker> {
&mut self.threads
}
pub fn close_threads(&mut self) {
for w in self.threads.drain(..) {
w.stop();
}
}
}
/// Messages sent to the Network thread by the GUI.
@@ -118,7 +137,7 @@ pub enum NetworkCommand {
ServerHandshake(String, String), // ServerName
FetchPeerList(String), // ServerIP
RegisterAsPeer(String),
Ping(String),
Ping(String, String),
NatTraversal(String, String),
ConnectPeer((String, bool)), // IP:PORT
RequestFileTree(String), // peer_id
@@ -139,7 +158,7 @@ pub enum NetworkEvent {
Error(String),
PeerConnected(String),
PeerListUpdated(Vec<(String, bool)>),
FileTreeReceived(String, Vec<MerkleNode>), // peer_id, content
FileTreeReceived([u8; 32], MerkleNode), // peer_id, content
DataReceived(String, MerkleNode),
FileTreeRootReceived(String, NodeHash),
HandshakeFailed(),
@@ -188,8 +207,14 @@ pub fn start_p2p_executor(
match cmd {
NetworkCommand::ServerHandshake(username, ip) => {
println!("server handshake called");
if let Some(sd) = shared_data.as_ref() {
if let Some(sd) = shared_data.as_mut() {
start_receving_thread(sd, event_tx.clone(), &handshake_clone);
start_retry_thread(
sd.senders(),
4,
sd.messages_list(),
sd.threads().as_mut(),
);
let res =
perform_handshake(&sd, username, ip, event_tx.clone(), true).await;
} else {
@@ -215,24 +240,32 @@ pub fn start_p2p_executor(
};
match res {
Some(peerinfo) => {
let id = generate_id();
// envoyer un root request
let rootrequest = construct_message(
ROOTREQUEST,
Vec::new(),
generate_id(),
id,
sd.cryptopair_ref(),
);
println!("matching");
match rootrequest {
None => {}
Some(resp_msg) => {
sd.add_message(id, EventType::RootRequest);
println!("msg_sent:{:?}", resp_msg);
sd.senders_ref().send_via(
0,
sd.senders_ref().add_message_to_retry_queue(
resp_msg.clone(),
peerinfo.ip.to_string(),
false,
);
sd.senders_ref().send_dispatch(
resp_msg,
peerinfo.ip.to_string(),
false,
sd.messages_list_ref(),
sd.messages_list(),
);
}
}
@@ -336,7 +369,32 @@ pub fn start_p2p_executor(
NetworkCommand::RegisterAsPeer(_) => {
println!("[Network] RegisterAsPeer() called");
}
NetworkCommand::Ping(String) => {
NetworkCommand::Ping(str, ip) => {
if let Some(sd) = shared_data.as_ref() {
let id = generate_id();
sd.add_message(id, EventType::Ping);
let pingrequest =
construct_message(PING, Vec::new(), id, sd.cryptopair_ref());
let peer_address = get_socket_address(str, ip).await;
match peer_address {
Some(addr) => {
if let Some(ping) = pingrequest {
sd.senders_ref().add_message_to_retry_queue(
ping.clone(),
addr.to_string(),
false,
);
sd.senders_ref().send_dispatch(
ping,
addr.to_string(),
false,
sd.messages_list(),
);
}
}
None => {}
}
}
println!("[Network] Ping() called");
}
NetworkCommand::Disconnect() => {
@@ -372,19 +430,18 @@ pub fn start_p2p_executor(
let natreq = construct_message(
NATTRAVERSALREQUEST,
server_addr.to_string().into_bytes(),
payload.clone(),
generate_id(),
&sd.cryptopair(),
);
sd.senders_ref().send_via(
0,
sd.senders_ref().send_dispatch(
natreq.expect(
"couldnt construct message nattraversalrequest2",
),
server_addr.to_string(),
false,
sd.messages_list_ref(),
sd.messages_list(),
);
}
None => {