decent progress

This commit is contained in:
Tiago Batista Cardoso
2026-01-22 01:05:02 +01:00
parent 9ba752641b
commit f8e3e46672
3 changed files with 19 additions and 59 deletions

View File

@@ -37,8 +37,6 @@ use std::{
sync::{Arc, Mutex},
};
type PendingMap = Arc<Mutex<HashMap<i32, oneshot::Sender<Vec<u8>>>>>;
pub struct P2PSharedData {
shared_socket: Arc<UdpSocket>,
shared_cryptopair: Arc<CryptographicSignature>,
@@ -48,13 +46,12 @@ pub struct P2PSharedData {
server_address: Arc<Mutex<String>>,
handshake_peers: Arc<HandshakeHistory>,
threads: Vec<Worker>,
pub pending: PendingMap,
}
use bytes::Bytes;
use reqwest::Client;
use tokio::sync::oneshot;
use tokio::time::timeout;
use tokio::time::{sleep, timeout};
impl P2PSharedData {
pub fn new(
@@ -70,7 +67,6 @@ impl P2PSharedData {
let shared_messageslist = Arc::new(Mutex::new(messages_list));
let mut threads = Vec::new();
let pending = Arc::new(Mutex::new(HashMap::new()));
let senders = MultipleSenders::new(1, &shared_socket, cmd_tx, &mut threads);
let shared_senders = Arc::new(senders);
@@ -87,7 +83,6 @@ impl P2PSharedData {
server_address: server_address,
handshake_peers: handhsake_peers,
threads,
pending,
})
}
pub fn socket(&self) -> Arc<UdpSocket> {
@@ -569,10 +564,7 @@ pub fn start_p2p_executor(
match peer_addr_query.await {
Some(peer_addr) => {
let payload = socket_addr_to_vec(
SocketAddr::from_str(server_addr.as_str())
.expect("couldnt parse server address"),
);
let payload = socket_addr_to_vec(peer_addr);
print!("{:?}", payload.clone());
@@ -645,27 +637,15 @@ async fn quick_ping(addr: &SocketAddr, timeout_ms: u64, sd: &P2PSharedData) -> b
let id = 42069;
let pingreq = construct_message(PING, Vec::new(), id, &sd.shared_cryptopair);
let (tx, rx) = oneshot::channel();
{
let mut pending = sd.pending.lock().expect("couldnt lock pending map");
pending.insert(id, tx);
}
if let Some(ping) = pingreq {
sd.senders_ref()
.add_message_to_retry_queue(ping.clone(), addr.to_string(), false);
sd.add_message(id, EventType::Ping);
sd.senders_ref()
.send_dispatch(ping, addr.to_string(), false, sd.messages_list());
}
let dur = Duration::from_millis(timeout_ms);
let res = timeout(dur, rx).await;
sleep(Duration::from_millis(timeout_ms)).await;
// cleanup pending (in case of timeout or channel closed)
let mut pending = sd.pending.lock().expect("couldn't lock pending map");
pending.remove(&id);
matches!(res, Ok(Ok(_))) && !pending.contains_key(&id)
!sd.messages_list().lock().expect("yooo").contains_key(&id)
}
///
@@ -725,35 +705,28 @@ pub async fn get_socket_address(
for addr in addresses {
println!("trying address : {}", addr);
if quick_ping(&addr, 20000, sd).await {
if quick_ping(&addr, 10000, sd).await {
return Some(addr);
}
// TODO: nat_traversal(&addr).await;
// after NAT traversal attempt, ping again
let payload = socket_addr_to_vec(addr);
let id = generate_id();
//let payload = socket_addr_to_vec(
// SocketAddr::from_str(&sd.serveraddress().as_str())
// .expect("couldnt parse server address"),
//);
let natreq = construct_message(NATTRAVERSALREQUEST, payload.clone(), id, &sd.cryptopair());
//let natreq = construct_message(
// NATTRAVERSALREQUEST,
// payload,
// generate_id(),
// &sd.shared_cryptopair,
//);
sd.add_message(id, EventType::NatTraversal);
sd.senders_ref().send_dispatch(
natreq.expect("couldnt construct message nattraversalrequest2"),
sd.serveraddress().to_string(),
false,
sd.messages_list(),
);
//sd.senders_ref().send_dispatch(
// natreq.expect("couldnt construct message nattraversalrequest2"),
// sd.serveraddress(),
// false,
// sd.messages_list().clone(),
//);
//if quick_ping(&addr, 1500, sd).await {
// return Some(addr);
//}
if quick_ping(&addr, 15000, sd).await {
return Some(addr);
}
}
None