splash
This commit is contained in:
@@ -26,6 +26,7 @@ use crate::{
|
||||
threads_handling::Worker,
|
||||
};
|
||||
use std::collections::HashSet;
|
||||
|
||||
use std::{
|
||||
io::Error,
|
||||
net::{IpAddr, Ipv4Addr, UdpSocket},
|
||||
@@ -36,6 +37,8 @@ use std::{
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
type PendingMap = Arc<Mutex<HashMap<i32, oneshot::Sender<Vec<u8>>>>>;
|
||||
|
||||
pub struct P2PSharedData {
|
||||
shared_socket: Arc<UdpSocket>,
|
||||
shared_cryptopair: Arc<CryptographicSignature>,
|
||||
@@ -45,10 +48,13 @@ pub struct P2PSharedData {
|
||||
server_address: Arc<Mutex<String>>,
|
||||
handshake_peers: Arc<HandshakeHistory>,
|
||||
threads: Vec<Worker>,
|
||||
pub pending: PendingMap,
|
||||
}
|
||||
|
||||
use bytes::Bytes;
|
||||
use reqwest::Client;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::timeout;
|
||||
|
||||
impl P2PSharedData {
|
||||
pub fn new(
|
||||
@@ -64,6 +70,7 @@ impl P2PSharedData {
|
||||
let shared_messageslist = Arc::new(Mutex::new(messages_list));
|
||||
|
||||
let mut threads = Vec::new();
|
||||
let pending = Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
let senders = MultipleSenders::new(1, &shared_socket, cmd_tx, &mut threads);
|
||||
let shared_senders = Arc::new(senders);
|
||||
@@ -80,6 +87,7 @@ impl P2PSharedData {
|
||||
server_address: server_address,
|
||||
handshake_peers: handhsake_peers,
|
||||
threads,
|
||||
pending,
|
||||
})
|
||||
}
|
||||
pub fn socket(&self) -> Arc<UdpSocket> {
|
||||
@@ -552,48 +560,40 @@ pub fn start_p2p_executor(
|
||||
if let Some(sd) = shared_data.as_ref() {
|
||||
println!("username:{}, ip:{}", username, ip);
|
||||
// user server to send nattraversal request
|
||||
let server_addr_query = get_socket_address(
|
||||
sd.servername().clone(),
|
||||
ip.clone(),
|
||||
shared_data.as_ref(),
|
||||
);
|
||||
let server_addr = sd.serveraddress();
|
||||
let peer_addr_query = get_socket_address(
|
||||
username.clone(),
|
||||
ip.clone(),
|
||||
shared_data.as_ref(),
|
||||
);
|
||||
|
||||
match server_addr_query.await {
|
||||
Some(server_addr) => match peer_addr_query.await {
|
||||
Some(peer_addr) => {
|
||||
let payload = socket_addr_to_vec(server_addr);
|
||||
match peer_addr_query.await {
|
||||
Some(peer_addr) => {
|
||||
let payload = socket_addr_to_vec(
|
||||
SocketAddr::from_str(server_addr.as_str())
|
||||
.expect("couldnt parse server address"),
|
||||
);
|
||||
|
||||
print!("{:?}", payload.clone());
|
||||
print!("{:?}", payload.clone());
|
||||
|
||||
let id = generate_id();
|
||||
let natreq = construct_message(
|
||||
NATTRAVERSALREQUEST,
|
||||
payload.clone(),
|
||||
id.clone(),
|
||||
&sd.cryptopair(),
|
||||
);
|
||||
let id = generate_id();
|
||||
let natreq = construct_message(
|
||||
NATTRAVERSALREQUEST,
|
||||
payload.clone(),
|
||||
id.clone(),
|
||||
&sd.cryptopair(),
|
||||
);
|
||||
|
||||
sd.add_message(id, EventType::NatTraversal);
|
||||
sd.senders_ref().send_dispatch(
|
||||
natreq.expect(
|
||||
"couldnt construct message nattraversalrequest2",
|
||||
),
|
||||
server_addr.to_string(),
|
||||
false,
|
||||
sd.messages_list(),
|
||||
);
|
||||
}
|
||||
None => {
|
||||
let err_msg = format!("failed to retreive socket address")
|
||||
.to_string();
|
||||
let res = event_tx.send(NetworkEvent::Error(err_msg));
|
||||
}
|
||||
},
|
||||
sd.add_message(id, EventType::NatTraversal);
|
||||
sd.senders_ref().send_dispatch(
|
||||
natreq.expect(
|
||||
"couldnt construct message nattraversalrequest2",
|
||||
),
|
||||
server_addr.to_string(),
|
||||
false,
|
||||
sd.messages_list(),
|
||||
);
|
||||
}
|
||||
None => {
|
||||
let err_msg =
|
||||
format!("failed to retreive socket address").to_string();
|
||||
@@ -641,10 +641,32 @@ fn parse_pack(s: &str) -> Option<[u8; 6]> {
|
||||
])
|
||||
}
|
||||
|
||||
fn quick_ping(addr: &SocketAddr, timeout_ms: u64) -> bool {
|
||||
let connect =
|
||||
std::net::TcpStream::connect_timeout(addr, std::time::Duration::from_millis(timeout_ms));
|
||||
connect.is_ok()
|
||||
async fn quick_ping(addr: &SocketAddr, timeout_ms: u64, sd: &P2PSharedData) -> bool {
|
||||
let id = generate_id();
|
||||
let pingreq = construct_message(PING, Vec::new(), id, &sd.shared_cryptopair)
|
||||
.expect("couldn't build ping message");
|
||||
|
||||
let (tx, rx) = oneshot::channel();
|
||||
{
|
||||
let mut pending = sd.pending.lock().expect("couldnt lock pending map");
|
||||
pending.insert(id, tx);
|
||||
}
|
||||
if let Err(e) = sd.socket().send_to(&pingreq, addr) {
|
||||
// remove pending on send failure
|
||||
let mut pending = sd.pending.lock().expect("couldnt lock pending map");
|
||||
pending.remove(&id);
|
||||
eprintln!("send_to failed: {}", e);
|
||||
return false;
|
||||
}
|
||||
|
||||
let dur = Duration::from_millis(timeout_ms);
|
||||
let res = timeout(dur, rx).await;
|
||||
|
||||
// cleanup pending (in case of timeout or channel closed)
|
||||
let mut pending = sd.pending.lock().expect("couldn't lock pending map");
|
||||
pending.remove(&id);
|
||||
|
||||
matches!(res, Ok(Ok(_)))
|
||||
}
|
||||
|
||||
///
|
||||
@@ -704,28 +726,33 @@ pub async fn get_socket_address(
|
||||
|
||||
for addr in addresses {
|
||||
println!("trying address : {}", addr);
|
||||
if quick_ping(&addr, 1000) {
|
||||
if quick_ping(&addr, 1000, sd).await {
|
||||
return Some(addr);
|
||||
}
|
||||
|
||||
// TODO: nat_traversal(&addr).await;
|
||||
// after NAT traversal attempt, ping again
|
||||
|
||||
let natreq = construct_message(
|
||||
NATTRAVERSALREQUEST,
|
||||
addr.to_string().into_bytes(),
|
||||
generate_id(),
|
||||
&sd.shared_cryptopair,
|
||||
);
|
||||
//let payload = socket_addr_to_vec(
|
||||
// SocketAddr::from_str(&sd.serveraddress().as_str())
|
||||
// .expect("couldnt parse server address"),
|
||||
//);
|
||||
|
||||
sd.senders_ref().send_dispatch(
|
||||
natreq.expect("couldnt construct message nattraversalrequest2"),
|
||||
sd.serveraddress(),
|
||||
false,
|
||||
sd.messages_list().clone(),
|
||||
);
|
||||
//let natreq = construct_message(
|
||||
// NATTRAVERSALREQUEST,
|
||||
// payload,
|
||||
// generate_id(),
|
||||
// &sd.shared_cryptopair,
|
||||
//);
|
||||
|
||||
if quick_ping(&addr, 1500) {
|
||||
//sd.senders_ref().send_dispatch(
|
||||
// natreq.expect("couldnt construct message nattraversalrequest2"),
|
||||
// sd.serveraddress(),
|
||||
// false,
|
||||
// sd.messages_list().clone(),
|
||||
//);
|
||||
|
||||
if quick_ping(&addr, 1500, sd).await {
|
||||
return Some(addr);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user