decent progress
This commit is contained in:
@@ -32,8 +32,6 @@ use std::{
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
type PendingMap = Arc<Mutex<HashMap<i32, oneshot::Sender<Vec<u8>>>>>;
|
||||
|
||||
pub struct P2PSharedData {
|
||||
shared_socket: Arc<UdpSocket>,
|
||||
shared_cryptopair: Arc<CryptographicSignature>,
|
||||
@@ -43,13 +41,12 @@ pub struct P2PSharedData {
|
||||
server_address: Arc<Mutex<String>>,
|
||||
handshake_peers: Arc<HandshakeHistory>,
|
||||
threads: Vec<Worker>,
|
||||
pub pending: PendingMap,
|
||||
}
|
||||
|
||||
use bytes::Bytes;
|
||||
use reqwest::Client;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::timeout;
|
||||
use tokio::time::{sleep, timeout};
|
||||
|
||||
impl P2PSharedData {
|
||||
pub fn new(
|
||||
@@ -65,7 +62,6 @@ impl P2PSharedData {
|
||||
let shared_messageslist = Arc::new(Mutex::new(messages_list));
|
||||
|
||||
let mut threads = Vec::new();
|
||||
let pending = Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
let senders = MultipleSenders::new(1, &shared_socket, cmd_tx, &mut threads);
|
||||
let shared_senders = Arc::new(senders);
|
||||
@@ -82,7 +78,6 @@ impl P2PSharedData {
|
||||
server_address: server_address,
|
||||
handshake_peers: handhsake_peers,
|
||||
threads,
|
||||
pending,
|
||||
})
|
||||
}
|
||||
pub fn socket(&self) -> Arc<UdpSocket> {
|
||||
@@ -449,10 +444,7 @@ pub fn start_p2p_executor(
|
||||
|
||||
match peer_addr_query.await {
|
||||
Some(peer_addr) => {
|
||||
let payload = socket_addr_to_vec(
|
||||
SocketAddr::from_str(server_addr.as_str())
|
||||
.expect("couldnt parse server address"),
|
||||
);
|
||||
let payload = socket_addr_to_vec(peer_addr);
|
||||
|
||||
print!("{:?}", payload.clone());
|
||||
|
||||
@@ -525,27 +517,15 @@ async fn quick_ping(addr: &SocketAddr, timeout_ms: u64, sd: &P2PSharedData) -> b
|
||||
let id = 42069;
|
||||
let pingreq = construct_message(PING, Vec::new(), id, &sd.shared_cryptopair);
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
{
|
||||
let mut pending = sd.pending.lock().expect("couldnt lock pending map");
|
||||
pending.insert(id, tx);
|
||||
}
|
||||
|
||||
if let Some(ping) = pingreq {
|
||||
sd.senders_ref()
|
||||
.add_message_to_retry_queue(ping.clone(), addr.to_string(), false);
|
||||
sd.add_message(id, EventType::Ping);
|
||||
sd.senders_ref()
|
||||
.send_dispatch(ping, addr.to_string(), false, sd.messages_list());
|
||||
}
|
||||
|
||||
let dur = Duration::from_millis(timeout_ms);
|
||||
let res = timeout(dur, rx).await;
|
||||
sleep(Duration::from_millis(timeout_ms)).await;
|
||||
|
||||
// cleanup pending (in case of timeout or channel closed)
|
||||
let mut pending = sd.pending.lock().expect("couldn't lock pending map");
|
||||
pending.remove(&id);
|
||||
|
||||
matches!(res, Ok(Ok(_))) && !pending.contains_key(&id)
|
||||
!sd.messages_list().lock().expect("yooo").contains_key(&id)
|
||||
}
|
||||
|
||||
///
|
||||
@@ -605,35 +585,28 @@ pub async fn get_socket_address(
|
||||
|
||||
for addr in addresses {
|
||||
println!("trying address : {}", addr);
|
||||
if quick_ping(&addr, 20000, sd).await {
|
||||
if quick_ping(&addr, 10000, sd).await {
|
||||
return Some(addr);
|
||||
}
|
||||
|
||||
// TODO: nat_traversal(&addr).await;
|
||||
// after NAT traversal attempt, ping again
|
||||
let payload = socket_addr_to_vec(addr);
|
||||
let id = generate_id();
|
||||
|
||||
//let payload = socket_addr_to_vec(
|
||||
// SocketAddr::from_str(&sd.serveraddress().as_str())
|
||||
// .expect("couldnt parse server address"),
|
||||
//);
|
||||
let natreq = construct_message(NATTRAVERSALREQUEST, payload.clone(), id, &sd.cryptopair());
|
||||
|
||||
//let natreq = construct_message(
|
||||
// NATTRAVERSALREQUEST,
|
||||
// payload,
|
||||
// generate_id(),
|
||||
// &sd.shared_cryptopair,
|
||||
//);
|
||||
sd.add_message(id, EventType::NatTraversal);
|
||||
sd.senders_ref().send_dispatch(
|
||||
natreq.expect("couldnt construct message nattraversalrequest2"),
|
||||
sd.serveraddress().to_string(),
|
||||
false,
|
||||
sd.messages_list(),
|
||||
);
|
||||
|
||||
//sd.senders_ref().send_dispatch(
|
||||
// natreq.expect("couldnt construct message nattraversalrequest2"),
|
||||
// sd.serveraddress(),
|
||||
// false,
|
||||
// sd.messages_list().clone(),
|
||||
//);
|
||||
|
||||
//if quick_ping(&addr, 1500, sd).await {
|
||||
// return Some(addr);
|
||||
//}
|
||||
if quick_ping(&addr, 15000, sd).await {
|
||||
return Some(addr);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
|
||||
@@ -51,8 +51,6 @@ const DATUM: u8 = 132;
|
||||
const NATTRAVERSALREQUEST: u8 = 4;
|
||||
const NATTRAVERSALREQUEST2: u8 = 5;
|
||||
|
||||
type PendingMap = Arc<Mutex<HashMap<i32, oneshot::Sender<Vec<u8>>>>>;
|
||||
|
||||
pub fn handle_recevied_message(
|
||||
messages_list: &Arc<Mutex<HashMap<i32, EventType>>>,
|
||||
recevied_message: &Vec<u8>,
|
||||
@@ -63,7 +61,6 @@ pub fn handle_recevied_message(
|
||||
cmd_tx: crossbeam_channel::Sender<NetworkEvent>,
|
||||
ip: SocketAddr,
|
||||
handhsake_history: Arc<HandshakeHistory>,
|
||||
pending: PendingMap,
|
||||
) {
|
||||
if recevied_message.len() < 4 {
|
||||
return;
|
||||
@@ -72,12 +69,6 @@ pub fn handle_recevied_message(
|
||||
let message_id: [u8; 4] = recevied_message[0..4].try_into().expect("size error");
|
||||
let id = i32::from_be_bytes(message_id);
|
||||
|
||||
let maybe_tx = {
|
||||
let mut map = pending.lock().expect("couldnt lock pending map");
|
||||
println!("trying to remove : {}", id);
|
||||
map.remove(&id)
|
||||
};
|
||||
|
||||
let mut is_resp_to_server_handshake = false;
|
||||
|
||||
if recevied_message[4] == HELLO {
|
||||
|
||||
@@ -3,8 +3,6 @@ use tokio::sync::oneshot;
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::P2PSharedData;
|
||||
use crate::PendingMap;
|
||||
use crate::cryptographic_signature::CryptographicSignature;
|
||||
use crate::message_handling::EventType;
|
||||
use crate::message_handling::handle_recevied_message;
|
||||
use crate::peers_refresh::HandshakeHistory;
|
||||
@@ -272,7 +270,6 @@ pub fn start_receving_thread(
|
||||
let senders_clone = shared_data.senders();
|
||||
let messages_clone = shared_data.messages_list();
|
||||
let servername_clone = shared_data.servername();
|
||||
let pending_clone = shared_data.pending.clone(); // Arc<Mutex<...>>
|
||||
let thread = thread::spawn(move || {
|
||||
let mut buf = [0u8; 1024];
|
||||
loop {
|
||||
@@ -290,7 +287,6 @@ pub fn start_receving_thread(
|
||||
cmd_tx.clone(),
|
||||
src,
|
||||
handshake_history.clone(),
|
||||
pending_clone.clone(),
|
||||
);
|
||||
}
|
||||
Err(e) => eprintln!("Erreur de réception: {}", e),
|
||||
|
||||
Reference in New Issue
Block a user